Link to home
Start Free TrialLog in
Avatar of saedpalnet
saedpalnet

asked on

Client Server to disseminat financial data

Hi everybody,
I tried to develop a client server program where a java applet connects to a server via a TCP/IP soecket to get realtime data from a source server. The server that accepts applet connections gets a life update from another server (through TCP/IP) and disseminates the received data directly to all connected clients. I used the common model for the chat server to implement this with some modifications (no contribution is expected from the java applets 'clients' in my case).
the system works well for a while but as the server receives more and more connections, it stops sending realtime data to connected clients (although it still accpets applet connections)
I have 3 main classes, the java applet that connects to the server, the main server that connects to the data source and listens to connections from applets, and a handler class to process realtime data and send it to all active connections.
I will include part of the server and handler classes

server class:

import java.net.*;
import java.io.*;
import java.util.*;

public class pserver2 {

      Timer timer;

      public pserver2 (int port) throws IOException {


        Socket echoSocket = null;
        PrintWriter out = null;
        BufferedReader in = null;

        try {
            echoSocket = new Socket("ipaddress of data source server", portnumber);
            out = new PrintWriter(echoSocket.getOutputStream(), true);
            in = new BufferedReader(new InputStreamReader(
                                        echoSocket.getInputStream()));
        } catch (UnknownHostException e) {
            System.err.println("Don't know about host: PSE server");
            System.exit(1);
        } catch (IOException e) {
            System.err.println("Couldn't get I/O for "
                               + "the connection to: PSE server" + e);
            System.exit(1);
        }

          // start listening at the port sent as parameter to the this class
          ServerSocket server = new ServerSocket (port);

          while (true) {
                  Socket client = server.accept ();
                  // Notify a user has just been accepted
                   phandler2 c = new phandler2(client, echoSocket);

                  c.start ();

                r.gc();

                }

              }


      // The main() method creates an instance of the pserver, passing the
      // command-line port as a parameter. This is the port to which clients
      // will connect.

      public static void main (String args[]) throws IOException {
          if (args.length != 1)
                  throw new RuntimeException ("Syntax: pserver ");
          new pserver2 (Integer.parseInt (args[0]));

        }

}

the handler class

import java.net.*;
import java.io.*;
import java.util.*;
import java.sql.DriverManager;
import java.sql.ResultSetMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Connection;


public class phandler2 implements Runnable{

      protected Socket s;
      protected Socket p;
        protected DataInputStream i;
      private PrintStream o = null;

      private PrintWriter out;

        static Vector handlers = new Vector();
        static String NewsLine = new String();
      static String serverInput;

      static String[] MarketTicker ;
      static String[] MarketSummary ;
      static String[] MarketHistory;
        protected Thread listener;

public phandler2 (Socket s, Socket p) throws IOException {


    this.s = s;
    this.p = p;

System.out.println("starting handler...");

  }



  public synchronized void start () {

          if (listener == null) {

            try{
                i = new DataInputStream (new BufferedInputStream (p.getInputStream ()));

                  o = new PrintStream(s.getOutputStream(), true);

                out = new PrintWriter(p.getOutputStream(), true);

        listener = new Thread (this);
        listener.start ();
      }catch (Exception ex) {
                     System.out.println ("couldn't establish connection" );
      }

    }
  }

  public synchronized void stop () {
    if (listener != null) {
      try {
        if (listener != Thread.currentThread ())
          listener.interrupt ();
        listener = null;
      } catch (Exception ignored) {

      }
    }
  }


  public void run () {

      Runtime runtime = Runtime.getRuntime();

    try {

                  synchronized(handlers) {
                handlers.addElement(this);
                  }

       String inputLine, outputLine, query;

         PrintWriter output = new PrintWriter(s.getOutputStream(), true);


      int LN;
      int LineNumber=1;
      String Data="";

        System.out.println ("online users: " + handlers.size());

      while ((serverInput = i.readLine()) != null) {

            Data = serverInput;
                                // some data processing
                                // .......

            broadcast (Data + "\n");
      }

    } catch (IOException ex) {
           System.out.println ("Error Reading this msg" + ex);
      ex.printStackTrace ();
    } finally {


             handlers.removeElement (this);


    }


  }

public void broadcast (String message) {

    synchronized (handlers) {


            int k=0;
            for(int i = 0; i < handlers.size(); i++) {
                  k++;
                  synchronized(handlers) {
                        phandler2 handler =
                          (phandler2)handlers.elementAt(i);
                        try{

                          handler.o.println(message);
                              handler.o.flush();
                              if (o.checkError())
                              {
                                  handlers.removeElement (this);
                                        
                              }

                      } catch (Exception ex) {
                             
                            System.out.println ("not valid stream" + ex );
                        }

                  }
            }

    }

  }
}

can anyone help here? I know its a bit complicated to follow on somone else's code but I will appreciate any help or suggestions.

