Want to win a PS4? Go Premium and enter to win our High-Tech Treats giveaway. Enter to Win

x
?
Solved

Abort put/take of BlockingQueue

Posted on 2009-07-15
17
Medium Priority
?
787 Views
Last Modified: 2012-05-07
How is the recommended way of terminating multiple threads that are blocked in java.util.concurrent.BlockingQueue put/take operations?

Assume that we have multiple producers and consumers executing in separate threads that are started independently. An unknown number of producers are locked in blocking calls to put() and an unknown number of consumers are locked in calls to take().  I don't have control of all the producers/consumers due to various reasons. At some time the queue should be closed so all pending put/take calls should be interrupted. How is this done?

It would be nice if there were a BlockingQueue.close() method or something similar to release all blocked threads.
0
Comment
Question by:SvenT
[X]
Welcome to Experts Exchange

Add your voice to the tech community where 5M+ people just like you are talking about what matters.

  • Help others & share knowledge
  • Earn cash & points
  • Learn & ask questions
  • 7
  • 5
  • 4
  • +1
17 Comments
 
LVL 23

Expert Comment

by:Ajay-Singh
ID: 24860161
BlockingQueue is an interface
 
http://java.sun.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html

and the implementation is left to the classes. So, you can
have a implementation of this interface which has method to
interrupt the shared lock to handle this situation.
0
 
LVL 1

Author Comment

by:SvenT
ID: 24860259
Yes, sorry.

I am using the http://java.sun.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html which implements the interface, so what I actually need is a LinkedBlockingQueue.close() or any other means of solving this. Maybe there are better solutions than the one that I felt was most obvious (the close/interrupt way)?
0
 
LVL 86

Expert Comment

by:CEHJ
ID: 24860639
You should get your threads from an ExecutorService. Note the return value of this method:

http://java.sun.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html#shutdownNow()
0
Concerto Cloud for Software Providers & ISVs

Can Concerto Cloud Services help you focus on evolving your application offerings, while delivering the best cloud experience to your customers? From DevOps to revenue models and customer support, the answer is yes!

Learn how Concerto can help you.

 
LVL 1

Author Comment

by:SvenT
ID: 24862405
An ExecutorService seems like a good idea if I had control of all the consumer/producer threads. Unfortunately, in this case I don't have it so for me the only common thing is the queue.  I would appreciate a solution from the "queues perspective".
0
 
LVL 86

Expert Comment

by:CEHJ
ID: 24862600
Surely the queue doesn't keep references to the accessing threads?
0
 
LVL 1

Author Comment

