Type Parameters: T
public abstract class MultiThreadedProcessingSystem<T> extends Object
- add(Object) for each entry. This is not thread safe.
- process(Object) for each entry, potentially out of order if numberOfProcessingThreads > 1 and potentially in a different thread from add(Object)
- finishedProcessing(Object) for each entry, guaranteed to be in the same order as they were added but potentially in a different thread from add(Object)

add(Object) without synchronization
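The ordering contract above (processing may happen out of order, completion is reported in the order entries were added) can be illustrated with a minimal, self-contained sketch. It assumes a plain `ExecutorService` and uses the hypothetical class `OrderedCompletionSketch`; it is not the library's implementation, only one way such a guarantee could be obtained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for illustration; not the library class.
public class OrderedCompletionSketch {

    // Runs the "process" step on a thread pool, then delivers the
    // "finishedProcessing" step in the order the entries were added.
    static List<String> run(List<String> entries, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<String> finishedOrder = new ArrayList<>();
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (final String entry : entries) {
                // "process" step: tasks may complete in any order across threads
                Callable<String> processStep = () -> entry.toUpperCase();
                pending.add(pool.submit(processStep));
            }
            for (Future<String> result : pending) {
                // "finishedProcessing" step: waiting on the futures in
                // submission order restores the order of the add calls
                finishedOrder.add(result.get());
            }
        } finally {
            pool.shutdown();
        }
        return finishedOrder;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(Arrays.asList("a", "c", "g", "t"), 4)); // prints [A, C, G, T]
    }
}
```

Waiting on the futures in submission order is the simplest way to decouple processing order from completion order; the real class may use a different mechanism.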
final AtomicLong totalLength = new AtomicLong();
MultiThreadedProcessingSystem<NucleotideSequenceDocument> processingSystem = new MultiThreadedProcessingSystem<NucleotideSequenceDocument>("testProcess", 10, 10, Runtime.getRuntime().availableProcessors()) {

See Also: MultiThreadedTask

| Constructor and Description |
|---|
| MultiThreadedProcessingSystem(String threadNamePrefix, int maxBacklogSize, int valuesPerBacklogEntry, int numberOfProcessingThreads) |
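The `maxBacklogSize` and `valuesPerBacklogEntry` constructor parameters describe a bounded backlog of batches on which add(Object) blocks once it is full. A minimal sketch of that idea follows, assuming an `ArrayBlockingQueue` of lists. The class `BacklogSketch` and its `flush`, `takeBatch`, and `backlogFull` methods are hypothetical names introduced for illustration and are not part of the documented API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for illustration; not the library class.
public class BacklogSketch {
    private final BlockingQueue<List<String>> backlog;
    private final int valuesPerBacklogEntry;
    private List<String> currentBatch = new ArrayList<>();

    BacklogSketch(int maxBacklogSize, int valuesPerBacklogEntry) {
        this.backlog = new ArrayBlockingQueue<>(maxBacklogSize);
        this.valuesPerBacklogEntry = valuesPerBacklogEntry;
    }

    // Not thread safe: intended to be called from a single thread.
    void add(String entry) throws InterruptedException {
        currentBatch.add(entry);
        if (currentBatch.size() == valuesPerBacklogEntry) {
            backlog.put(currentBatch); // blocks while the backlog is full
            currentBatch = new ArrayList<>();
        }
    }

    // Hands a partially filled final batch to the backlog.
    void flush() throws InterruptedException {
        if (!currentBatch.isEmpty()) {
            backlog.put(currentBatch);
            currentBatch = new ArrayList<>();
        }
    }

    // Takes the next batch, as a worker thread would before processing it.
    List<String> takeBatch() throws InterruptedException {
        return backlog.take();
    }

    boolean backlogFull() {
        return backlog.remainingCapacity() == 0;
    }

    public static void main(String[] args) throws InterruptedException {
        BacklogSketch sketch = new BacklogSketch(2, 2);
        for (String value : new String[] {"a", "b", "c", "d", "e"}) {
            sketch.add(value);
        }
        System.out.println(sketch.backlogFull()); // prints true
        System.out.println(sketch.takeBatch());   // prints [a, b]
        sketch.flush();
        System.out.println(sketch.takeBatch());   // prints [c, d]
        System.out.println(sketch.takeBatch());   // prints [e]
    }
}
```

Batching amortizes queue synchronization over several entries, while the bounded queue keeps memory use limited when the producer is faster than the workers.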
| Modifier and Type | Method and Description |
|---|---|
| void | add(T entry): Adds a new entry to be processed. |
| protected void | batchProcess(List<T> values): Processes multiple values in a single thread. |
| void | finishedProcessing(T entry): An implementation may optionally override this to perform any additional cleanup. |
| abstract void | process(T t): Processes a single entry previously added using add(Object). |
| void | setThresholdToStartThreads(int thresholdToStartThreads): Sets a threshold for the number of items to be added before this processing system starts creating additional threads. |
| void | waitForCompletion(): Waits until all items added using add(Object) have been processed by process(Object) and have returned from finishedProcessing(Object). |
public MultiThreadedProcessingSystem(String threadNamePrefix, int maxBacklogSize, int valuesPerBacklogEntry, int numberOfProcessingThreads)
Parameters:
threadNamePrefix - a prefix for the name to use for any threads created
maxBacklogSize - the maximum number of batches of valuesPerBacklogEntry entries to store in memory awaiting processing. add(Object) will block for as long as this threshold is reached.
valuesPerBacklogEntry - the number of entries added using add(Object) to process in a block. Up to this many entries will be passed to batchProcess(java.util.List).
numberOfProcessingThreads - the number of threads to use for processing once a minimum number of items have been added, as specified by setThresholdToStartThreads(int). You may want to use Runtime.getRuntime().availableProcessors().

public void setThresholdToStartThreads(int thresholdToStartThreads)
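Sets a threshold for the number of items to be added before this processing system starts creating additional threads.
Parameters:
thresholdToStartThreads - the number of items to be added before this processing system starts creating additional threads

public void add(T entry) throws DocumentOperationException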
Adds a new entry to be processed.
Parameters:
entry - the entry to be processed.
Throws:
DocumentOperationException - if the processing method (process(Object)) throws a DocumentOperationException

protected void batchProcess(List<T> values) throws DocumentOperationException
Processes multiple values in a single thread. Calls process(Object) for each value.
Parameters:
values - all the values to process
Throws:
DocumentOperationException - if there is a problem processing any of the entries

public abstract void process(T t) throws DocumentOperationException
Processes a single entry previously added using add(Object).
Parameters:
t - the entry to process
Throws:
DocumentOperationException - if there is a problem processing this entry. In this case this exception will be thrown by either add(Object) or waitForCompletion().

public void finishedProcessing(T entry) throws DocumentOperationException
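An implementation may optionally override this to perform any additional cleanup.
Parameters:
entry - the entry that has been processed.
Throws:
DocumentOperationException - if there is a problem completing processing of the entry

public void waitForCompletion() throws DocumentOperationException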
Waits until all items added using add(Object) have been processed by process(Object) and have returned from finishedProcessing(Object).
Throws:
DocumentOperationException - if either process(Object) or finishedProcessing(Object) threw a DocumentOperationException
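Because processing happens on other threads, a failure in process(Object) cannot be thrown at the point where it occurs; the documentation states that it surfaces from a later add(Object) call or from waitForCompletion(). The sketch below shows one way such deferred reporting could work, under the assumption that the first failure is stored and rethrown on the calling thread. `DeferredFailureSketch`, `runAndReport`, and the use of plain `Exception` in place of DocumentOperationException are hypothetical choices made to keep the example self-contained.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for illustration; not the library class.
public class DeferredFailureSketch {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final AtomicReference<Exception> firstFailure = new AtomicReference<>();

    void add(String entry) throws Exception {
        rethrowIfFailed(); // a failure from an earlier entry may surface here
        pool.execute(() -> {
            try {
                process(entry);
            } catch (Exception e) {
                firstFailure.compareAndSet(null, e); // keep only the first failure
            }
        });
    }

    // Stand-in processing step: rejects empty entries.
    void process(String entry) throws Exception {
        if (entry.isEmpty()) {
            throw new Exception("empty entry");
        }
    }

    void waitForCompletion() throws Exception {
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        rethrowIfFailed(); // otherwise the failure surfaces here
    }

    private void rethrowIfFailed() throws Exception {
        Exception failure = firstFailure.get();
        if (failure != null) {
            throw failure;
        }
    }

    // Adds all entries, waits, and reports the outcome as text.
    static String runAndReport(String... entries) {
        DeferredFailureSketch system = new DeferredFailureSketch();
        try {
            for (String entry : entries) {
                system.add(entry);
            }
            system.waitForCompletion();
            return "ok";
        } catch (Exception e) {
            return "failed: " + e.getMessage();
        } finally {
            system.pool.shutdownNow(); // release threads even if add threw early
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndReport("acgt", "ggcc"));     // prints ok
        System.out.println(runAndReport("acgt", "", "ttga")); // prints failed: empty entry
    }
}
```

Which of the two calls reports the failure depends on thread timing, so callers should be prepared to handle the exception from either.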