Thanks in advance and best regards,
Saed


ASKER CERTIFIED SOLUTION
Avatar of RuadRauFlessa
RuadRauFlessa

Link to home
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
Start Free Trial
Avatar of saedpalnet
saedpalnet

ASKER

Thanks RuadRauFlessa for your reply,
you're right, the reason I did so was I modified the code from a chat client server program and the first intentions was that each client could contribute in a way to the main data source (the applet will send some data) in this case I have to keep a seperate thread for each connection right?
anyways, for now since I am only doing data dissemination I will try to do as you said. meanwhile I will apperciate any code snippet to implement what you have suggested.

Thanks
Firstly you could have a class that will do the general data calculation on a threaded basis like you are doing it at the moment but without the sending to the clients. I would do something like that for future use when you want the clients to have an input on the data that you are working with. Keep your client threads and create a send and receive queue within the class that reads or writes to the client. That would allow you to accept all of the input from all of the clients without them knowing that their data or requests are transfered to a single process queue.

Then you can keep a list of all the client threads within your class that processes the data and periodically before you calculate the stats read what they want to say to the data processing thread. Then when it is finished calculating the data you can just add the output that you need to send to the clients to their sending queues.
please send some general code on how you think the classes and threads should interact. appreciate your help
Hi,

Look at this example it may help you...

http://www.developer.com/java/other/article.php/3315501

R.K
That would be a nice article to take a look at.
well, actually I think I am using the same thing here, this example creates a seperate thread for each connection. the only difference is in the run method of the handler where instead of getting the data from the client, I get the data from a different source ( a nother data source ) !!
Yeah I saw that. However I would refrain from recalculating the stats the whole time. Rather have a thread that does that and then just read whatever it has calculated each time you want to send the data tot he clients.
I am not sure I competely got you here, any code that will help explain your point more will be appreciated. besides, the data processing isnt really that complicated here.
I know it isn't complicated

======================================================
public class Process extends Thread {

   private Vector connections = null;

   public Process(Vector connections) {
      this.connections = connections;
   }

   public void run() {
      //read the input from the clients by looping through the connections list and calling:
      ClientHandler.getData()
      //do your statistical/whatever calculations and store it to local variables
      //send data back to clients by looping through them and calling:
      ClientHandler.setData(/*PUT DATA HERE*/);
   }
   
}

==================================================
public class ClientHandler extends Thread {
   public void run() {
      //send and receive data in a loop.
   }

   public void setData(String str) {
      //add the data to a list
   }

   public String[] getData() {
      //return data from a list read at an earlier stage
   }
}

======================================================
public class Main extends Thread{
    Vector connections = new Vector()

    public Main() {
       //create an instance of Process and pass the instance of connections to it.
    }

    public void run() {
       //accept connections and add them to the connections list
    }
}
======================================================

Something more or less like that
what is the thread that is responsible of listening to the data source and send the data to all connections in this case?
this one

======================================================
public class Main extends Thread{
    Vector connections = new Vector()

    public Main() {
       //create an instance of Process and pass the instance of connections to it.
    }

    public void run() {
       //accept connections and add them to the connections list
    }
}
======================================================
I have made the modifications and currently testing the results. but I have a question, what is the limit on the number of connected users using this approach, if any.
thanks
The ammount of connections is firstly limited by the ammount of port's that you have open onthe server and secondly the ammount of memory on the machine since you will run out of memory way before you will run out of actual sockets I would say memory is the biggest contributing factor. Then one would have to take a look @ processing power and that can only be minimised according to your requirments and to my knowledge this is one of the better ways of doing that.

Since you have a common calculation that has to occur on the same data over and over again and stay the same for all users there is no use in re-calculating the data/information every time a user requests it. That is unless your working data changes and for that reason you should place the calculation of the data in a thread that periodically calculates the data and then just keeps the result for random retrieval by the clients. If the data on which you base your calculations the ti is not going to be advantageous to keep on calculating the result since it will always be the same.
ok. thanks. one more questin, is there a way to know when a client has disconnected its connection so that I remove it from the connections vector, other than checking if there was an error when a packet is sent to him? currenlty I only reomove a connection from the vector only when I get en error (using conection.checkError())  after sending a packet to the client inside the clients loop.
thanks
There are a number of ways you could do that but the best would be to have the client send a disconnect command or something to the server. The server should then react to that when it is received.
hi,
I've done some modifications you suggested and I removed the error check I used to do after I send a packet to a connection (while looping through all connections)
I used to do this if (conection.checkError())  connections.removeElement(conection);.
it worked ok, but when I re-implemented the error check again the system hanged after sometime (which could be because one of the connections has been lost)! its not an important issue as I need to do this only to check the number of connection to the system, which can be implemented in another way as you stated. but do you think this could be the problem? I am still testing the system (now withouth the error check) and all I care is to make it work fine!
Yes, It could have been your initial problem as well but I doubt it. Cound you post your new code just out of interest's sake?
ok now I have 3 major classes

