Solved

Abort put/take of BlockingQueue

Posted on 2009-07-15
Last Modified: 2012-05-07
What is the recommended way of terminating multiple threads that are blocked in java.util.concurrent.BlockingQueue put/take operations?

Assume that we have multiple producers and consumers executing in separate threads that are started independently. An unknown number of producers are blocked in calls to put() and an unknown number of consumers are blocked in calls to take(). For various reasons I don't have control of all the producers/consumers. At some point the queue should be closed, so all pending put/take calls should be interrupted. How is this done?

It would be nice if there were a BlockingQueue.close() method or something similar to release all blocked threads.
Question by:SvenT
17 Comments
 
LVL 23

Expert Comment

by:Ajay-Singh
ID: 24860161
BlockingQueue is an interface
 
http://java.sun.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html

and the implementation is left to the classes. So you can
write an implementation of this interface that has a method to
interrupt the shared lock to handle this situation.
 
LVL 1

Author Comment

by:SvenT
ID: 24860259
Yes, sorry.

I am using LinkedBlockingQueue (http://java.sun.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html), which implements the interface, so what I actually need is a LinkedBlockingQueue.close() or some other means of solving this. Maybe there are better solutions than the one that I felt was most obvious (the close/interrupt way)?
 
LVL 86

Expert Comment

by:CEHJ
ID: 24860639
You should get your threads from an ExecutorService. Note the return value of this method:

http://java.sun.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html#shutdownNow()
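A minimal sketch of the ExecutorService suggestion, assuming you do own the worker threads (which the asker later says he does not). The class and method names (ShutdownNowSketch, runDemo) are made up for illustration. shutdownNow() interrupts the running workers, which releases a take() blocked on an empty queue, and returns the tasks that never started:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original thread.
class ShutdownNowSketch {

    /** Returns true if every worker left take() via InterruptedException in time. */
    static boolean runDemo(int consumers) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        CountDownLatch started = new CountDownLatch(consumers);
        CountDownLatch released = new CountDownLatch(consumers);

        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                started.countDown();
                try {
                    queue.take();          // blocks: the queue stays empty
                } catch (InterruptedException e) {
                    released.countDown();  // woken by shutdownNow()
                }
            });
        }
        started.await();

        // Interrupts the running workers. The returned list holds tasks that
        // were submitted but never started (none here: one thread per task).
        List<Runnable> neverStarted = pool.shutdownNow();
        return neverStarted.isEmpty() && released.await(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("all consumers released: " + runDemo(3));
    }
}
```

This only helps when every producer and consumer runs inside a pool you created; threads started elsewhere are not touched by shutdownNow().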
 
LVL 1

Author Comment

by:SvenT
ID: 24862405
An ExecutorService seems like a good idea if I had control of all the consumer/producer threads. Unfortunately, in this case I don't have it so for me the only common thing is the queue.  I would appreciate a solution from the "queues perspective".
 
LVL 86

Expert Comment

by:CEHJ
ID: 24862600
Surely the queue doesn't keep references to the accessing threads?
 
LVL 1

Author Comment

by:SvenT
ID: 24864155
No, the producer/consumer threads are created and connected via a queue through reflection (oh, I hate reflection :-( ), and if I had any control over it I would definitely remove that solution. The queue has no reference to the accessing threads. Since I don't have control over the blocked threads, I don't know how I should signal "no more data", "queue closed" or "abort mission" to the blocked threads.
 
LVL 86

Expert Comment

by:CEHJ
ID: 24864398
If you keep a List<Thread> you can iterate over them and do

if (t.getState() != Thread.State.TERMINATED) {
  // Probably blocked
}

OR

if (t.getState() == Thread.State.BLOCKED) {
  // Definitely blocked if synch is arranged in normal way
}
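A small sketch for checking which state to look for. As far as I know, the JDK's LinkedBlockingQueue parks waiting threads on a Condition rather than on a synchronized monitor, so a thread stuck in take() reports WAITING (or TIMED_WAITING for timed calls), not BLOCKED. The class and method names (BlockedStateSketch, stateWhileTaking) are made up for illustration:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; not part of the original thread.
class BlockedStateSketch {

    /** Returns the state observed for a thread parked in take() on an empty queue. */
    static Thread.State stateWhileTaking() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                queue.take();
            } catch (InterruptedException e) {
                // released by the interrupt below
            }
        });
        consumer.start();

        // Poll until the consumer has parked; bounded so the sketch cannot hang.
        long deadline = System.nanoTime() + 5_000_000_000L;
        Thread.State observed = consumer.getState();
        while (observed != Thread.State.WAITING && System.nanoTime() < deadline) {
            Thread.sleep(10);
            observed = consumer.getState();
        }

        consumer.interrupt(); // clean up the parked thread
        consumer.join();
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("state while in take(): " + stateWhileTaking());
    }
}
```

If that holds for your JDK, a check against Thread.State.BLOCKED would miss these threads, while the "not TERMINATED" check still catches them along with everything else that is alive.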

 
LVL 92

Expert Comment

by:objects
ID: 24865395
It's really nothing to do with your queue (it's just a data store). It's the threads you have to stop, and you would do that the same way as for any other thread. In your case an interrupt() should do it.
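A minimal sketch of the interrupt() route, assuming you can get hold of the Thread objects (for example in a List<Thread>). Both put() on a full queue and take() on an empty one throw InterruptedException when the calling thread is interrupted. The names InterruptSketch and interruptAll are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the original thread.
class InterruptSketch {

    /** Returns how many threads left put()/take() via InterruptedException. */
    static int interruptAll() throws InterruptedException {
        BlockingQueue<Integer> full = new LinkedBlockingQueue<>(1);
        full.put(0); // capacity reached: further put() calls block
        BlockingQueue<Integer> empty = new LinkedBlockingQueue<>();
        AtomicInteger released = new AtomicInteger();

        List<Thread> threads = new ArrayList<>();
        threads.add(new Thread(() -> {
            try {
                full.put(1);
            } catch (InterruptedException e) {
                released.incrementAndGet();
            }
        }, "producer"));
        threads.add(new Thread(() -> {
            try {
                empty.take();
            } catch (InterruptedException e) {
                released.incrementAndGet();
            }
        }, "consumer"));

        for (Thread t : threads) t.start();
        for (Thread t : threads) t.interrupt(); // releases put() and take()
        for (Thread t : threads) t.join();
        return released.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("threads released: " + interruptAll());
    }
}
```

The catch is the one raised in this thread: it needs a reference to every blocked thread, which the queue itself does not keep.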

 
LVL 1

Author Comment

by:SvenT
ID: 24867008
Maybe I am trying to do something that isn't meant to be done, but as I see it the queue is only a connection between two execution entities.

Please correct me if I am wrong, but if I recall correctly, I can close a Socket and both the server side and the client side will be notified (i.e. read/write attempts will be terminated with an exception).

If I have a concurrent queue, and have one producer and one consumer, when the producer closes down/exits/terminates, the consumer will never be notified but only blocked forever in a read attempt.

Your suggestions are to have a superior object that iterates over the producer/consumer objects and tells them to terminate, e.g. via interrupt(). Since I don't have that control (these objects are created with f***ing reflection) it will be a huge job to change.

It seems like a really low coupling solution:
* a producer is created and told to produce data to a queue. Whenever the queue is full, the producer will wait.
* A consumer is created and told to read data from a queue. Whenever the queue is empty, the consumer will wait for more data. Whenever the queue is closed, the consumer will exit.

No extra control objects are needed to handle the interaction between the producer/consumer.
The only drawback is that I cannot awaken consumers that are blocked in take() operations.
 
LVL 92

Expert Comment

by:objects
ID: 24867022
> Whenever the queue is closed

There is no concept of a queue being closed; it's just a data structure.
You need some other indicator.
 
LVL 86

Assisted Solution

by:CEHJ
CEHJ earned 100 total points
ID: 24867053
To do what you're imagining, you would need some kind of 'smart queue' that was producer/consumer-aware. That of course is not normally the case with a queue, so it would require significant changes.
 
LVL 1

Author Comment

by:SvenT
ID: 24867233
OK, I'll have to accept that but I cannot see why this is so different from e.g. a SocketChannel. From my point of view, a SocketChannel and a LinkedBlockingQueue are means for two execution entities to communicate and exchange data.

The java.nio.channels.SocketChannel.read() will indicate if the channel is closed with
    NotYetConnectedException - If this channel is not yet connected
    ClosedChannelException - If this channel is closed
    AsynchronousCloseException - If another thread closes this channel while the read operation is in progress
    ClosedByInterruptException - If another thread interrupts the current thread while the read operation is in progress, thereby closing the channel and setting the current thread's interrupt status
    IOException - If some other I/O error occurs

but the java.util.concurrent.LinkedBlockingQueue.take() will not indicate anything. Maybe I am wrong, but to me that seems like a fault in the design.
 
LVL 92

Assisted Solution

by:objects
objects earned 200 total points
ID: 24867319
A channel has the concept of being open or closed; a queue does not. It is just a data structure (like a list).
One option may be to create a queue subclass or wrapper that adds the functionality you require.
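One way such a wrapper could look, as a rough sketch only. It assumes the wrapper class is under your control and that callers can cope with an unchecked exception. ClosableQueue, QueueClosedException and the 50 ms wake-up interval are all invented for this example; nothing like them exists in the JDK. Instead of interrupting anyone, put() and take() use the timed offer()/poll() calls in a loop and re-check a closed flag between attempts:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper around LinkedBlockingQueue that can be closed. */
class ClosableQueue<E> {

    /** Hypothetical exception thrown to callers once the queue is closed. */
    static class QueueClosedException extends RuntimeException {
        QueueClosedException() {
            super("queue closed");
        }
    }

    private static final long POLL_MILLIS = 50; // assumed wake-up interval
    private final BlockingQueue<E> delegate;
    private volatile boolean closed;

    ClosableQueue(int capacity) {
        delegate = new LinkedBlockingQueue<>(capacity);
    }

    void put(E e) throws InterruptedException {
        // Timed offer() returns false if the queue stayed full; then re-check the flag.
        do {
            if (closed) throw new QueueClosedException();
        } while (!delegate.offer(e, POLL_MILLIS, TimeUnit.MILLISECONDS));
    }

    E take() throws InterruptedException {
        while (true) {
            if (closed) throw new QueueClosedException();
            // Timed poll() returns null if the queue stayed empty.
            E e = delegate.poll(POLL_MILLIS, TimeUnit.MILLISECONDS);
            if (e != null) return e;
        }
    }

    /** Releases every thread in put()/take() within roughly POLL_MILLIS. */
    void close() {
        closed = true;
    }

    public static void main(String[] args) throws InterruptedException {
        ClosableQueue<String> q = new ClosableQueue<>(10);
        Thread consumer = new Thread(() -> {
            try {
                q.take();
            } catch (QueueClosedException | InterruptedException e) {
                System.out.println("consumer released: " + e);
            }
        });
        consumer.start();
        q.close();
        consumer.join();
    }
}
```

The trade-off is a small amount of periodic wake-up work per blocked thread, and in this sketch any elements still queued at close() are simply abandoned. Whether that is acceptable depends on the application.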


 
LVL 1

Accepted Solution

by:
SvenT earned 0 total points
ID: 24867441
For me who doesn't have English as my first language there's absolutely no difference between a queue and a channel or a pipe. Maybe wrong, but to me they are all concepts for transferring data from one place (in time or space) to another place. To me, a list is another concept since it is used for temporary storing, it is not blocking for insertions or retrievals.

I am apparently not the only one with this problem, since I have found an answer to it myself: the Poison Pill approach. It is not optimal, but it seems to be the best (?) way for me:
http://stackoverflow.com/questions/812342/how-to-interrupt-a-blockingqueue-which-is-blocking-on-take

Thanks for all the comments and clues :-)
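For readers who have not met the Poison Pill approach, a minimal sketch follows. It assumes the consumers can be written (or wrapped) to recognize a sentinel element. PoisonPillSketch, POISON and runDemo are made-up names. Because the number of consumers is unknown, each consumer puts the pill back before exiting so the next one sees it too:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the original thread.
class PoisonPillSketch {

    // Sentinel compared by identity (==), so it cannot collide with a real message.
    static final String POISON = new String("POISON");

    /** Starts the given number of consumers and returns how many exited on the pill. */
    static int runDemo(int consumers) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger exited = new AtomicInteger();
        Thread[] workers = new Thread[consumers];

        for (int i = 0; i < consumers; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        String msg = queue.take();
                        if (msg == POISON) {
                            queue.put(POISON); // pass the pill on to the next consumer
                            break;
                        }
                        // a real consumer would process msg here
                    }
                    exited.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }

        queue.put("work");  // ordinary message
        queue.put(POISON);  // "close" the queue
        for (Thread w : workers) w.join();
        return exited.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumers that exited: " + runDemo(3));
    }
}
```

Note that this only wakes consumers. A producer blocked in put() on a full bounded queue needs a different signal, and one pill is left in the queue at the end.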
 
LVL 92

Expert Comment

by:objects
ID: 24867476
That's one usage of a queue; it is not what it was specifically designed for.

>  It is not optimal, but seems to be the best (?) way for me:

I mentioned something similar earlier, i.e. finding a different way to indicate that it needs closing. In your case you'll need to change your consumers to check the head of the queue every time before taking an element. Similarly for the producer. Though my understanding was that you couldn't change the consumer/producer.
 
LVL 1

Author Comment

by:SvenT
ID: 24867727
Yes, exactly, I don't have any easy control over the producers/consumers. What I do have control over is the means of communication between them, which was written as a wrapper class based on a LinkedBlockingQueue. The wrapper class contains a legacy open/close method, but since it didn't manage to abort a take(), it didn't work and caused a lot of problems in the consumers.

With the feedback that I've got, maybe using the LinkedBlockingQueue isn't the way that this should have been done in the first place. Maybe there are better ways of communicating data in a thread safe manner between different entities in the same JVM.

The Poison Pill approach will work, but it feels a little like carrying coals to Newcastle :-)

 
LVL 92

Expert Comment

by:objects
ID: 24867742
Yes, sounds like it's worth refactoring at some stage.