Class ConcurrentQ<I,R>

java.lang.Object
generic.concurrent.ConcurrentQ<I,R>
Type Parameters:
I - The type of the items to be processed.
R - The type of objects resulting from processing an item. If you don't care about the return value, make this type whatever you want, such as Object or the same type as I, and return null from QCallback.process(Object, TaskMonitor).

public class ConcurrentQ<I,R> extends Object
A queue for easily scheduling tasks to be run in parallel (or sequentially) via a thread pool. This class provides a clean separation of items that need to be processed from the algorithm that does the processing, making it easy to parallelize the processing of multiple items. Further, you can control the maximum number of items that can be processed concurrently. This is useful to throttle operations that may starve the other threads in the system. You may also control how many items get placed into the queue at one time, blocking if some threshold is exceeded.

Examples:


Put and Forget:

 QCallback<ITEM, RESULT> callback = new AbstractQCallback<ITEM, RESULT>() {
     public RESULT process(ITEM item, TaskMonitor monitor) {
         // do work here...
     }
 };

 ConcurrentQBuilder<ITEM, RESULT> builder = new ConcurrentQBuilder<ITEM, RESULT>();
 builder.setThreadPoolName("Thread Pool Name");
 concurrentQ = builder.build(callback);
 ...
 ...
 concurrentQ.add(item); // where item is one of the instances of ITEM

 

Put Items and Handle Results in Any Order as They Become Available:

 QCallback<ITEM, RESULT> callback = new AbstractQCallback<ITEM, RESULT>() {
     public RESULT process(ITEM item, TaskMonitor monitor) {
         // do work here...
     }
 };

 QItemListener<ITEM, RESULT> itemListener = new QItemListener<ITEM, RESULT>() {
     public void itemProcessed(QResult<ITEM, RESULT> result) {
         // named 'value' so it does not clash with the 'result' parameter
         RESULT value = result.getResult();
         // work on my result...
     }
 };

 ConcurrentQBuilder<ITEM, RESULT> builder = new ConcurrentQBuilder<ITEM, RESULT>();
 builder.setThreadPoolName("Thread Pool Name");
 builder.setListener(itemListener);
 concurrentQ = builder.build(callback);
 ...
 ...
 concurrentQ.add(item); // where item is one of the instances of ITEM
 concurrentQ.add(item);
 concurrentQ.add(item);

 

Put Items and Handle Results When All Items Have Been Processed:

 QCallback<ITEM, RESULT> callback = new AbstractQCallback<ITEM, RESULT>() {
     public RESULT process(ITEM item, TaskMonitor monitor) {
         // do work here...
     }
 };

 ConcurrentQBuilder<ITEM, RESULT> builder = new ConcurrentQBuilder<ITEM, RESULT>();
 builder.setThreadPoolName("Thread Pool Name");
 builder.setCollectResults(true);
 concurrentQ = builder.build(callback);
 ...
 ...
 concurrentQ.add(item); // where item is one of the instances of ITEM
 concurrentQ.add(item);
 concurrentQ.add(item);
 ...

 Collection<QResult<ITEM, RESULT>> results = concurrentQ.waitForResults();
 // process the results...

 

Put Items, Blocking While Full, and Handle Results in Any Order as They Become Available:

 QCallback<ITEM, RESULT> callback = new AbstractQCallback<ITEM, RESULT>() {
     public RESULT process(ITEM item, TaskMonitor monitor) {
         // do work here...
     }
 };

 QItemListener<ITEM, RESULT> itemListener = new QItemListener<ITEM, RESULT>() {
     public void itemProcessed(QResult<ITEM, RESULT> result) {
         // named 'value' so it does not clash with the 'result' parameter
         RESULT value = result.getResult();
         // work on my result...
     }
 };

 ConcurrentQBuilder<ITEM, RESULT> builder = new ConcurrentQBuilder<ITEM, RESULT>();
 builder.setThreadPoolName("Thread Pool Name");
 builder.setQueue(new LinkedBlockingQueue<ITEM>(100));
 builder.setListener(itemListener);
 concurrentQ = builder.build(callback);
 ...
 ...
 Iterator<ITEM> iterator = ...; // get an iterator for 1000s of items from somewhere
 concurrentQ.offer(iterator); // this call will block while the queue is full (100 items)

 

  • Constructor Details

    • ConcurrentQ

      public ConcurrentQ(String name, QCallback<I,R> callback)
      Creates a ConcurrentQ that will process as many items at one time as its thread pool can handle.
      Parameters:
      name - The name of the thread pool that will be created by this constructor.
      callback - the QCallback object that will be used to process items concurrently.
    • ConcurrentQ

      public ConcurrentQ(QCallback<I,R> callback, Queue<I> queue, GThreadPool threadPool, QItemListener<I,R> listener, boolean collectResults, int maxInProgress, boolean jobsReportProgress)
      Creates a ConcurrentQ that will process at most maxInProgress items at a time, regardless of how many threads are available in the GThreadPool.
      Parameters:
      callback - the QCallback object that will be used to process items concurrently.
      queue - the internal storage queue to use in this concurrent queue.
      threadPool - the GThreadPool to use for providing the threads for concurrent processing.
      listener - An optional QItemListener that will be called back with results when the item has been processed.
      collectResults - specifies if this queue should collect the results as items are processed so they can be returned in a waitForResults() call.
      maxInProgress - specifies the maximum number of items that can be processed at a time. If this is set to 0, then this queue will attempt to execute as many items at a time as there are threads in the given threadPool. Setting this parameter to 1 guarantees that all items are processed one at a time, in the order they were submitted. Any other positive value will run that many items concurrently, up to the number of available threads.
      jobsReportProgress - true signals that jobs wish to report progress via their task monitor. The default is false, which triggers this queue to report an overall progress for each job that is processed. False is a good default for clients that have a finite number of jobs to be done.
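
      The maxInProgress rule can be illustrated with plain java.util.concurrent classes. The following is a minimal sketch, not ConcurrentQ's implementation: the class ThrottleSketch, its method, and the 4-thread pool size are hypothetical names and values chosen for the example. A Semaphore caps the number of in-flight tasks, and a value of 0 falls back to the pool size, mirroring the description above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for illustration; not part of the ConcurrentQ API.
class ThrottleSketch {
    private static final int POOL_SIZE = 4; // assumed pool size for the example

    /**
     * Runs itemCount simulated jobs, allowing at most maxInProgress of them
     * to run at once (0 means "as many as the pool has threads").
     * Returns the highest number of jobs observed running at the same time.
     */
    static int runThrottled(int itemCount, int maxInProgress) {
        int limit = maxInProgress > 0 ? maxInProgress : POOL_SIZE;
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
        Semaphore permits = new Semaphore(limit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < itemCount; i++) {
                permits.acquire(); // blocks the submitter once the limit is reached
                pool.execute(() -> {
                    try {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(5); // simulated work
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    finally {
                        running.decrementAndGet();
                        permits.release(); // frees a slot for the next item
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally {
            pool.shutdownNow();
        }
        return peak.get();
    }
}
```

      With a limit of 1 the jobs never overlap, which is the sequential behavior described for maxInProgress = 1.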
  • Method Details

    • addProgressListener

      public void addProgressListener(QProgressListener<I> listener)
      Adds a progress listener for this queue. All the progress and messages reported by a QCallback will be routed to this listener.
      Parameters:
      listener - the listener for receiving progress and message notifications.
    • removeProgressListener

      public void removeProgressListener(QProgressListener<I> listener)
      Removes a progress listener from this queue. Progress and messages reported by a QCallback will no longer be routed to this listener.
      Parameters:
      listener - the listener to remove.
    • setMonitor

      public void setMonitor(TaskMonitor monitor, boolean cancelClearsAllItems)
      Sets the monitor to use with this queue.
      Parameters:
      monitor - the monitor to attach to this queue
      cancelClearsAllItems - if true, cancelling the monitor will cancel all items currently being processed by a thread and clear the scheduled items that haven't yet run. If false, only the items currently being processed will be cancelled.
    • add

      public void add(Collection<I> items)
      Adds the collection of items to this queue for concurrent processing.
      Parameters:
      items - the items to be scheduled for concurrent processing
    • add

      public void add(Iterator<I> iterator)
      Adds the items of the given iterator to this queue for concurrent processing.
      Parameters:
      iterator - an iterator from which the items to be scheduled for concurrent processing will be taken.
    • offer

      public void offer(Iterator<I> iterator) throws InterruptedException
      Allows clients to use a bounded queue (such as a LinkedBlockingQueue) to control how many items get placed into this queue at one time. Calling the add methods will place all items into the queue, which, for a large number of items, can consume a large amount of memory. This method will block once the queue is at maximum capacity, continuing to add new items as existing items on the queue are processed.

      To enable blocking on the queue when it is full, construct this ConcurrentQ with an instance of BlockingQueue.

      Parameters:
      iterator - An iterator from which items will be taken.
      Throws:
      InterruptedException - if this queue is interrupted while waiting to add more items
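
      The blocking behavior comes from the bounded queue itself. As a rough illustration using only the JDK (BoundedOfferSketch and its method are hypothetical names, and this is not how ConcurrentQ is implemented), the producer below blocks in put() whenever the queue holds its full capacity and resumes as the consumer drains it:

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for illustration; not part of the ConcurrentQ API.
class BoundedOfferSketch {

    /**
     * Feeds every (non-null) item from the iterator through a queue that never
     * holds more than 'capacity' entries, while a consumer thread drains it.
     * Returns the number of items the consumer processed.
     */
    static <T> int feedThroughBoundedQueue(Iterator<T> iterator, int capacity) {
        BlockingQueue<Optional<T>> queue = new LinkedBlockingQueue<>(capacity);
        AtomicInteger consumed = new AtomicInteger();
        Thread consumer = new Thread(() -> {
            try {
                // An empty Optional marks the end of the input.
                while (queue.take().isPresent()) {
                    consumed.incrementAndGet();
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            while (iterator.hasNext()) {
                queue.put(Optional.of(iterator.next())); // blocks while the queue is full
            }
            queue.put(Optional.empty());
            consumer.join();
        }
        catch (InterruptedException e) {
            consumer.interrupt();
            Thread.currentThread().interrupt();
        }
        return consumed.get();
    }
}
```

      Only 'capacity' items are ever buffered, no matter how many the iterator produces, which is the memory benefit the description above refers to.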
    • add

      public void add(I item)
      Adds the item to this queue for concurrent processing.
      Parameters:
      item - the item to be scheduled for concurrent processing.
    • isEmpty

      public boolean isEmpty()
      Returns true if this queue has no items waiting to be processed or currently being processed.
      Returns:
      true if this queue has no items waiting to be processed or currently being processed.
    • waitForResults

      public Collection<QResult<I,R>> waitForResults() throws InterruptedException
      Waits until all scheduled items have been completed or cancelled and returns a list of QResults if this queue has been told to collect results.

      You can still call this method to wait for items to be processed, even if you did not specify to collect results. In that case, the list returned will be empty.

      Returns:
      the list of QResult objects that have all the results of the completed jobs.
      Throws:
      InterruptedException - if this call was interrupted. Note: this interruption only happens if the calling thread cannot acquire the lock. If the thread is interrupted while waiting for results, then it will try again.
    • waitForNextResult

      public QResult<I,R> waitForNextResult() throws InterruptedException
      Wait until at least one result is available and then return the first result.
      Returns:
      the first available result
      Throws:
      InterruptedException - if interrupted while waiting for a result
      IllegalStateException - if this queue has been set to not collect results (see the constructor).
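
      "First available result" means first to finish, not first submitted. The JDK's ExecutorCompletionService has the same take-the-next-finished-result behavior, so it serves as an analogy here; NextResultSketch is a hypothetical name and the code does not use ConcurrentQ. The slow task is submitted first but held back with a latch, so the fast task's result is the one returned.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for illustration; not part of the ConcurrentQ API.
class NextResultSketch {

    /** Submits a slow task, then a fast one, and returns whichever finishes first. */
    static String firstCompleted() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> completion = new ExecutorCompletionService<>(pool);
        CountDownLatch releaseSlow = new CountDownLatch(1);
        try {
            completion.submit(() -> {
                releaseSlow.await(); // held back until the first result has been taken
                return "slow";
            });
            completion.submit(() -> "fast");
            String first = completion.take().get(); // blocks until any task finishes
            releaseSlow.countDown();
            return first;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
        finally {
            pool.shutdownNow();
        }
    }
}
```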
    • waitUntilDone

      public void waitUntilDone() throws InterruptedException, Exception
      Waits until all items have been processed OR an Exception happens during the processing of ANY item.

      Note: If an exception does occur then the remaining items in the queue will be cleared and all current items will be cancelled.

      If you wish for processing to continue for remaining items when any item encounters an exception, then you should instead use waitForResults(). That method will return all results, both with and without exceptions, which you can then process, including checking for exceptions. Note that to use waitForResults() to examine exceptions, you must have created this queue with collectResults as true.

      Throws:
      InterruptedException - if interrupted while waiting for a result
      Exception - any exception encountered while processing an item (this will cancel all items in the queue).
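
      The difference between failing fast and collecting every outcome can be sketched without ConcurrentQ. In the example below, CollectAllSketch and its Outcome class are hypothetical stand-ins (Outcome is not the real QResult); each item's result or exception is recorded, so one failing item does not stop the others, which is the behavior described above for waitForResults().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for illustration; not part of the ConcurrentQ API.
class CollectAllSketch {

    /** Per-item outcome: either a value or the error that processing raised. */
    static final class Outcome {
        final int item;
        final Integer value;   // null when processing failed
        final Throwable error; // null when processing succeeded

        Outcome(int item, Integer value, Throwable error) {
            this.item = item;
            this.value = value;
            this.error = error;
        }

        boolean hasError() {
            return error != null;
        }
    }

    /** Divides 100 by each item; a zero item fails without stopping the rest. */
    static List<Outcome> processAll(List<Integer> items) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Outcome> outcomes = new ArrayList<>();
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Integer item : items) {
                tasks.add(() -> 100 / item);
            }
            // invokeAll waits for every task and keeps the submission order.
            List<Future<Integer>> futures = pool.invokeAll(tasks);
            for (int i = 0; i < futures.size(); i++) {
                try {
                    outcomes.add(new Outcome(items.get(i), futures.get(i).get(), null));
                }
                catch (ExecutionException e) {
                    outcomes.add(new Outcome(items.get(i), null, e.getCause()));
                }
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally {
            pool.shutdown();
        }
        return outcomes;
    }
}
```

      A caller can then loop over the outcomes and check hasError() on each one, rather than having the first exception abort the whole batch.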
    • waitForResults

      public Collection<QResult<I,R>> waitForResults(long timeout, TimeUnit unit) throws InterruptedException
      Waits up to the specified time for scheduled jobs to complete. The results of all completed jobs will be returned if this queue has been told to collect results. At the time that this returns, there may still be work to process. The returned list will contain as much work as has been processed when the wait has finished. Repeated calls to this method will not return results from previous waits.

      You can still call this method to wait for items to be processed, even if you did not specify to collect results. In that case, the list returned will be empty.

      Parameters:
      timeout - the timeout
      unit - the timeout unit
      Returns:
      the list of QResult objects that have all the results of the completed jobs.
      Throws:
      InterruptedException - if this call was interrupted.
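
      The statement that repeated calls do not return results from previous waits describes draining semantics: each call hands back what has finished since the last call and forgets it. The sketch below imitates that with JDK classes only. DrainingResultsSketch, publish, and takeCompleted are hypothetical names, and the fixed expected item count is an assumption made to keep the example small.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for illustration; not part of the ConcurrentQ API.
class DrainingResultsSketch {
    private final BlockingQueue<String> completed = new LinkedBlockingQueue<>();
    private final CountDownLatch remaining;

    DrainingResultsSketch(int expectedItems) {
        remaining = new CountDownLatch(expectedItems);
    }

    /** Called by a worker when an item finishes. */
    void publish(String result) {
        completed.add(result);
        remaining.countDown();
    }

    /**
     * Waits up to the timeout for all items to finish, then returns and
     * removes whatever has completed so far.
     */
    List<String> takeCompleted(long timeout, TimeUnit unit) {
        try {
            remaining.await(timeout, unit); // returns early once every item is done
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> batch = new ArrayList<>();
        completed.drainTo(batch); // drained results are not returned again
        return batch;
    }
}
```

      If two of three items have been published, the first call times out and returns those two; after the third is published, the next call returns only the third.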
    • cancelAllTasks

      public List<I> cancelAllTasks(boolean interruptRunningTasks)
      Cancels the processing of currently scheduled items in this queue. Any items that haven't yet been scheduled on the threadPool are returned immediately from this call. Items that are currently being processed will be cancelled; their results will be available on the next waitForResults() call and, if there is a QItemListener, it will be called with each QResult. There is no guarantee that scheduled tasks will terminate any time soon. If they check the isCancelled() state of their QMonitor, it will be true. Setting interruptRunningTasks to true will result in a thread interrupt to any currently running task, which might be useful if the task performs waiting operations like I/O.
      Parameters:
      interruptRunningTasks - if true, an attempt will be made to interrupt any currently processing thread.
      Returns:
      a list of all items that have not yet been queued to the threadPool.
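
      A close JDK analogy is ExecutorService.shutdownNow(), which also returns the work that never started and interrupts the work that is running. The sketch below uses that call to show both effects. CancelSketch and Report are hypothetical names, and unlike cancelAllTasks the interrupt here is not optional and the executor cannot be reused afterwards.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for illustration; not part of the ConcurrentQ API.
class CancelSketch {

    /** What the cancel did: tasks that never ran, and whether the running one was interrupted. */
    static final class Report {
        final int neverStarted;
        final boolean runningTaskInterrupted;

        Report(int neverStarted, boolean runningTaskInterrupted) {
            this.neverStarted = neverStarted;
            this.runningTaskInterrupted = runningTaskInterrupted;
        }
    }

    /** Starts one long blocking task, queues waitingTasks behind it, then cancels everything. */
    static Report cancelWhileBusy(int waitingTasks) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean();
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // simulated blocking I/O
            }
            catch (InterruptedException e) {
                interrupted.set(true); // the interrupt is how a blocked task learns of the cancel
            }
        });
        for (int i = 0; i < waitingTasks; i++) {
            pool.execute(() -> { }); // stays queued behind the blocking task
        }
        int neverStarted = 0;
        try {
            started.await(); // make sure the first task is actually running
            List<Runnable> unstarted = pool.shutdownNow(); // interrupts the running task
            neverStarted = unstarted.size();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return new Report(neverStarted, interrupted.get());
    }
}
```

      A task that only computes and never blocks would not notice the interrupt on its own, which is why the description above also mentions checking the cancelled state of the monitor.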
    • removeUnscheduledJobs

      public List<I> removeUnscheduledJobs()
    • cancelScheduledJobs

      public void cancelScheduledJobs()
    • dispose

      public void dispose()
      Cancels all running tasks and disposes of the internal thread pool if it is a private pool.
    • waitUntilDone

      public boolean waitUntilDone(long timeout, TimeUnit unit) throws InterruptedException
      Throws:
      InterruptedException