How to implement multi-threaded polling

I have a small single-threaded app that continually polls a remote server for new items. If any items are found, it processes a batch of up to 50 items. After the items are processed, it sends a request to the remote server telling it to mark the successfully processed items as processed. Finally, it closes the connection and sleeps for a few minutes, then grabs the next batch of items. In pseudocode it looks something like this:

    while (!canceled) {

       SomeConnection conn = getConnectionToRemoteServer();
       List<Foo> items = conn.getNewItems(50);

       List<Foo> processed = new ArrayList<Foo>();
       for (Foo foo: items) {
           boolean successful = processItem(foo);

           if (successful) {
              processed.add(foo);
           }
           else {
              doSomethingElse();
           }

           if (canceled) {
               break;
           }
       }  

       conn.removeItems(processed);              
       
       //... close connection
       Thread.sleep(someInterval);
    }

This all works fine. However, I would like to speed up the process by splitting the work across multiple threads. Ideally, I'd like to submit the 50 tasks for processing, do something when all tasks are finished, and then begin again when the next batch is received. I also need to be able to cancel the current batch at any time and shut down the app if needed.

I'm looking for general ideas on how to accomplish this. I've been reading about ExecutorService and CompletionService, and they seem promising, but I'm not clear on how to structure the overall process.

Again, I'm just looking for high level advice on fitting the various pieces together rather than actual code.  Any thoughts or suggestions would be greatly appreciated!
Asked by: _agx_
 
dpearson Commented:
I think you can do this with an ExecutorService.

The basic structure would be:

a) Create the executor (you choose how many threads will actually do the work here)
b) Create a list of tasks
c) Each task is a Callable instance which means they have a call() method that does all the work for one request - submit request to remote app, wait for result.
d) When you submit them to the executor (via invokeAll) you'll get back a list of Future objects.  Keep those futures.
e) Now your master app can do whatever it wants (including just calling get() on each future if you only want to wait for the work to be done; awaitTermination() on the executor only returns once the executor has been shut down)
f) If you want to abort the requests, go through the futures and call cancel(true) on them, which interrupts the threads running the tasks. A task that is blocked in an interruptible wait gets an InterruptedException, which breaks it out of where it is waiting and lets you abort. A task that is busy computing has to check Thread.currentThread().isInterrupted() itself.
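
The steps above could be sketched roughly as follows. This is only an illustration under assumptions: `BatchSketch`, `runBatch` and the integer items are made-up names, and `processItem` is a stand-in that simply treats even numbers as successes. It uses submit() per task rather than invokeAll() so that the futures are in hand while the tasks are still running.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class BatchSketch {

    // Hypothetical stand-in for the real per-item work: even items "succeed".
    static boolean processItem(int item) {
        return item % 2 == 0;
    }

    // Runs one batch on the given pool and returns the items that succeeded.
    static List<Integer> runBatch(ExecutorService pool, List<Integer> items) {
        // (b)/(c) one Callable per item; (d) submit each one and keep its Future
        List<Future<Boolean>> futures = new ArrayList<>();
        for (int item : items) {
            Callable<Boolean> task = () -> processItem(item);
            futures.add(pool.submit(task));
        }
        // (e) wait for the batch: get() blocks until that task has finished
        List<Integer> processed = new ArrayList<>();
        try {
            for (int i = 0; i < futures.size(); i++) {
                if (futures.get(i).get()) {
                    processed.add(items.get(i));
                }
            }
        } catch (InterruptedException e) {
            // (f) the waiting thread was told to stop: cancel what is left
            for (Future<Boolean> f : futures) {
                f.cancel(true);
            }
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        }
        return processed;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // (a)
        try {
            System.out.println(runBatch(pool, List.of(1, 2, 3, 4)));
        } finally {
            pool.shutdown();
        }
    }
}
```

The list returned by `runBatch` plays the role of the `processed` list in the original loop, so it could be handed to something like `conn.removeItems(...)` afterwards.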

I hope that helps get you started in the right direction,

Doug
 
_agx_ (Author) Commented:
Thanks, that helps solidify some of what I've been reading in the javadocs.

One part I'm unclear on: since the polling would be an ongoing process, is the ExecutorService intended to be created once for each batch of tasks, or created once and reused many times? I was thinking the latter, but I'm not sure. If it could be done either way, what are the pros and cons?

Also, any advantage to ExecutorService over CompletionService?
 
dpearson Commented:
An ExecutorService is a pool of threads. So usually there's no advantage to disposing of all of the threads and creating new ones between batches of tasks. It's better to just reuse them, so you don't have the overhead of extra thread creation.
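
As a rough illustration of reusing one pool across batches, here is a small sketch. The class and method names (`ReusedPoolSketch`, `runRounds`) are invented for the example, and each task does nothing except record which worker thread ran it. The point is only that the pool is created once, every batch is submitted to it, and shutdown() is called when the whole loop ends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ReusedPoolSketch {

    // Runs several batches on ONE pool and returns the names of the worker
    // threads that executed the tasks.
    static Set<String> runRounds(int rounds, int batchSize, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize); // created once
        Set<String> workers = ConcurrentHashMap.newKeySet();
        try {
            for (int round = 0; round < rounds; round++) {
                List<Future<?>> futures = new ArrayList<>();
                for (int i = 0; i < batchSize; i++) {
                    futures.add(pool.submit(() -> {
                        workers.add(Thread.currentThread().getName());
                    }));
                }
                for (Future<?> f : futures) {
                    f.get(); // wait for the whole batch to finish
                }
                // the real poller would sleep here before fetching the next batch
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdown(); // only once, when the polling loop itself ends
        }
        return workers;
    }

    public static void main(String[] args) {
        // 5 batches of 20 tasks never use more than the 3 pooled threads
        System.out.println(runRounds(5, 20, 3).size() <= 3);
    }
}
```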

I've not personally used CompletionServices (and I've used dozens of executor services) but I believe the benefit is when you are more interested in getting access to the results from the executor as soon as they become available.  So where you want to have the Callable return an object to you and you want to handle those objects.

I find that's rarely the pattern I use (generally the task is updating some other state) and it sounds like you would most likely be doing that too with the "processItem()" sort of model. But if you instead had more of a "getMeTheResult()" model, then a completion service should make it easier to find out which tasks are "done now", so you can pick up each result as soon as it is available even when the tasks take different amounts of time.

A good example of this would be if you had a series of queues and you wanted to pass results from one to the next queue, with each queue implemented as a thread pool.  A completion service would potentially make that more efficient.  I think you can do the same with an executor service, it's just you'd need to write more of the code to check for what's done yourself.
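
For comparison, a minimal sketch of that "results as soon as they are available" behaviour using ExecutorCompletionService. The names `CompletionOrderSketch` and `finishOrder` are made up, and the tasks just sleep for a given number of milliseconds to simulate work of different lengths. It assumes a non-empty list, since the pool is sized to the number of tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CompletionOrderSketch {

    // Submits one task per delay and returns the delays in the order the
    // tasks actually finished, not the order they were submitted.
    static List<Integer> finishOrder(List<Integer> delaysMs) {
        ExecutorService pool = Executors.newFixedThreadPool(delaysMs.size());
        CompletionService<Integer> completion = new ExecutorCompletionService<>(pool);
        List<Integer> order = new ArrayList<>();
        try {
            for (int delay : delaysMs) {
                completion.submit(() -> {
                    Thread.sleep(delay); // simulated work
                    return delay;
                });
            }
            for (int i = 0; i < delaysMs.size(); i++) {
                // take() blocks until the NEXT finished task is available
                order.add(completion.take().get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdownNow();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(finishOrder(List.of(400, 10, 200)));
    }
}
```

With a plain ExecutorService you would get the same information by polling isDone() on each Future yourself, which is the extra code mentioned above.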

Anyway I'm thinking an executor is probably enough for you here.

Doug
 
_agx_ (Author) Commented:
> getting access to the results from the executor as soon as they become available.  

Hm.. that aspect could be applicable to my process, i.e. run the pre-processing steps, which *can* be cancelled:

             runStep1();
             runStep2();

Then pass off the result to another process to do the notifications and db inserts. But I don't know that it entirely fits the multi-queue model. That might add a layer of complexity I don't think is needed. I'll have to give it some more thought.

> d) When you submit them to the executor (via invokeAll) you'll get back a list of Future objects.  Keep those futures.

With regard to the returned Futures list, doesn't invokeAll block until the tasks are completed? In which case, I couldn't loop through the Futures and invoke cancel(..) on each one.
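
A quick way to check this is a sketch like the one below (`InvokeAllSketch` and `allDoneOnReturn` are made-up names, and the tasks only sleep briefly). If invokeAll() does block until completion, every Future should already report isDone() by the time the list is returned, which leaves nothing to cancel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class InvokeAllSketch {

    // Returns true if every Future handed back by invokeAll() is already done.
    static boolean allDoneOnReturn(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                tasks.add(() -> {
                    Thread.sleep(20); // simulated work
                    return id;
                });
            }
            // The calling thread waits here until all tasks have completed
            List<Future<Integer>> futures = pool.invokeAll(tasks);
            for (Future<Integer> f : futures) {
                if (!f.isDone()) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(allDoneOnReturn(8));
    }
}
```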
 
dpearson Commented:
> With regards to the returned Futures list, doesn't invokeAll block until the tasks are completed? In which case, I couldn't loop through the FutureTasks's and invoke cancel(..) on each one.

Yep - sorry, I misspoke there. You don't want to use invokeAll in your case. You just want to walk the list of tasks and submit() each one, which gives you a Future per task, which you keep :)
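
A small sketch of that submit-and-keep-the-Future approach, focused on cancellation. Everything here is illustrative: `CancelSketch` and `cancelAll` are invented names, and a long Thread.sleep() stands in for an interruptible wait. A real task blocked on something that ignores interrupts would not react to cancel(true) this way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class CancelSketch {

    // Submits tasks that block, cancels them all, and returns how many tasks
    // observed the interrupt.
    static int cancelAll(int taskCount) {
        // One thread per task so that every task is running before we cancel
        ExecutorService pool = Executors.newFixedThreadPool(taskCount);
        CountDownLatch started = new CountDownLatch(taskCount);
        AtomicInteger interrupted = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                futures.add(pool.submit(() -> {
                    started.countDown();
                    try {
                        Thread.sleep(60_000); // stands in for a long interruptible wait
                    } catch (InterruptedException e) {
                        interrupted.incrementAndGet(); // cancel(true) ends up here
                    }
                }));
            }
            started.await(); // submit() returned immediately; tasks are now running
            for (Future<?> f : futures) {
                f.cancel(true); // true = interrupt the thread running the task
            }
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(cancelAll(3));
    }
}
```

In the polling app, the loop that calls cancel(true) could be triggered from whatever sets the `canceled` flag, followed by shutting the executor down if the app is exiting.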

Doug
 
_agx_ (Author) Commented:
Gotcha.  Thanks for your clear responses. Exactly the kind of sounding board I was looking for :)  

Cheers