Client Server programming

Asked by pankajtiwary in Java Programming Language

Tags: java, createsocketchannel

Hello experts,

I am new to Java and am having problems with my code.

I am working on an application with a server that serves multiple connections using asynchronous (non-blocking) I/O and the select facility. It does nothing but accept connections and reply with the same data it received (imitating an echo server). The code for the server is as follows:

// Server.java
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

public class Server implements Runnable
{
  // The port we will listen on
  private int port;

  // A pre-allocated buffer for encrypting data
  private final ByteBuffer buffer = ByteBuffer.allocate( 100 );

  public Server( int port ) {
    this.port = port;
    new Thread( this ).start();
  }

  public void run() {
    try {
      // Instead of creating a ServerSocket,
      // create a ServerSocketChannel
      ServerSocketChannel ssc = ServerSocketChannel.open();

      // Set it to non-blocking, so we can use select
      ssc.configureBlocking( false );

      // Get the Socket connected to this channel, and bind it
      // to the listening port
      ServerSocket ss = ssc.socket();
      InetSocketAddress isa = new InetSocketAddress( port );
      ss.bind( isa );

      // Create a new Selector for selecting
      Selector selector = Selector.open();

      // Register the ServerSocketChannel, so we can
      // listen for incoming connections
      ssc.register( selector, SelectionKey.OP_ACCEPT );
      System.out.println( "Listening on port "+port );

      while (true) {
        // See if we've had any activity -- either
        // an incoming connection, or incoming data on an
        // existing connection
        int num = selector.select();

        // If we don't have any activity, loop around and wait
        // again
        if (num == 0) {
          continue;
        }

        // Get the keys corresponding to the activity
        // that has been detected, and process them
        // one by one
        Set keys = selector.selectedKeys();
        Iterator it = keys.iterator();
        while (it.hasNext()) {
          // Get a key representing one of the bits of I/O
          // activity
          SelectionKey key = (SelectionKey)it.next();

          // What kind of activity is it?
          if ((key.readyOps() & SelectionKey.OP_ACCEPT) ==
            SelectionKey.OP_ACCEPT) {

            // It's an incoming connection.
            // Register this socket with the Selector
            // so we can listen for input on it

            Socket s = ss.accept();
            System.out.println( "Got connection from "+s );

            // Make sure to make it non-blocking, so we can
            // use a selector on it.
            SocketChannel sc = s.getChannel();
            sc.configureBlocking( false );

            // Register it with the selector, for reading
            sc.register( selector, SelectionKey.OP_READ );
          } else if ((key.readyOps() & SelectionKey.OP_READ) ==
            SelectionKey.OP_READ) {

            SocketChannel sc = null;

            try {

              // It's incoming data on a connection, so
              // process it
              sc = (SocketChannel)key.channel();
              boolean ok = processInput( sc );

              // If the connection is dead, then remove it
              // from the selector and close it
              if (!ok) {
                key.cancel();

                Socket s = null;
                try {
                  s = sc.socket();
                  s.close();
                } catch( IOException ie ) {
                  System.err.println( "Error closing socket "+s+": "+ie );
                }
              }

            } catch( IOException ie ) {

              // On exception, remove this channel from the selector
              key.cancel();

              try {
                sc.close();
              } catch( IOException ie2 ) { System.out.println( ie2 ); }

              System.out.println( "Closed "+sc );
            }
          }
        }

        // We remove the selected keys, because we've dealt
        // with them.
        keys.clear();
      }
    } catch( IOException ie ) {
      System.err.println( ie );
    }
  }

  // Do some cheesy encryption on the incoming data,
  // and send it back out
  private boolean processInput( SocketChannel sc ) throws IOException {
    buffer.clear();
    sc.read( buffer );
    buffer.flip();

    // If no data, close the connection
    if (buffer.limit()==0) {
      return false;
    }

    int i = 0;
    for (i=0; i<buffer.limit(); ++i) {
      byte b = buffer.get( i );

      if ( b == (char)13 ) {
        break;
      }

      buffer.put( i, b );
    }

    sc.write( buffer );

    System.out.println( "Processed "+buffer.limit()+" from "+sc );

    return true;
  }

  static public void main( String args[] ) throws Exception {
    int port = Integer.parseInt( args[0] );

    new Server( port );
  }
}
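
A note on processInput() above, offered as a sketch rather than a verified fix: the return value of sc.read() is ignored, so a read that returns 0 bytes (possible on a non-blocking channel) looks the same as a closed connection, and a single sc.write() on a non-blocking channel may send only part of the buffer. The helper below is hypothetical (EchoStep, echoOnce and echoAll are not part of the posted code) and uses in-memory channels in place of sockets so that it can run on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the posted Server class.
class EchoStep {

  // Echoes one chunk. Returns false only when the peer has closed the stream.
  static boolean echoOnce(ReadableByteChannel in, WritableByteChannel out,
                          ByteBuffer buf) throws IOException {
    buf.clear();
    int n = in.read(buf);        // bytes read, 0 if none available, -1 at end of stream
    if (n < 0) {
      return false;              // connection closed by the peer
    }
    if (n == 0) {
      return true;               // nothing to echo right now, keep the connection
    }
    buf.flip();                  // limit = bytes read, position = 0
    while (buf.hasRemaining()) {
      out.write(buf);            // a write may send fewer bytes than remain
    }
    return true;
  }

  // Test driver (an assumption for illustration): echoes a whole string
  // through in-memory channels using a buffer of the given size.
  static String echoAll(String data, int bufferSize) {
    ReadableByteChannel in = Channels.newChannel(
        new ByteArrayInputStream(data.getBytes(StandardCharsets.US_ASCII)));
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    WritableByteChannel out = Channels.newChannel(sink);
    ByteBuffer buf = ByteBuffer.allocate(bufferSize);
    try {
      while (echoOnce(in, out, buf)) {
        // keep echoing until end of stream
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return new String(sink.toByteArray(), StandardCharsets.US_ASCII);
  }

  public static void main(String[] args) {
    String msg = "01message01\r02message02\r";
    System.out.println(echoAll(msg, 100).equals(msg)); // prints true
  }
}
```

On a real non-blocking SocketChannel, spinning on write() is a simplification; registering for OP_WRITE and finishing the write when the channel is ready is the usual alternative.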

The client works as follows. It opens 10 connections to the server and creates a sender thread that keeps sending data (messages in our case) over them. It also creates a receiver thread that watches the same channels for incoming data using the same select mechanism. When data becomes available on one of the connections, the receiver assigns that connection to a worker thread from a thread pool. The worker thread then reads the data from the connection (which was sent back by the server) and prints it on the screen. The client code includes the ThreadPool and its worker threads, so it is fairly long, but I have no option other than pasting the whole thing.

//Client.java
import java.net.*;
import java.io.*;
import java.nio.channels.*;
import java.nio.*;
import java.util.*;

class ThreadPool {
  /**
   * The threads in the pool.
   */
  protected Thread threads[] = null;
  /**
   * The backlog of assignments, which are waiting
   * for the thread pool.
   */
  Collection assignments = new ArrayList(3);
  /**
   * A Done object that is used to track when the
   * thread pool is done, that is has no more work
   * to perform.
   */
  protected Done done = new Done();

  /**
   * The constructor.
   *
   * @param size  How many threads in the thread pool.
   */
  public ThreadPool(int size)
  {
    threads = new WorkerThread[size];
    for (int i=0;i<threads.length;i++) {
      threads[i] = new WorkerThread(this);
      //System.out.println ("WorkerThread created");
    }
  }

  public void startWorking() {
    for (int i=0;i<threads.length;i++) {
      threads[i].start();
    }
  }


  /**
   * Add a task to the thread pool. Any class
   * which implements the Runnable interface
   * may be assigned. When this task runs, its
   * run method will be called.
   *
   * @param r   An object that implements the Runnable interface
   */
  public synchronized void assign(Runnable r)
  {
    //System.out.println ("assign");
    done.workerBegin();
    assignments.add(r);
    notify();
  }

  /**
   * Get a new work assignment.
   *
   * @return A new assignment
   */
  public synchronized Runnable getAssignment()
  {
    try {
      while ( !assignments.iterator().hasNext() )
        wait();

      Runnable r = (Runnable)assignments.iterator().next();
      assignments.remove(r);
      return r;
    } catch (InterruptedException e) {
      done.workerEnd();
      return null;
    }
  }

  /**
   * Called to block the current thread until
   * the thread pool has no more work.
   */
  public void complete()
  {
    done.waitBegin();
    done.waitDone();
  }

  protected void finalize()
  {
    done.reset();
    for (int i=0;i<threads.length;i++) {
      threads[i].interrupt();
      done.workerBegin();
      threads[i].destroy();
    }
    done.waitDone();
  }

}

/**
 * The worker threads that make up the thread pool.
 *
 */
class WorkerThread extends Thread {
  /**
   * True if this thread is currently processing.
   */
  public boolean busy;
  /**
   * The thread pool that this object belongs to.
   */
  public ThreadPool owner;

  /**
   * The constructor.
   *
   * @param o the thread pool
   */
  WorkerThread(ThreadPool o)
  {
    //System.out.println ("WorkerThread constructor");
    owner = o;
  }

  /**
   * Scan for and execute tasks.
   */
  public void run()
  {
    Runnable target = null;
    do {
      target = owner.getAssignment();
      if (target!=null) {
        target.run();
        owner.done.workerEnd();
      }
    } while (target!=null);
  }
}


/**
 *
 * This is a very simple object that
 * allows the ThreadPool to determine when
 * it is done. This object implements
 * a simple lock that the ThreadPool class
 * can wait on to determine completion.
 * Done is defined as the ThreadPool having
 * no more work to complete.
 *
 */
class Done {

  /**
   * The number of Worker object
   * threads that are currently working
   * on something.
   */
  private int _activeThreads = 0;

  /**
   * This boolean keeps track of if
   * the very first thread has started
   * or not. This prevents this object
   * from falsely reporting that the ThreadPool
   * is done, just because the first thread
   * has not yet started.
   */
  private boolean _started = false;
  /**
   * This method can be called to block
   * the current thread until the ThreadPool
   * is done.
   */

  synchronized public void waitDone()
  {
    try {
      while ( _activeThreads>0 ) {
        wait();
      }
    } catch ( InterruptedException e ) {
    }
  }
  /**
   * Called to wait for the first thread to
   * start. Once this method returns the
   * process has begun.
   */

  synchronized public void waitBegin()
  {
    try {
    while ( !_started ) {
      wait();
    }
   } catch ( InterruptedException e ) {
   }
  }


 /**
   * Called by a Worker object
   * to indicate that it has begun
   * working on a workload.
   */
  synchronized public void workerBegin()
  {
    _activeThreads++;
    _started = true;
    notify();
  }

  /**
   * Called by a Worker object to
   * indicate that it has completed a
   * workload.
   */
  synchronized public void workerEnd()
  {
    _activeThreads--;
    notify();
  }

  /**
   * Called to reset this object to
   * its initial state.
   */
  synchronized public void reset()
  {
    _activeThreads = 0;
  }

}

/**
 * This class shows an example worker thread that can
 * be used with the thread pool. It demonstrates the main
 * points that should be included in any worker thread. Use
 * this as a starting point for your own threads.
 *
 */
class TestWorkerThread implements Runnable {
  static private int count = 0;
  private int taskNumber;
  protected Done done;

  /**
   * Buffer to read data from SocketChannel.
   */
  //private final ByteBuffer buffer = ByteBuffer.allocate( 1024 );

  /**
   * SocketChannel to read data from.
   */
  private SocketChannel sc;

  /**
   *
   * @param s the SocketChannel this task reads from
   */
  TestWorkerThread(SocketChannel s)
  {
    count++;
    taskNumber = count;
    sc = s;
  }

  public void run()
  {
    /*
    for (int i=0;i<100;i+=2) {
      System.out.println("Task number: " + taskNumber +
             ",percent complete = " + i );
      try {
        Thread.sleep((int)(Math.random()*500));
      } catch (InterruptedException e) {
      }
    }
    */
    try {
      final ByteBuffer buffer = ByteBuffer.allocate( 1024 );
      buffer.clear();
      sc.read( buffer );
      StringBuffer m = new StringBuffer();
      if (buffer.limit()==0) {
        //continue;
        return;
      }
      System.out.println ( "buffer.limit() = " + buffer.limit() );
      for (int i=0; i<buffer.limit(); ++i) {
        byte b = buffer.get( i );
        if ( b == (char)13 ) {
          break;
        } else {
          m.append((char)b);
        }
      }
      System.out.println("Read from server: " + m );
    } catch ( IOException g ) {
      System.out.println (g);
    }
  }
}

class SenderThread implements Runnable {
  Thread t;
  SocketChannel con[];
  SenderThread(SocketChannel sc[]) {
    t = new Thread(this, "Sender Thread");
    int i;
    con = new SocketChannel[10];
    for ( i = 0; i < 10; i++ ) {
      con[i] = sc[i];
    }
    System.out.println("Sender Thread initialized");
    t.start();
  }

  public void run() {
    String message[] = { "01message01" + (char)13, "02message02" + (char)13,
                         "03message03" + (char)13, "04message04" + (char)13,
                         "05message05" + (char)13, "06message06" + (char)13,
                         "07message07" + (char)13, "08message08" + (char)13,
                         "09message09" + (char)13, "10message10" + (char)13 };
    int i = 0;
    for (i=0; i<10; i++) {
      ByteBuffer buf = ByteBuffer.wrap ( message[i].getBytes() );
      try {
        System.out.println ( "Writing to socket channel:" + message[i] );
        int n = con[i].write(buf);
      }
      catch ( IOException e) {
        System.out.println (e);
      }
      if ( i == 9 ) {
        i = -1;
      }
    }
  }
}

class ReceiverThread implements Runnable {
  Thread t;
  ThreadPool pool;
  Selector s;
  SocketChannel con[];
  private final ByteBuffer buffer = ByteBuffer.allocate( 1024 );
  ReceiverThread (SocketChannel sc[]) {
    t = new Thread(this, "Receiver Thread");
    pool = new ThreadPool(10);
    System.out.println ("In ReceiverThread constructor: pool initialized");
    try {
      s = Selector.open();
      int i;
      con = new SocketChannel[10];
      for ( i = 0; i < 10; i++ ) {
        con[i] = sc[i];
        con[i].register(s, SelectionKey.OP_READ);
      }
    } catch ( IOException g ) {
      System.out.println (g);
    }
    System.out.println("Receiver Thread initialized");
    t.start();
  }
  public void run() {
    pool.startWorking();
    try {
      while ( true ) {
        int num = s.select();
        if ( num == 0 ) {
          System.out.println ( "num == 0");
          continue;
        }
        Set keys = s.selectedKeys();
        Iterator it = keys.iterator();
        while ( it.hasNext() ) {
          SelectionKey key = (SelectionKey)it.next();
          SocketChannel sc = null;
          if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
            sc = (SocketChannel)key.channel();
            /*
            buffer.clear();
            sc.read( buffer );
            StringBuffer m = new StringBuffer();
            if (buffer.limit()==0) {
              continue;
            }
            System.out.println ( "buffer.limit() = " + buffer.limit() );
            for (int i=0; i<buffer.limit(); ++i) {
              byte b = buffer.get( i );
              if ( b == (char)13 ) {
                break;
              } else {
                m.append((char)b);
              }
            }
            System.out.println("Read from server: " + m );
            */
            pool.assign ( new TestWorkerThread ( sc ) );
          }
        }
        keys.clear();
      }
    } catch ( IOException g ) {
      System.out.println (g);
    }
  }
}

public class Client {
  public static void main(String[] args) {
    String addr = "localhost";
    int port = 19999;
    SocketChannel connection[];
    connection = new SocketChannel[10];
    int i;
    for (i = 0; i < 10; i++) {
      try {
        connection[i] = createSocketChannel(addr, port);

        // Before the socket is usable, the connection must be completed
        // by calling finishConnect(), which is non-blocking
        while (!connection[i].finishConnect()) {
          // Do something else
        }
        System.out.println("Connection[" + i + "] to server established");
      }
      catch (IOException g) {
        System.out.println("IOException: " + g);
      }
      catch (Exception g) {
        System.out.println("Exception: " + g);
      }
    }
    new SenderThread ( connection );
    new ReceiverThread ( connection );
  }

  // Creates a non-blocking socket channel for the specified host name and port.
  // connect() is called on the new channel before it is returned.
  private static SocketChannel createSocketChannel(String hostName, int port)
                                                   throws IOException {
    // Create a non-blocking socket channel
    SocketChannel sChannel = SocketChannel.open();
    sChannel.configureBlocking(false);

    // Send a connection request to the server; this method is non-blocking
    sChannel.connect(new InetSocketAddress(hostName, port));
    return sChannel;
  }
}
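
A possible explanation for the many empty "Read from server:" lines, offered as an assumption rather than a confirmed diagnosis: a selector keeps reporting a channel as readable until its data has actually been read, so the ReceiverThread loop can queue several TestWorkerThread tasks for the same channel before the first worker gets to read it. One common approach is to switch off OP_READ interest before handing the channel to a worker and to switch it back on when the worker is done. The sketch below uses a hypothetical class InterestOpsDemo and a Pipe in place of a socket connection to show the selector behaviour.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class, not part of the posted client.
class InterestOpsDemo {

  // Returns { ready count while the data is still unread,
  //           ready count after OP_READ interest is switched off }.
  static int[] run() {
    try {
      Pipe pipe = Pipe.open();                  // stands in for one connection
      Selector selector = Selector.open();
      try {
        pipe.source().configureBlocking(false);
        SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);
        pipe.sink().write(ByteBuffer.wrap(new byte[] { 'x', 13 }));

        selector.select(1000);                  // wait until the data is readable
        selector.selectedKeys().clear();        // "handled", but nothing was read

        // The data is still unread, so the key is reported again.
        int stillReady = selector.selectNow();
        selector.selectedKeys().clear();

        // Switch off read interest before handing the channel to a worker.
        key.interestOps(0);
        int afterDisable = selector.selectNow();

        return new int[] { stillReady, afterDisable };
      } finally {
        selector.close();
        pipe.sink().close();
        pipe.source().close();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    int[] r = run();
    System.out.println("ready while unread: " + r[0]);
    System.out.println("ready after interestOps(0): " + r[1]);
  }
}
```

In the client, a worker that has finished reading would restore interest with key.interestOps(SelectionKey.OP_READ) and then call wakeup() on the selector so the receiver thread notices the change. Reading directly in the receiver thread and passing the received bytes to the pool is another option.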

Are there any obvious problems in the code? I started the server and then ran the client, and got the following output.

bash-2.05$ java Client
Connection[0] to server established
Connection[1] to server established
Connection[2] to server established
Connection[3] to server established
Connection[4] to server established
Connection[5] to server established
Connection[6] to server established
Connection[7] to server established
Connection[8] to server established
Connection[9] to server established
Sender Thread initialized
In ReceiverThread constructor: pool initialized
Receiver Thread initialized
Writing to socket channel:01message01
Writing to socket channel:02message02
Writing to socket channel:03message03
Writing to socket channel:04message04
Writing to socket channel:05message05
Writing to socket channel:06message06
Writing to socket channel:07message07
Writing to socket channel:08message08
Writing to socket channel:09message09
Writing to socket channel:10message10
Writing to socket channel:01message01
Writing to socket channel:02message02
Writing to socket channel:03message03
Writing to socket channel:04message04
Writing to socket channel:05message05
Writing to socket channel:06message06
Writing to socket channel:07message07
Writing to socket channel:08message08
Writing to socket channel:09message09
Writing to socket channel:10message10
...
...
buffer.limit() = 1024
Read from server: 07message07
buffer.limit() = 1024
Read from server: 07message07
buffer.limit() = 1024
Read from server: 04message04
buffer.limit() = 1024
Read from server: 03message03
buffer.limit() = 1024
Read from server: 10message10
buffer.limit() = 1024
Read from server: 02message02
buffer.limit() = 1024
Read from server: 08message08
buffer.limit() = 1024
Read from server: 05message05
buffer.limit() = 1024
Read from server: 01message01
buffer.limit() = 1024
Read from server: 09message09
buffer.limit() = 1024
Read from server: 06message06
buffer.limit() = 1024
Read from server:
buffer.limit() = 1024
Read from server:
...
...
Read from server:
buffer.limit() = 1024
Read from server:
buffer.limit() = 1024
Read from server: 06message06
buffer.limit() = 1024
Read from server: 06message06
buffer.limit() = 1024
Read from server:
buffer.limit() = 1024
...
...
Read from server:
buffer.limit() = 1024
Read from server:
buffer.limit() = 1024
Read from server: 09message09
buffer.limit() = 1024
Read from server:
buffer.limit() = 1024
...
...
Read from server:
buffer.limit() = 1024
Read from server:
buffer.limit() = 1024
buffer.limit() = 1024
Read from server: 03message03
buffer.limit() = 1024
Read from server: 10message10
buffer.limit() = 1024
Read from server: 02message02
buffer.limit() = 1024
Read from server:
buffer.limit() = 1024
...
...
Read from server:
buffer.limit() = 1024
Read from server:
Writing to socket channel:07message07
Writing to socket channel:08message08
Writing to socket channel:09message09
Writing to socket channel:10message10
Writing to socket channel:01message01
Writing to socket channel:02message02
Writing to socket channel:03message03
Writing to socket channel:04message04
Writing to socket channel:05message05
...
...
Writing to socket channel:06message06
Writing to socket channel:07message07
buffer.limit() = 1024
Read from server:
Read from server:
Read from server:
Read from server:
buffer.limit() = 1024
Read from server:
buffer.limit() = 1024
Read from server: 01message01
buffer.limit() = 1024
Read from server: 09message09
buffer.limit() = 1024
Read from server: 06message06
buffer.limit() = 1024
Read from server: 07message07
buffer.limit() = 1024
Read from server: 04message04
buffer.limit() = 1024
Read from server: 03message03
buffer.limit() = 1024
Read from server: 10message10
buffer.limit() = 1024
Read from server: 02message02
buffer.limit() = 1024
Read from server: 08message08
buffer.limit() = 1024
Read from server: 05message05
buffer.limit() = 1024
Read from server:
...
...
Read from server:
buffer.limit() = 1024
Read from server:
buffer.limit() = 1024
Read from server: ssage01
buffer.limit() = 1024
Read from server: ssage09
buffer.limit() = 1024
Read from server:
buffer.limit() = 1024
Read from server: ssage10
buffer.limit() = 1024
Read from server:
buffer.limit() = 1024
Read from server:
buffer.limit() = 1024
Read from server:
Writing to socket channel:08message08
Writing to socket channel:09message09
buffer.limit() = 1024
Read from server: ssage03
buffer.limit() = 1024
Read from server:
buffer.limit() = 1024

As you can see, the lines starting with "Read from server" contain the messages echoed by the server, but many of them arrive empty or truncated (for example "ssage01" instead of "01message01"). I have no idea why this is happening. Please help me.

Regards
Pankaj
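
Some observations on the output, offered as a sketch under stated assumptions and not as the accepted answer. In TestWorkerThread.run() the buffer is never flipped, so buffer.limit() is always the 1024-byte capacity, which matches every "buffer.limit() = 1024" line; the number of bytes actually received is the return value of sc.read(). TCP is also a byte stream, so one read can end in the middle of a message. Each message is 12 bytes (11 characters plus the CR), and 100 = 8 x 12 + 4, so a full read into the server's 100-byte buffer would end four bytes into a message ("01me") and the next chunk would begin with "ssage01", which is consistent with the truncated lines. The hypothetical FrameReader below (one instance per connection, names are assumptions) keeps the unfinished tail of a message until its CR arrives.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the posted client: one instance per connection.
class FrameReader {
  private static final byte CR = 13;
  private final ByteBuffer in = ByteBuffer.allocate(1024);
  // Bytes of a message whose terminating CR has not arrived yet.
  private final StringBuilder partial = new StringBuilder();

  // Reads what is available and returns the complete messages found.
  // Returns null at end of stream.
  List<String> readMessages(ReadableByteChannel ch) throws IOException {
    in.clear();
    int n = ch.read(in);           // bytes read, 0 if none, -1 at end of stream
    if (n < 0) {
      return null;
    }
    in.flip();                     // limit = bytes read, position = 0
    List<String> out = new ArrayList<>();
    while (in.hasRemaining()) {
      byte b = in.get();
      if (b == CR) {               // end of one message
        out.add(partial.toString());
        partial.setLength(0);
      } else {
        partial.append((char) b);
      }
    }
    return out;
  }

  // Test driver (an assumption for illustration): feeds one chunk of bytes
  // through an in-memory channel instead of a socket.
  List<String> feed(String chunk) {
    try {
      return readMessages(Channels.newChannel(
          new ByteArrayInputStream(chunk.getBytes(StandardCharsets.US_ASCII))));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    FrameReader reader = new FrameReader();
    // The first chunk ends four bytes into the next message.
    System.out.println(reader.feed("10message10\r01me")); // prints [10message10]
    System.out.println(reader.feed("ssage01\r"));         // prints [01message01]
  }
}
```

Because the reader keeps state between calls, it only works if a single thread at a time reads from a given connection.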
Accepted Solution (10/10/05 10:24 PM, ID: 15057819)

Solution Provided By: OnegaZhang
Participating Experts: 1
Solution Grade: A
 