Solved

What to use for interthread communication for this problem?

Posted on 2002-05-13
Last Modified: 2012-06-22
Hi,
My knowledge of Java is quite basic and I need to port a C++ application to Java.
I would like some assistance and hints for the following inter-thread communication problem:
Thread A reads data from a serial port. When a complete message has been received, thread A needs to notify thread B that a complete message is available. Thread B performs some processing on this data.
Thread A needs to keep processing serial data continuously, even while thread B is processing data that has already been received.

How did we deal with this in C++ :
Thread A places the serial data in a global buffer. When a complete message has been received, an event is sent to thread B to signal that new data is available. Through the use of pointers, thread B can read from and thread A can write to this shared buffer.

I'm interested in:
How to deal with this in Java?
I have already done some research, but didn't find a useful solution.
- use of events -> can thread B (in its 'run' method) be notified of this event?
- use of pipes -> how do I tell thread B where it can start processing?
- use of wait/notify -> we need to use a commonly used object to communicate between threads A and B. Thread A can't process serial data while thread B does the processing.

As you can see, a lot of questions. What would be the preferred solution, and why? If some code is available, I would like to see the basic steps.

Thanks. jlsjls.

 
Question by:jlsjls
10 Comments
 

Expert Comment

by:hardeepbsingh
ID: 7005627
- use of wait/notify -> we need to use a commonly used object to communicate between threads A and B.
Thread A can't process serial data while thread B does the processing.


This is wrong. Wait/notify is the preferred method, and both threads can go on working: a thread that calls wait() releases the object's lock while it waits, so the other thread is not held up. Plenty of sample code is available on the net, or in any Java book.
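To illustrate the point about wait/notify, here is a minimal sketch, assuming Java 8 or later. The class name WaitNotifyDemo and its methods are made up for this example and do not come from the thread. The consumer holds the lock only while it picks up the message; the processing itself runs outside the synchronized block, so the producer can carry on in the meantime.

```java
// Hypothetical demo class: thread B waits for a message from thread A.
class WaitNotifyDemo {
    private final Object lock = new Object();
    private String message;    // guarded by lock
    private String processed;  // written by B, read by the caller after join()

    // Thread B: wait() releases the lock while waiting, so A can enter produce().
    private void consume() throws InterruptedException {
        String received;
        synchronized (lock) {
            while (message == null) {  // loop guards against spurious wakeups
                lock.wait();
            }
            received = message;
            message = null;
        }
        // Processing happens outside the synchronized block: A is not blocked here.
        processed = "B processed " + received;
    }

    // Thread A: hands over a message and returns immediately.
    private void produce(String m) {
        synchronized (lock) {
            message = m;
            lock.notifyAll();
        }
    }

    // Runs one hand-over; the calling thread plays the role of thread A.
    static String run() {
        WaitNotifyDemo demo = new WaitNotifyDemo();
        Thread b = new Thread(() -> {
            try {
                demo.consume();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "B");
        b.start();
        demo.produce("hello");
        try {
            b.join();  // join() makes B's write to 'processed' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return demo.processed;
    }
}
```

This sketch passes a single message at a time; with more than one message in flight a queue is the usual next step.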
0
 
LVL 1

Accepted Solution

by:
BaneBane earned 250 total points
ID: 7005700
Hi,

What you may want to do is use thread A to read from the socket, use the same thread to construct some simple object representing the data and enqueue that object into a queue. Thread A is the producer.

At the same time you could use thread B as the consumer of the same data. Thread B would dequeue the object and manipulate it. Thus you separate the production of the data from its consumption.

The queue in question should be a synchronized queue that blocks if B tries to dequeue from an empty queue. Each enqueue operation should call notifyAll(), waking thread B and allowing it to continue working.

I could supply the code for the queue etc.
Let me know.

Hope this helps.
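For readers on Java 5 or later (released after this thread was written), the standard library already ships a blocking queue in java.util.concurrent, so the same producer/consumer design can be sketched without a hand-written queue. This is an alternative to the approach described above, not the code offered in this answer. The class name BlockingQueueDemo and the STOP marker are assumptions made for the example, and the lambda syntax needs Java 8.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo: producer and consumer share one BlockingQueue.
class BlockingQueueDemo {
    // Hypothetical end-of-stream marker; real messages must not equal it.
    private static final String STOP = "<stop>";

    static List<String> run(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = new ArrayList<>();

        // Consumer (thread B): take() blocks only while the queue is empty.
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String m = queue.take();
                    if (STOP.equals(m)) {
                        break;
                    }
                    processed.add(m.toUpperCase());  // stand-in for real processing
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "B");
        consumer.start();

        // Producer (thread A's role, played by the caller): never waits for B,
        // because an unbounded LinkedBlockingQueue always accepts add().
        for (String m : messages) {
            queue.add(m);
        }
        queue.add(STOP);

        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }
}
```

Calling BlockingQueueDemo.run(Arrays.asList("a", "b")) returns the messages in arrival order after the consumer has handled them.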
0
 
LVL 3

Author Comment

by:jlsjls
ID: 7005754
To BaneBane :
This way of working seems OK to me.
Can you confirm that thread B blocks on this queue only when the queue is empty, and is not blocked while processing an object that thread A has put into the queue? I presume that we can retrieve objects from the queue one at a time.

Could you provide me with the code for the queue?
Thx.
jlsjls
0
 
LVL 1

Expert Comment

by:BaneBane
ID: 7005781
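Yes, B will block only if it tries to dequeue from an empty queue. The lock is held just for the duration of the enqueue or dequeue call, so A is not held up while B processes an object.
And you are right, you can retrieve objects from the queue one at a time.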



And here comes the code:

********************************************
Queue Interface
********************************************

/**
 * A Queue is a FIFO (First In First Out) dispenser.
 */
public interface Queue {

    /**
     * Enqueue an element into the queue
     *
     * @param o The element to enqueue
     */
    public void enqueue(Object o);
   
    /**
     * De-queue an element from the queue
     *
     * @return The element
    */
    public Object dequeue();

    /**
     * Is the queue empty?
     *
     * @return true iff the queue is empty
     */
    public boolean isEmpty();
   
    /**
     * How many elements are currently enqueued?
     *
     * @return the number of elements in the queue
     */
     public int size();
}


********************************************
Queue implementation
********************************************


import java.util.*;

/**
 * A Queue is a FIFO (First In First Out) dispenser.
 */
public class SynchronizedQueue implements Queue {
    protected LinkedList m_list;

    /**
     * Default constructor
     */
    public SynchronizedQueue(){
        m_list=new LinkedList();
    }

    /**
     * Enqueue an element into the queue
     *
     * @param o The element to enqueue
     */
    synchronized public void enqueue(Object o){
        m_list.add(o); //Append at the tail; dequeue() removes from the head.

        notifyAll(); //Let waiting threads know that the queue is not empty.
    }

    /**
     * De-queue an element from the queue
     *
     * @return The element
     */
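    synchronized public Object dequeue() {
        while(isEmpty()) {
            try {
                wait(); //Releases the lock while waiting, so enqueue() can run.
            } catch (InterruptedException ex){
                //Ignored: the loop re-checks isEmpty() and waits again.
            }
        }
        return m_list.removeFirst();
    }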

    /**
     * How many elements are currently enqueued?
     *
     * @return the number of elements in the queue
     */
     synchronized public int size() {
        return m_list.size();
     }


    /**
     * Is the queue empty?
     *
     * @return true iff the queue is empty
     */
    synchronized public boolean isEmpty(){
        return (size()==0);
    }
}


Have fun
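One thing to be aware of in the queue above: dequeue() ignores InterruptedException, so a consumer blocked on an empty queue cannot be stopped with Thread.interrupt(). If that matters, a variant that lets the exception propagate could look like the sketch below. InterruptibleQueue and InterruptibleQueueDemo are hypothetical names, generics need Java 5 and the lambda needs Java 8, and this is a variation on the posted code rather than the original author's version.

```java
import java.util.LinkedList;

// Hypothetical variant of SynchronizedQueue whose dequeue() can be interrupted.
class InterruptibleQueue<T> {
    private final LinkedList<T> items = new LinkedList<>();

    public synchronized void enqueue(T item) {
        items.addLast(item);
        notifyAll();  // wake any thread waiting in dequeue()
    }

    // Blocks while the queue is empty; an interrupt ends the wait with an exception.
    public synchronized T dequeue() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        return items.removeFirst();
    }

    public synchronized int size() {
        return items.size();
    }
}

class InterruptibleQueueDemo {
    // Returns true if a consumer blocked on an empty queue ends after interrupt().
    static boolean consumerStopsOnInterrupt() {
        InterruptibleQueue<String> queue = new InterruptibleQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    queue.dequeue();  // nothing is ever enqueued, so this blocks
                }
            } catch (InterruptedException e) {
                // Leaving the loop is the whole point: the thread ends cleanly.
            }
        }, "consumer");
        consumer.start();
        consumer.interrupt();
        try {
            consumer.join(5000);  // wait up to five seconds for the thread to end
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !consumer.isAlive();
    }
}
```

The trade-off is that every caller of dequeue() now has to handle InterruptedException, which the original interface avoids.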
0
 
LVL 3

Author Comment

by:jlsjls
ID: 7005894
To BaneBane :
Is the following correct ?
Can the common queue be shared by both threads as follows?
SynchronizedQueue sQueue = new SynchronizedQueue();
Thread A = new Thread(sQueue);
Thread B = new Thread(sQueue);
A.start();
B.start();

Thx. jlsjls.
0

 
LVL 1

Expert Comment

by:BaneBane
ID: 7005942
Nope.

What you should do is:


Thread a = new A(); //producer: reads the data and enqueues it
Thread b = new B(); //consumer: dequeues the data and processes it
a.start();
b.start();


import java.net.*;
import java.io.*;

public class A extends Thread{

    private Socket  m_Socket;
    private InputStream m_InputStream;
   
    //Port on the local host that this client socket connects to.
    private static final int LISTENING_PORT = 10000;


    public A() throws IOException {
        m_Socket = new Socket(InetAddress.getLocalHost(),LISTENING_PORT);
        m_InputStream = m_Socket.getInputStream();
    }

    public void run(){
       
        while(true){
            //DO some read according to your own logic. read a line/read a number of bytes etc.
            //data = m_InputStream.read();
            //SomeObject obj = new SomeObject(data);
            //sQueue.enqueue(obj);
        }//while
    }//run
}//A


public class B extends Thread{


    public B() {
    }

    public void run(){
       

        while(true) {
            try {
                Object obj=sQueue.dequeue(); //blocks while the queue is empty
                if(obj!=null){
                    //DO something with obj
                }//if
            } //try
            catch(Throwable th) {
                System.err.println("Unexpected exception caught:");
                th.printStackTrace();
            } //catch
           
        }//while
    }//run
}//B

Note that the above code is only a sample; it will not run as is (for example, sQueue is never declared).
0
 
LVL 3

Author Comment

by:jlsjls
ID: 7006025
What I don't see in your code is : how is the queue 'sQueue' shared between thread A and thread B??
How is this done here??

Is the use of sockets (which I have never worked with) necessary to make this work? What is LISTENING_PORT = 10000? I suppose sockets are not needed, since I will supply my own InputStream (from the serial port).

In my code the threads implement the Runnable interface.

Thx.jlsjls.
 
0
 
LVL 1

Expert Comment

by:BaneBane
ID: 7006070
You could create a class which keeps a reference to the queue....

something like this:

public class Monitor  {

    private static Monitor m_instance=null;
    private static Thread m_eventHandlingThread=null;
    private static Queue m_eventQueue=new SynchronizedQueue();
   


    /**
     * Default constructor
     * Since this class implements the Singleton pattern,
     * this constructor is executed at most once.
     */
    private Monitor()  {
    }//Monitor


    /**
     * Get the one and only instance of the monitor (singleton)
     *
     * @return the single instance of the Monitor Class
     * @post m_instance!=null
     *
     */
    public static synchronized Monitor getInstance(){

        if(m_instance==null){
            m_instance=new Monitor();
            createEventHandlingThread();
        }//if
        return m_instance;
    }//instance


    /**
     * Check if the event handling daemon thread is alive.
     * @return true iff the event handling daemon thread exists and is alive
     *
     */
    public static boolean isEventHandlingThreadAlive(){
        if(m_eventHandlingThread==null) return false;
        return m_eventHandlingThread.isAlive();
    } //isEventHandlingThreadAlive


    /**
     * @return the amount of events waiting to be handled in the queue
     *
     */
    public static int getQueueSize(){
        return m_eventQueue.size();
    }//getQueueSize


    /**
     * @return the priority of the event handling thread
     *
     */
    public static int getEventHandlingThreadPriority(){
        return m_eventHandlingThread.getPriority();
    }//getEventHandlingThreadPriority


    /**
     * Create an event handling thread, if one doesn't exist.
     *
     */
    public static synchronized void createEventHandlingThread(){
        if(isEventHandlingThreadAlive()) return;

        //I'm using an anonymous inner class so that no other class could refer to run().
        Runnable service=new Runnable(){

            /**
            * Handle the events in the event queue.
            * This work is done by the event handling thread.
            *
            */
            public void run() {
                Object obj=null;

                while(true) {
                    try {
                        obj=m_eventQueue.dequeue();
                        if(obj!=null){
                              //do something with obj
                        }

                    } catch(Throwable th) {
                        System.err.println("Unexpected exception caught:");
                        th.printStackTrace();
                    }
                }//while
            } //run
        };


        m_eventHandlingThread=new Thread(service);
        m_eventHandlingThread.setName("Event Handler");
        m_eventHandlingThread.setPriority(Thread.MAX_PRIORITY);//Set the priority highest.
        m_eventHandlingThread.setDaemon(true);

        m_eventHandlingThread.start();
    }

    /**
     * Asynchronously handle the objects
     *
     *
     * @pre object!=null
     * @pre m_instance!=null
     *
     */
    public void enqueue(Object object)  {
      m_eventQueue.enqueue(object);
    }


    protected void finalize() throws Throwable  {
        try {
            //If queue is not empty, give m_eventHandlingThread a chance to clean it
            while(!m_eventQueue.isEmpty()) {
                Thread.sleep(1000);
                System.out.println("Monitor exiting - queue not empty. sleeping");
            }
        } finally {
            super.finalize();
        }
    }
}//Monitor



Just call Monitor.getInstance() once. That call creates the event handling thread, which will dequeue objects from the queue and do something with them. The reading thread then hands over its data with Monitor.getInstance().enqueue(obj).


In terms of what you need, there is no real difference between extending Thread and implementing Runnable. Objects that extend Thread are heavier, but they have all of Thread's methods available directly. Objects that implement the Runnable interface are not as versatile (you need a Thread object to run them).
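If you would rather not use a singleton, another common way to share the queue between two Runnable objects is to pass the same queue instance to both constructors. The sketch below assumes Java 8 or later and uses made-up names throughout (SharedQueue, SerialReader, MessageProcessor, SharedQueueDemo). SharedQueue is a condensed stand-in for the SynchronizedQueue posted earlier, a ByteArrayInputStream stands in for the serial port's InputStream, and the newline delimiter and the END marker are assumptions for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Condensed stand-in for the SynchronizedQueue posted earlier in the thread.
class SharedQueue {
    private final LinkedList<Object> list = new LinkedList<>();
    synchronized void enqueue(Object o) { list.add(o); notifyAll(); }
    synchronized Object dequeue() throws InterruptedException {
        while (list.isEmpty()) wait();
        return list.removeFirst();
    }
}

// Thread A's job: read bytes, cut them into messages, enqueue each message.
class SerialReader implements Runnable {
    static final Object END = new Object();  // hypothetical end-of-input marker
    private final InputStream in;
    private final SharedQueue queue;

    SerialReader(InputStream in, SharedQueue queue) {
        this.in = in;
        this.queue = queue;
    }

    public void run() {
        StringBuilder current = new StringBuilder();
        try {
            int b;
            while ((b = in.read()) != -1) {
                if (b == '\n') {  // assumed message delimiter
                    queue.enqueue(current.toString());
                    current.setLength(0);
                } else {
                    current.append((char) b);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            queue.enqueue(END);  // tell the consumer that no more data will come
        }
    }
}

// Thread B's job: dequeue messages and process them until END arrives.
class MessageProcessor implements Runnable {
    private final SharedQueue queue;
    final List<String> handled = new ArrayList<>();

    MessageProcessor(SharedQueue queue) { this.queue = queue; }

    public void run() {
        try {
            Object o;
            while ((o = queue.dequeue()) != SerialReader.END) {
                handled.add("handled " + o);  // stand-in for real processing
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class SharedQueueDemo {
    static List<String> run(String input) {
        SharedQueue queue = new SharedQueue();  // the one instance both threads see
        MessageProcessor processor = new MessageProcessor(queue);
        InputStream in = new ByteArrayInputStream(input.getBytes(StandardCharsets.US_ASCII));
        Thread a = new Thread(new SerialReader(in, queue), "A");
        Thread b = new Thread(processor, "B");
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processor.handled;
    }
}
```

In this sketch a trailing fragment without a closing newline is dropped when the input ends; a real serial protocol would define its own framing.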


0
 
LVL 3

Author Comment

by:jlsjls
ID: 7006412
To BaneBane:
I've put all your code into mine and tested it. Everything works fine and the two threads communicate through this queue. Thanks a lot for your time and effort.
You provided me with very useful information.
BTW: I added 100 extra points for your detailed code.
jlsjls
0
 
LVL 1

Expert Comment

by:BaneBane
ID: 7007831
Sure :-)
0
