Java Queue producer/consumer

Hello,
  I am new to Java and I hope someone can help with my question

I have a Java application that read information from  files and depending on the type of data then some additional process is needed.

What I would like to do is to add the data to a queue . Then have a thread that watched the queue and process the information. Adding data to the queue is a continuous process

How can I do this in Java.
SiemensSENAsked:
Who is Participating?
 
mccarlIT Business Systems Analyst / Software DeveloperCommented:
Another option is to just use a plain Thread object (or indeed an Executor with just the one constantly running task) that takes items from a BlockingQueue. And then the main thread can just add items to the BlockingQueue when they become available to be processed.

See this example code of a simple Producer and Consumer that demonstrates the general idea...

Producer.java
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Producer {
    
    public Producer(BlockingQueue<String> queue) {
        Random random = new Random();
        
        try {
            // Loop and produce 10 messages
            for (int i = 0; i < 10; i++) {
                
                // Create the message to put on the queue
                String itemToQueue = "Item " + (i + 1);
                
                // Put the message on the queue
                queue.put(itemToQueue);
                System.out.println("P: Produced an item - " + itemToQueue);
                
                // Delay for a random amount of time between 1 second and 6 seconds
                int randomDelay = random.nextInt(5000) + 1000;
                Thread.sleep(randomDelay);
            }
            
            // Send a "poison pill" to stop the consumer
            queue.put("END");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Create a queue to hold the messages between the Producer and Consumer
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        
        // Start the consumer (this method returns straight away because the consumer runs in a different thread
        new Consumer(queue);
        
        // Start the producer (this method will return when all messages have been produced
        new Producer(queue);
    }
    
}

Open in new window

Consumer.java
import java.util.concurrent.BlockingQueue;

public class Consumer {
    
    public Consumer(final BlockingQueue<String> queue) {
        
        // Start a new thread to consume the messages from the queue
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // Continue looping for new messages
                    while (true) {
                        
                        // Wait for the next message on the queue, and take it when it is there
                        String itemTaken = queue.take();
                        
                        // Look for the "poison pill" and exit the loop when received
                        if ("END".equals(itemTaken)) {
                            break;
                        }
                        
                        // Simulate some processing of the item that might take a bit of time (4 seconds in this example)
                        System.out.println("                                         C: Consuming item - " + itemTaken);
                        Thread.sleep(4000);
                        System.out.println("                                         C: Finished consuming item - " + itemTaken);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}

Open in new window

0
 
dpearsonCommented:
Check out the Executor classes:
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html

// Create executor with 5 threads to do the work
Executor executor = Executors.newFixedThreadPool(5) ;

// The task you want to have run
Runnable task = new Runnable() {
    public void run() {
        // Put the code that actually processes the data here
    }
}

// Add the task to the queue - the executor will handle picking an available
// thread, pulling the task off the queue, running the task etc.
executor.execute(task) ;

Rinse and repeat.

Doug
0
Question has a verified solution.

Are you are experiencing a similar issue? Get a personalized answer when you ask a related question.

Have a better answer? Share it in a comment.

All Courses

From novice to tech pro — start learning today.