server class:

import java.net.*;
import java.io.*;
import java.util.*;

public class pserver2 {

     Timer timer;

     public pserver2 (int port) throws IOException {


        Socket echoSocket = null;
        PrintWriter out = null;
        BufferedReader in = null;

        try {
            echoSocket = new Socket("ipaddress of data source server", portnumber);
            out = new PrintWriter(echoSocket.getOutputStream(), true);
            in = new BufferedReader(new InputStreamReader(
                                        echoSocket.getInputStream()));
        } catch (UnknownHostException e) {
            System.err.println("Don't know about host: PSE server");
            System.exit(1);
        } catch (IOException e) {
            System.err.println("Couldn't get I/O for "
                               + "the connection to: PSE server" + e);
            System.exit(1);
        }

         // start listening at the port sent as parameter to the this class
         ServerSocket server = new ServerSocket (port);


          DataInputStream i = new DataInputStream (new BufferedInputStream (echoSocket.getInputStream ()));

                                // this thread will connect to the datasource, wait for data and send it to all stored connections
            phandler3  p3 = new phandler3(i);

            p3.start ();

         while (true) {
                Socket client = server.accept ();
               // Notify a user has just been accepted
               
              // this thread will just send the new connected users some intial data
               phandler2 c = new phandler2(client, echoSocket);

               c.start ();

              r.gc();
             
               PrintStream o = new PrintStream(client.getOutputStream(), true);
                // I add the connections to the connections vector here!
      synchronized(connections) {
                connections.addElement(o);
              }

            }


     // The main() method creates an instance of the pserver, passing the
     // command-line port as a parameter. This is the port to which clients
     // will connect.

     public static void main (String args[]) throws IOException {
         if (args.length != 1)
                throw new RuntimeException ("Syntax: pserver ");
         new pserver2 (Integer.parseInt (args[0]));

       }

}




the datasource handler class


import java.net.*;
import java.io.*;
import java.util.*;
//import java.sql.*;
import java.sql.DriverManager;
//import java.sql.SQLPermission;
import java.sql.ResultSetMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Connection;


public class phandler3 implements Runnable{

      protected Socket s;
      protected Socket p;
        protected DataInputStream i;
      private PrintStream o = null;


       static Vector handlers = new Vector();

        static String NewsLine = new String();
      static String serverInput, query;

      static String[] MarketTicker ;
      static String[] MarketSummary ;
      static String[] MarketHistory;
        protected Thread listener;

public phandler3 (DataInputStream i) throws IOException {


    this.i = i;

  }



  public synchronized void start () {

          if (listener == null) {

      try{

        listener = new Thread (this);
        listener.start ();
      }catch (Exception ex) {
         System.out.println ("couldn't establish connection" );
     }

    }
  }

  public synchronized void stop () {
    if (listener != null) {
      try {
        if (listener != Thread.currentThread ())
          listener.interrupt ();
        listener = null;
      } catch (Exception ignored) {

      }
    }
  }


  public void run () {



    try {


          while ((serverInput = i.readLine()) != null) {

                               ///........process dagta....
                               // ...........................
                              /// and send via broadcast
            broadcast (Data + "\n");
      }

    } catch (IOException ex) {
           System.out.println ("Error Reading this msg" + ex);
      ex.printStackTrace ();
    } finally {

    }
  }

public void broadcast (String message) {

    synchronized (pserver2.connections) {

      int k=0;
            for(int i = 0; i < pserver2.connections.size(); i++) {
                  k++;
                  synchronized(pserver2.connections) {
                        PrintStream o = (PrintStream)pserver2.connections.elementAt(i);

                      try{
                      o.println(message);
                                          o.flush();
                   if (o.checkError())
                   {
                  // HERE i CHECK FOR SOCKET ERRORS                                        //pserver2.connections.removeElement(o);
                 }

                } catch (Exception ex) {
                             //handler.stop ();
                            System.out.println ("not valid stream" + ex );
                        }

                  }
            }

    }

  }

}


AND there is the client initialization handler phandler2 which just send some initial data when the client first connect.


Does that work a bit better for you?
unfortunately not ... I am still having the same problem occasionaly, the system stops getting data and updating connected client. when I restart, things gets back to normal!:(
Can you see where the app is hanging?
how do I know?
Debug output. Or run it within an ide which youo could use to step through the code as it is executing.
ok, I have a question here. what will happen if for some reason the connection to the data source was lost, say for example because of a network connection? the DataInputStream i in phandler3 will no longer be valid and it will never be back again as the thread doesnt try to reconnect! this could be the problem, right?
That could be a problem yes. If you loose your connection you will however receive an exception when you try and send data.