Solved

Java Queue producer/consumer

Posted on 2013-11-21
Last Modified: 2013-12-11
Hello,
  I am new to Java and I hope someone can help with my question.

I have a Java application that reads information from files; depending on the type of data, some additional processing is needed.

What I would like to do is add the data to a queue, then have a thread that watches the queue and processes the information. Adding data to the queue is a continuous process.

How can I do this in Java?
Question by:SiemensSEN
LVL 28

Assisted Solution

by:dpearson
dpearson earned 400 total points
ID: 39667845
Check out the Executor classes:
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Create executor with 5 threads to do the work
Executor executor = Executors.newFixedThreadPool(5);

// The task you want to have run
Runnable task = new Runnable() {
    @Override
    public void run() {
        // Put the code that actually processes the data here
    }
};

// Add the task to the queue - the executor will handle picking an available
// thread, pulling the task off the queue, running the task etc.
executor.execute(task);

Rinse and repeat.
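
To make the "rinse and repeat" part concrete, here is a minimal sketch of submitting one task per data item and then shutting the pool down. It assumes the items are plain strings; the class name ExecutorSketch and the process() method are hypothetical stand-ins for the real processing code, not anything from the original application. It declares the pool as ExecutorService rather than Executor so that shutdown() and awaitTermination() are available.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorSketch {

    // Hypothetical stand-in for the real per-item processing
    static String process(String item) {
        return item.toUpperCase();
    }

    // Submits one task per item, then waits for the pool to drain
    static List<String> processAll(List<String> items) {
        // Thread-safe collection, because several worker threads add to it
        final Queue<String> results = new ConcurrentLinkedQueue<String>();
        ExecutorService executor = Executors.newFixedThreadPool(5);

        for (final String item : items) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    results.add(process(item));
                }
            });
        }

        // Stop accepting new tasks; tasks already queued still run
        executor.shutdown();
        try {
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(results);
    }

    public static void main(String[] args) {
        // With 5 threads the order of the results is not guaranteed
        System.out.println(processAll(Arrays.asList("a", "b", "c")));
    }
}
```

In a long-running application you would probably keep the executor alive and only call shutdown() when the program exits.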

Doug
 
LVL 36

Accepted Solution

by:mccarl
mccarl earned 1600 total points
ID: 39668058
Another option is to just use a plain Thread object (or indeed an Executor with just the one constantly running task) that takes items from a BlockingQueue. And then the main thread can just add items to the BlockingQueue when they become available to be processed.

See this example code of a simple Producer and Consumer that demonstrates the general idea...

Producer.java
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class Producer {
    
    public Producer(BlockingQueue<String> queue) {
        Random random = new Random();
        
        try {
            // Loop and produce 10 messages
            for (int i = 0; i < 10; i++) {
                
                // Create the message to put on the queue
                String itemToQueue = "Item " + (i + 1);
                
                // Put the message on the queue
                queue.put(itemToQueue);
                System.out.println("P: Produced an item - " + itemToQueue);
                
                // Delay for a random amount of time between 1 second and 6 seconds
                int randomDelay = random.nextInt(5000) + 1000;
                Thread.sleep(randomDelay);
            }
            
            // Send a "poison pill" to stop the consumer
            queue.put("END");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Create a queue to hold the messages between the Producer and Consumer
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        
        // Start the consumer (this returns straight away because the consumer runs in a different thread)
        new Consumer(queue);
        
        // Start the producer (this returns only when all messages have been produced)
        new Producer(queue);
    }
    
}


Consumer.java
import java.util.concurrent.BlockingQueue;

public class Consumer {
    
    public Consumer(final BlockingQueue<String> queue) {
        
        // Start a new thread to consume the messages from the queue
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // Continue looping for new messages
                    while (true) {
                        
                        // Wait for the next message on the queue, and take it when it is there
                        String itemTaken = queue.take();
                        
                        // Look for the "poison pill" and exit the loop when received
                        if ("END".equals(itemTaken)) {
                            break;
                        }
                        
                        // Simulate some processing of the item that might take a bit of time (4 seconds in this example)
                        System.out.println("                                         C: Consuming item - " + itemTaken);
                        Thread.sleep(4000);
                        System.out.println("                                         C: Finished consuming item - " + itemTaken);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
}
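
For the other variant mentioned above (an Executor with just the one constantly running task), a rough sketch might look like the following. The class name SingleConsumerSketch and the run() helper are made up for illustration; the "END" poison pill is the same convention as in the example code. The consumer is submitted as a Callable so the caller can wait on the Future for it to finish.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class SingleConsumerSketch {

    // Same "poison pill" convention as the Producer/Consumer example
    static final String END = "END";

    // Queues the items, lets one consumer task drain them, returns what it consumed
    static List<String> run(List<String> items) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // The one long-running task: take items until the poison pill arrives
        Future<List<String>> consumed = executor.submit(new Callable<List<String>>() {
            @Override
            public List<String> call() throws InterruptedException {
                List<String> seen = new ArrayList<String>();
                while (true) {
                    String item = queue.take();
                    if (END.equals(item)) {
                        break;
                    }
                    seen.add(item); // real processing would go here
                }
                return seen;
            }
        });

        try {
            // The calling thread acts as the producer
            for (String item : items) {
                queue.put(item);
            }
            queue.put(END);
            return consumed.get(); // blocks until the consumer has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while producing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Consumer task failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("Item 1", "Item 2", "Item 3")));
    }
}
```

Because there is only one consumer thread, items are handled in the order they were queued. The main difference from a plain Thread is that the executor manages the thread's lifecycle and the Future reports when the consumer has finished or failed.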

