Solved

ExecutorService.shutdown - Ensuring tasks are cancelled cleanly

Posted on 2014-02-28
Last Modified: 2014-03-02
This is a follow-up to my previous question: How to implement multi-threaded polling.


In summary, I'm planning on using an ExecutorService with a fixed-size pool, then submitting a batch of tasks and waiting for them to complete. However, it is important that my application supports a shutdown feature that can cancel running tasks, and it must do so cleanly and in a predictable manner.

My tasks consist of several steps:

    1. validation
    2. parsing
    3. notifications              <=== point of no return
    4. store details in db      

... the early steps can be aborted with no ill effects, but after the task reaches a certain stage, I need to ensure the app waits for it to complete. Otherwise, it would leave things in an inconsistent state.

How can I ensure tasks either run to completion or are cancelled before the point of no return, i.e. step 3? I've been reading up on ExecutorService.shutdown()/shutdownNow() and Future.cancel(..), but the APIs seem to be filled with a lot of disclaimers like "attempt to..." and "not guaranteed...". Is this even possible?

Any thoughts or advice would be GREATLY appreciated.
Question by:_agx_

Accepted Solution

by:
dpearson earned 334 total points
For a fully controlled shutdown, it's cancel(true) that you want to focus on.  You submit a Callable, get back the Future and then later on if you wish to interrupt the thread you call cancel(true) on it.

The good news is this does NOT immediately stop the thread.  It's just a signal to say "please stop now".  More technically the task will be Interrupted, which means if it's sleeping it'll throw an exception (which you can catch) or if it's running in a series of steps or a loop you can test for this flag with Thread.currentThread().isInterrupted().

So in your case you'd want to make sure that
a) you check for whether you've been interrupted before step #3 (your point of no return)
b) after that, make sure you catch any InterruptedExceptions that could get thrown and keep executing.

If you get an InterruptedException during your steps 1 or 2 it sounds like you can just allow it to kill the task (i.e. throw it out of the call() method in the task).

Anyway, the key is that the interruption is a cooperative process.  So you can definitely do this as you are in control of both the request to shutdown (when and how you call cancel()) and what you do about it when those requests come in (inside the task, how you handle InterruptedException and checking for the isInterrupted() flag).

The only really tricky cases come up if you make a call to some blocking process which doesn't support being interrupted (e.g. to a database driver) and you want to be able to abort at that point.  In that case you may need to rewrite that so it doesn't block as long, so you can "come up for air" and check the isInterrupted() flag manually to see if somebody wants you to stop running.  But it doesn't sound like that'll be a problem for you here.
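To make points (a) and (b) concrete, here is a minimal sketch rather than a definitive implementation. The class name PointOfNoReturnTask, the step labels and the 200 ms pause that stands in for blocking work are all made up for illustration; only Callable, Future.cancel(true) and the interrupt flag come from the discussion above.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PointOfNoReturnTask implements Callable<List<String>> {
    // Thread-safe record of finished steps, so the caller can inspect it after a cancel.
    private final List<String> completed = new CopyOnWriteArrayList<>();

    List<String> completedSteps() {
        return completed;
    }

    @Override
    public List<String> call() throws InterruptedException {
        abortIfInterrupted();
        completed.add("validation");
        abortIfInterrupted();
        completed.add("parsing");
        abortIfInterrupted();                 // last chance to abort

        // Point of no return: from here on, interrupts are noted but not obeyed.
        boolean interrupted = false;
        try {
            completed.add("notifications");
            interrupted = pauseIgnoringInterrupts(200); // stand-in for blocking work
            completed.add("store details in db");
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();     // restore the flag for the caller
            }
        }
        return completed;
    }

    private static void abortIfInterrupted() throws InterruptedException {
        if (Thread.currentThread().isInterrupted()) {
            throw new InterruptedException("cancelled before the point of no return");
        }
    }

    /** Sleeps for the full duration even if interrupted; returns true if an interrupt arrived. */
    private static boolean pauseIgnoringInterrupts(long millis) {
        boolean interrupted = false;
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
        long remaining;
        while ((remaining = deadline - System.nanoTime()) > 0) {
            try {
                TimeUnit.NANOSECONDS.sleep(remaining);
            } catch (InterruptedException e) {
                interrupted = true;           // remember it, but keep going
            }
        }
        return interrupted;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        PointOfNoReturnTask task = new PointOfNoReturnTask();
        Future<List<String>> future = pool.submit(task);
        future.cancel(true);                  // a request, not a kill
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        // Timing-dependent: zero, one, two or all four steps, but never exactly three.
        System.out.println("completed: " + task.completedSteps());
    }
}
```

One thing to watch for: once cancel(true) has been called, the Future reports itself as cancelled and get() throws CancellationException even if the task body ran on to completion. That is why the sketch asks the task object what it finished instead of asking the Future.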

Hope that helps,

Doug

Assisted Solution

by:CEHJ
CEHJ earned 166 total points
The only really tricky cases come up if you make a call to some blocking process which doesn't support being interrupted (e.g. to a database driver) and you want to be able to abort at that point.
Or indeed, such as IO.
In that case you may need to rewrite that so it doesn't block as long, so you can "come up for air" and check the isInterrupted() flag manually to see if somebody wants you to stop running.  
The problem is that if a thread is blocked on IO, you simply can't come up for air, or interrupt it in any way other than by closing the stream from another thread, so things should be arranged to do that if necessary.

Author Comment

by:_agx_
Thanks for the responses. Actually, both of those apply in my case (i.e. processing with a database driver and IO).


More technically the task will be Interrupted, which means if it's sleeping it'll throw an exception (which you can catch) or if it's running in a series of steps or a loop you can test for this flag with Thread.currentThread().isInterrupted().

That makes sense. I was planning on checking interrupted status at each of the key steps, then reacting accordingly at the end based on whether the task completed or not:

            if (!Thread.currentThread().isInterrupted()) {
                doStep1();
            }
            if (!Thread.currentThread().isInterrupted()) {
                doStep2();
            }
            if (!Thread.currentThread().isInterrupted()) {
                doStep3();   // point of no return
                doStep4();
            }
 
In that case you may need to rewrite that so it doesn't block as long, so you can "come up for air" and check the isInterrupted() flag manually to see if somebody wants you to stop running.  But it doesn't sound like that'll be a problem for you here.

Say the process did block for a long time: what would happen when I tried to shut down the application? Would it just keep waiting? I know a degree of this is under my control, and a certain amount of waiting is ok in this case. I just want to make sure it doesn't hang or wait forever, and that everything is left in a consistent state when all's said and done.

The problem is that if a thread is blocked on IO, you simply can't come up for air, or interrupt it in any way other by closing the stream from another thread, so things should be arranged to do that if necessary.

Could you elaborate on that? I'm not sure how to structure it so that it's done safely, given that the closing would essentially be initiated from the parent app ie another thread?

Expert Comment

by:CEHJ
You might start with something like this

import java.io.IOException;
import java.io.InputStream;


public class IoUnblockingThread extends Thread {
    // Stream that a blocked read may be waiting on
    private final InputStream in;

    public IoUnblockingThread(InputStream in) {
        this.in = in;
    }

    @Override
    public void interrupt() {
        // Close the stream first, with the aim of breaking a blocked read()
        try {
            in.close();
        } catch (IOException e) {
            /* ignore */
        }

        super.interrupt();
    }
}


Author Comment

by:_agx_
Closing the stream on interrupt makes sense, but I'm not sure I follow your idea. I have a bunch of tasks which implement Callable (that's where the IO operations are performed). Where do you envision IoUnblockingThread fitting into the picture?

Expert Comment

by:CEHJ
Actually it doesn't quite work. Maybe Doug could think of something. My idea was that, since shutdownNow() calls interrupt() on the worker Thread, it would close the stream and break out of any blocking. The problem is getting the ExecutorService to use that Thread subclass. You can set a ThreadFactory in the Executors methods, but the trouble is that the key method of ThreadFactory takes a Runnable as a parameter. There could be a way out, but for the moment I can't see it. Perhaps Doug will have a bright idea....

Assisted Solution

by:dpearson
dpearson earned 334 total points
If you've got a long blocking call and it doesn't itself support being interrupted, then indeed making sure you can shut down the app quickly becomes a problem.  Just to be clear - this is a problem totally separate from threading issues and executor services.  You'd have the same issue if it was just a 10 line app making this blocking call and you wanted to interrupt it.
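On the earlier question of whether shutdown would just keep waiting: the wait itself can be bounded, whatever the tasks do. A rough sketch, assuming it is acceptable to give up after a timeout and report that a task is still running. BoundedShutdown and stop(...) are made-up names; shutdownNow() and awaitTermination() are the standard ExecutorService calls.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BoundedShutdown {
    /**
     * Interrupts running tasks, discards queued ones, then waits at most the given time.
     * Returns true if every worker thread finished within the timeout.
     */
    static boolean stop(ExecutorService pool, long timeout, TimeUnit unit) {
        List<Runnable> neverStarted = pool.shutdownNow();
        System.out.println(neverStarted.size() + " queued task(s) never started");
        try {
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> {
            TimeUnit.SECONDS.sleep(30);         // interruptible, so shutdownNow() ends it early
            return null;
        });
        boolean clean = stop(pool, 5, TimeUnit.SECONDS);
        System.out.println(clean ? "all tasks finished" : "timed out; a task is still running");
    }
}
```

A false return means some task ignored the interrupt or is stuck in a call that cannot be interrupted. The caller then has to decide whether to keep waiting or exit anyway, and exiting anyway is only safe if the stuck task is not past its point of no return.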

Personally when I've had to deal with this I've used the 'redesign the problem' approach, to chunk up the work into small enough pieces that I don't block for long.  E.g. If it's a call to get data from the database, break the SQL into a series of smaller requests and then assemble the rows together, so rather than:

List<Map> rows = getAllRowsFromDB() ;

it becomes

List<Map> rows = new ArrayList<>();
// done is set once the last chunk has been fetched
while (!done && !Thread.currentThread().isInterrupted()) {
    rows.addAll(getSomeRowsFromDB());
}

It's better for the database anyway to do this in many cases, so it's more complex application code, but better throughput for all.

If you're instead making a remote call (e.g. an http request) then again it's often possible to redesign.  E.g. Rather than

 "goAndDoWorkAndSendMeResults()"

 you change it to:
                "startWorkAndGetID()"
and then "checkOnProgressOfID()" and then
                "getResultsForFinishedID()"

(Actually very much parallel to how a Future works within an app).

But of course whether you can do any of these is problem specific and may be a lot of work or a little.

I've never really liked long blocking calls anyway (irrespective of interruptibility) because you generally want progress updates on long processes, to feed back to somebody - user or a master controller - which is why I generally take on the work to break it up.

However, another path is to look into Java's NIO library.  I've not personally done that much but it does have the concept of an InterruptibleChannel:
http://docs.oracle.com/javase/7/docs/api/java/nio/channels/InterruptibleChannel.html

The whole NIO approach for reading data (which is fundamentally not about blocking calls, more checking who has data currently available to read) avoids the long blocked call model at its core.  Alas, I've always found NIO with its low level data buffers a bit of a minefield to get correct, but it is another way to go and there are probably some nice wrapper libraries out there these days.
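For what it's worth, here is a small sketch of what InterruptibleChannel buys you, using a java.nio.channels.Pipe as a stand-in for a real socket or file channel. The class and method names are made up for the example. The reader thread blocks in read() because nothing is ever written, and interrupting it closes the channel and ends the read with a ClosedByInterruptException.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.Pipe;
import java.util.concurrent.atomic.AtomicReference;

class InterruptibleReadDemo {
    /** Blocks a thread in a channel read, interrupts it, and reports what the reader saw. */
    static String interruptBlockedRead() throws IOException, InterruptedException {
        Pipe pipe = Pipe.open();              // nothing is ever written, so read() blocks
        AtomicReference<String> outcome = new AtomicReference<>("no exception");

        Thread reader = new Thread(() -> {
            try {
                pipe.source().read(ByteBuffer.allocate(16));
            } catch (ClosedByInterruptException e) {
                outcome.set("ClosedByInterruptException");
            } catch (IOException e) {
                outcome.set(e.getClass().getSimpleName());
            }
        });

        reader.start();
        Thread.sleep(200);                    // give the reader time to block in read()
        reader.interrupt();                   // closes the channel and wakes the reader
        reader.join(5000);
        pipe.sink().close();
        return outcome.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("reader ended with: " + interruptBlockedRead());
    }
}
```

The catch is that the channel is closed as a side effect of the interrupt, so this suits the abortable early steps better than anything past the point of no return.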

Sorry that's not a better answer - but since CEHJ finds this hard to solve too what can you expect :)

Doug

Author Comment

by:_agx_
You'd have the same issue if it was just a 10 line app making this blocking call and you wanted to interrupt it.

Understood. The tasks I'm processing take varying amounts of time, but almost all of them are relatively short lived. Plus the current process is already broken down into sufficiently small steps, so as to make long blocking unlikely. It is an internal process that we control, so I will have to think about how to best handle real edge cases in the next draft.

Expert Comment

by:CEHJ
so I will have to think about how to best handle real edge cases in the next draft.
Come back if you have any news on the blocking issue. I'm sure some of the people here can come up with a solution if it's really thought about.

One way of attacking the blocking problem would be to have another service operating on a List<Closeable> so stuff like streams could be closed before shutdown. But I don't like this way - it's clunky and there's got to be a more elegant solution. For the same reason, I don't like being forced into non-blocking NIO.
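Clunky or not, the List<Closeable> idea could look roughly like the sketch below. CloseableRegistry and its method names are invented for illustration, and a concurrent set is used instead of a List so that tasks on different threads can register and unregister safely. Whether close() really unblocks a pending read depends on the stream type, so treat that part as an assumption to test against the streams actually in use.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class CloseableRegistry {
    // Resources that tasks are currently blocked on, or may block on
    private final Set<Closeable> open = ConcurrentHashMap.newKeySet();

    /** Called by a task before it starts blocking IO on the resource. */
    <T extends Closeable> T register(T resource) {
        open.add(resource);
        return resource;
    }

    /** Called by a task once it has finished with the resource normally. */
    void unregister(Closeable resource) {
        open.remove(resource);
    }

    /** Called by the shutdown code; returns how many resources were closed without error. */
    int closeAll() {
        int closed = 0;
        for (Closeable resource : open) {
            try {
                resource.close();
                closed++;
            } catch (IOException ignored) {
                // best effort: keep closing the rest
            }
            open.remove(resource);
        }
        return closed;
    }
}
```

The shutdown code would call closeAll() just before shutdownNow(). A task past its point of no return should unregister its resources first, otherwise shutdown would pull the stream out from under it.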

It's a pity that the shutdown procedures can't be hooked into, much like http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html#beforeExecute(java.lang.Thread,%20java.lang.Runnable)
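One hook that does exist is the protected newTaskFor method on ThreadPoolExecutor (inherited from AbstractExecutorService, Java 6 onwards). A pattern along these lines is described in "Java Concurrency in Practice". The sketch below is untested against real sockets and all names in it are made up: ClosingCallable is a hypothetical interface, and the latch-backed resource merely stands in for a stream whose close() unblocks a read. Note that it hooks Future.cancel(), not shutdownNow(), so the shutdown code would have to cancel the futures it holds.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical: a task that exposes the resource whose close() unblocks call(). */
interface ClosingCallable<T> extends Callable<T> {
    Closeable resource();
}

class ClosingExecutor extends ThreadPoolExecutor {
    ClosingExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        if (!(callable instanceof ClosingCallable)) {
            return super.newTaskFor(callable);
        }
        final Closeable resource = ((ClosingCallable<T>) callable).resource();
        return new FutureTask<T>(callable) {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                try {
                    resource.close();         // unblock the task before cancelling it
                } catch (IOException ignored) {
                    // best effort
                }
                return super.cancel(mayInterruptIfRunning);
            }
        };
    }

    /** Demo: cancel(false) sends no interrupt, so only close() can release the task. */
    static boolean cancelUnblocksTask() throws InterruptedException {
        final CountDownLatch closed = new CountDownLatch(1);
        ClosingExecutor pool = new ClosingExecutor(1);
        Future<String> future = pool.submit(new ClosingCallable<String>() {
            @Override
            public Closeable resource() {
                return closed::countDown;     // stand-in for a stream's close()
            }

            @Override
            public String call() throws InterruptedException {
                closed.await();               // stand-in for a read only close() can unblock
                return "unblocked";
            }
        });
        future.cancel(false);
        pool.shutdown();
        return pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("pool terminated cleanly: " + cancelUnblocksTask());
    }
}
```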