Solved

Java Queue  producer/consumer

Posted on 2013-11-21
2
516 Views
Last Modified: 2013-12-11
Hello,
  I am new to Java and I hope someone can help with my question

I have a Java application that read information from  files and depending on the type of data then some additional process is needed.

What I would like to do is to add the data to a queue . Then have a thread that watched the queue and process the information. Adding data to the queue is a continuous process

How can I do this in Java.
0
Comment
Question by:SiemensSEN
2 Comments
 
LVL 27

Assisted Solution

by:dpearson
dpearson earned 100 total points
ID: 39667845
Check out the Executor classes:
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html

// Create executor with 5 threads to do the work
Executor executor = Executors.newFixedThreadPool(5) ;

// The task you want to have run
Runnable task = new Runnable() {
    public void run() {
        // Put the code that actually processes the data here
    }
}

// Add the task to the queue - the executor will handle picking an available
// thread, pulling the task off the queue, running the task etc.
executor.execute(task) ;

Rinse and repeat.

Doug
0
 
LVL 35

Accepted Solution

by:
mccarl earned 400 total points
ID: 39668058
Another option is to just use a plain Thread object (or indeed an Executor with just the one constantly running task) that takes items from a BlockingQueue. And then the main thread can just add items to the BlockingQueue when they become available to be processed.

See this example code of a simple Producer and Consumer that demonstrates the general idea...

Producer.java
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Producer {
    
    public Producer(BlockingQueue<String> queue) {
        Random random = new Random();
        
        try {
            // Loop and produce 10 messages
            for (int i = 0; i < 10; i++) {
                
                // Create the message to put on the queue
                String itemToQueue = "Item " + (i + 1);
                
                // Put the message on the queue
                queue.put(itemToQueue);
                System.out.println("P: Produced an item - " + itemToQueue);
                
                // Delay for a random amount of time between 1 second and 6 seconds
                int randomDelay = random.nextInt(5000) + 1000;
                Thread.sleep(randomDelay);
            }
            
            // Send a "poison pill" to stop the consumer
            queue.put("END");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Create a queue to hold the messages between the Producer and Consumer
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        
        // Start the consumer (this method returns straight away because the consumer runs in a different thread
        new Consumer(queue);
        
        // Start the producer (this method will return when all messages have been produced
        new Producer(queue);
    }
    
}

Open in new window

Consumer.java
import java.util.concurrent.BlockingQueue;

public class Consumer {
    
    public Consumer(final BlockingQueue<String> queue) {
        
        // Start a new thread to consume the messages from the queue
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // Continue looping for new messages
                    while (true) {
                        
                        // Wait for the next message on the queue, and take it when it is there
                        String itemTaken = queue.take();
                        
                        // Look for the "poison pill" and exit the loop when received
                        if ("END".equals(itemTaken)) {
                            break;
                        }
                        
                        // Simulate some processing of the item that might take a bit of time (4 seconds in this example)
                        System.out.println("                                         C: Consuming item - " + itemTaken);
                        Thread.sleep(4000);
                        System.out.println("                                         C: Finished consuming item - " + itemTaken);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}

Open in new window

0

Featured Post

Free Tool: SSL Checker

Scans your site and returns information about your SSL implementation and certificate. Helpful for debugging and validating your SSL configuration.

One of a set of tools we are providing to everyone as a way of saying thank you for being a part of the community.

Question has a verified solution.

If you are experiencing a similar issue, please ask a related question

Java contains several comparison operators (e.g., <, <=, >, >=, ==, !=) that allow you to compare primitive values. However, these operators cannot be used to compare the contents of objects. Interface Comparable is used to allow objects of a cl…
Introduction This article is the last of three articles that explain why and how the Experts Exchange QA Team does test automation for our web site. This article covers our test design approach and then goes through a simple test case example, how …
Video by: Michael
Viewers learn about how to reduce the potential repetitiveness of coding in main by developing methods to perform specific tasks for their program. Additionally, objects are introduced for the purpose of learning how to call methods in Java. Define …
This video teaches viewers about errors in exception handling.

733 members asked questions and received personalized solutions in the past 7 days.

Join the community of 500,000 technology professionals and ask your questions.

Join & Ask a Question