by:SvenT
ID: 24864155
No, the producer/consumer threads are created and connected via a queue through reflection (Oh, I hate reflection :-( and if I had any control of it  I would definitely remove that solution. The queue has no reference to the accessing threads. Since I don't have control over the blocked threads, I don't know how I should indicate that there's no more data, queue closed or "abort mission" to the blocked queues.
0
 
LVL 86

Expert Comment

by:CEHJ
ID: 24864398
If you keep a List<Thread> you can iterate them and do
if (!t.getState().equals(Thread.State.TERMINATED)) {
  // Probably blocked
}
 
OR
 
if (t.getState().equals(Thread.State.BLOCKED)) {
  // Definitely blocked if synch is arranged in normal way
}

Open in new window

0
 
LVL 92

Expert Comment

by:objects
ID: 24865395
Its really nothing to do with your queue (its just a data store), Its the threads you have to be stopping and you would do that the same way as any other thread. In your case an interrupt() should do it.
0
 
LVL 1

Author Comment

by:SvenT
ID: 24867008
Maybe I am trying to do something that isn't meant to be done, but as I have seen it the queue is only a connection between to execution entities.

Please correct me if I am wrong but if I recall it correct, I can close a Socket and both the server side and the client side will be notified (i.e. read/write attempts will be terminated with an exception).

If I have a concurrent queue, and have one producer and one consumer, when the producer closes down/exits/terminates, the consumer will never be notified but only blocked forever in a read attempt.

Your suggestions are to have a superior object that iterates over the producer/consumer objects and tell them to terminate e.g. via interrupt(). Since I don't have that control (these objects are created with f***ing reflection) it will be a huge job to change.

It seems like a really low coupling solution:
* a producer is created and told to produce data to a queue. Whenever the queue is full, the producer will wait.
* A consumer is created and told to read data from a queue. Whenever the queue is empty, the consumer will wait for more data. Whenever the queue is closed, the consumer will exit.

No extra control objects is needed to handle the interaction between the producer/consumer.
The only drawback is that I cannot awaken consumers that are blocked in take() operations.
0
 
LVL 92

Expert Comment

by:objects
ID: 24867022
> Whenever the queue is closed

there is no concept of a queue being closed, its just a data structure.
You need some other indicator
0
 
LVL 86

Assisted Solution

by:CEHJ
CEHJ earned 400 total points
ID: 24867053
To do what you're imagining, you would need some kind of 'smart queue' that was producer/consumer-aware. That of course is not normally the case with a queue, so that would require significant changes to be made
0
 
LVL 1

Author Comment

by:SvenT
ID: 24867233
OK, I'll have to accept that but I cannot see why this is so different from e.g. a SocketChannel. From my point of view, a SocketChannel and a LinkedBlockingQueue are means for two execution entities to communicate and exchange data.

The java.nio.channels.SocketChannel.read() will indicate if the channel is closed with
    NotYetConnectedException - If this channel is not yet connected
    ClosedChannelException - If this channel is closed
    AsynchronousCloseException - If another thread closes this channel while the read operation is in progress
    ClosedByInterruptException - If another thread interrupts the current thread while the read operation is in progress, thereby closing the channel and setting the current thread's interrupt status
    IOException - If some other I/O error occurs

but the java.util.concurrent.LinkedBlockingQueue.take() will not indicate anything. Maybe I am wrong, but to me that seems like a fault in the design.
0
 
LVL 92

Assisted Solution

by:objects
objects earned 800 total points
ID: 24867319
A channel has the concept of being open or closed, a queue does not. it is just a data structure (like a list).
one option may be to create a queue subclass or wrapper that added the functionality you you require.


0
 
LVL 1

Accepted Solution

by:
SvenT earned 0 total points
ID: 24867441
For me who doesn't have English as my first language there's absolutely no difference between a queue and a channel or a pipe. Maybe wrong, but to me they are all concepts for transferring data from one place (in time or space) to another place. To me, a list is another concept since it is used for temporary storing, it is not blocking for insertions or retrievals.

I am not the only one with this problem since I have found an answer to this myself, the Poison Pill approach. It is not optimal, but seems to be the best (?) way for me::
http://stackoverflow.com/questions/812342/how-to-interrupt-a-blockingqueue-which-is-blocking-on-take

Thanks for all the comments and clues :-)
0
 
LVL 92

Expert Comment

by:objects
ID: 24867476
thats one usage of a queue, it is not what it was specifically designed for.

>  It is not optimal, but seems to be the best (?) way for me::

I mentioned similar earlier, ie. finding a different way to indicate that it needs closing. In your case you'll need to change your consumers to check the head of the queue everytime before takinging an element. Similar for the producer. Though my understanding was that you couldn't change the consumer/procduser.
0
 
LVL 1

Author Comment

by:SvenT
ID: 24867727
Yes exactly, I don't have any easy control over the producers/consumers. What I have control over is the communication means between them and it was written as a wrapper class based on a LinkedBlockingQueue. The wrapper class contains a legacy open/close method, but since it didn't manage to abort a take(), it didn't work and caused a lot of problems in the consumers.

With the feedback that I've got, maybe using the LinkedBlockingQueue isn't the way that this should have been done in the first place. Maybe there are better ways of communicating data in a thread safe manner between different entities in the same JVM.

The Poison Pill approach will work but it feels a little as carrying coals to Newcastle :-)

0
 
LVL 92

Expert Comment

by:objects
ID: 24867742
yes sounds like its worth refactoring at some stage
0

Featured Post

Tech or Treat! - Giveaway

Submit an article about your scariest tech experience—and the solution—and you’ll be automatically entered to win one of 4 fantastic tech gadgets.

Question has a verified solution.

If you are experiencing a similar issue, please ask a related question

Java contains several comparison operators (e.g., <, <=, >, >=, ==, !=) that allow you to compare primitive values. However, these operators cannot be used to compare the contents of objects. Interface Comparable is used to allow objects of a cl…
Introduction Java can be integrated with native programs using an interface called JNI(Java Native Interface). Native programs are programs which can directly run on the processor. JNI is simply a naming and calling convention so that the JVM (Java…
Viewers will learn how to properly install Eclipse with the necessary JDK, and will take a look at an introductory Java program. Download Eclipse installation zip file: Extract files from zip file: Download and install JDK 8: Open Eclipse and …
How to fix incompatible JVM issue while installing Eclipse While installing Eclipse in windows, got one error like above and unable to proceed with the installation. This video describes how to successfully install Eclipse. How to solve incompa…
Suggested Courses

618 members asked questions and received personalized solutions in the past 7 days.

Join the community of 500,000 technology professionals and ask your questions.

Join & Ask a Question