saedpalnet
asked on
Client Server to disseminate financial data
Hi everybody,
I tried to develop a client-server program where a Java applet connects to a server via a TCP/IP socket to get real-time data from a source server. The server that accepts applet connections gets a live update from another server (through TCP/IP) and disseminates the received data directly to all connected clients. I used the common model for the chat server to implement this with some modifications (no contribution is expected from the Java applets 'clients' in my case).
The system works well for a while, but as the server receives more and more connections, it stops sending real-time data to connected clients (although it still accepts applet connections).
I have 3 main classes: the Java applet that connects to the server, the main server that connects to the data source and listens for connections from applets, and a handler class to process real-time data and send it to all active connections.
I will include part of the server and handler classes.
server class:
import java.net.*;
import java.io.*;
import java.util.*;

public class pserver2 {
    Timer timer;

    public pserver2 (int port) throws IOException {
        Socket echoSocket = null;
        PrintWriter out = null;
        BufferedReader in = null;
        try {
            echoSocket = new Socket("ipaddress of data source server", portnumber);
            out = new PrintWriter(echoSocket.getOutputStream(), true);
            in = new BufferedReader(new InputStreamReader(
                    echoSocket.getInputStream()));
        } catch (UnknownHostException e) {
            System.err.println("Don't know about host: PSE server");
            System.exit(1);
        } catch (IOException e) {
            System.err.println("Couldn't get I/O for "
                    + "the connection to: PSE server" + e);
            System.exit(1);
        }
        // start listening at the port sent as parameter to this class
        ServerSocket server = new ServerSocket (port);
        while (true) {
            Socket client = server.accept ();
            // Notify a user has just been accepted
            phandler2 c = new phandler2(client, echoSocket);
            c.start ();
            r.gc();
        }
    }

    // The main() method creates an instance of the pserver, passing the
    // command-line port as a parameter. This is the port to which clients
    // will connect.
    public static void main (String args[]) throws IOException {
        if (args.length != 1)
            throw new RuntimeException ("Syntax: pserver ");
        new pserver2 (Integer.parseInt (args[0]));
    }
}
the handler class
import java.net.*;
import java.io.*;
import java.util.*;
import java.sql.DriverManager;
import java.sql.ResultSetMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Connection;

public class phandler2 implements Runnable {
    protected Socket s;
    protected Socket p;
    protected DataInputStream i;
    private PrintStream o = null;
    private PrintWriter out;
    static Vector handlers = new Vector();
    static String NewsLine = new String();
    static String serverInput;
    static String[] MarketTicker;
    static String[] MarketSummary;
    static String[] MarketHistory;
    protected Thread listener;

    public phandler2 (Socket s, Socket p) throws IOException {
        this.s = s;
        this.p = p;
        System.out.println("starting handler...");
    }

    public synchronized void start () {
        if (listener == null) {
            try {
                i = new DataInputStream (new BufferedInputStream (p.getInputStream ()));
                o = new PrintStream(s.getOutputStream(), true);
                out = new PrintWriter(p.getOutputStream(), true);
                listener = new Thread (this);
                listener.start ();
            } catch (Exception ex) {
                System.out.println ("couldn't establish connection");
            }
        }
    }

    public synchronized void stop () {
        if (listener != null) {
            try {
                if (listener != Thread.currentThread ())
                    listener.interrupt ();
                listener = null;
            } catch (Exception ignored) {
            }
        }
    }

    public void run () {
        Runtime runtime = Runtime.getRuntime();
        try {
            synchronized(handlers) {
                handlers.addElement(this);
            }
            String inputLine, outputLine, query;
            PrintWriter output = new PrintWriter(s.getOutputStream(), true);
            int LN;
            int LineNumber=1;
            String Data="";
            System.out.println ("online users: " + handlers.size());
            while ((serverInput = i.readLine()) != null) {
                Data = serverInput;
                // some data processing
                // .......
                broadcast (Data + "\n");
            }
        } catch (IOException ex) {
            System.out.println ("Error Reading this msg" + ex);
            ex.printStackTrace ();
        } finally {
            handlers.removeElement (this);
        }
    }

    public void broadcast (String message) {
        synchronized (handlers) {
            int k=0;
            for(int i = 0; i < handlers.size(); i++) {
                k++;
                synchronized(handlers) {
                    phandler2 handler =
                        (phandler2)handlers.elementAt(i);
                    try {
                        handler.o.println(message);
                        handler.o.flush();
                        if (o.checkError())
                        {
                            handlers.removeElement (this);
                        }
                    } catch (Exception ex) {
                        System.out.println ("not valid stream" + ex );
                    }
                }
            }
        }
    }
}
Can anyone help here? I know it's a bit complicated to follow someone else's code, but I will appreciate any help or suggestions.
Thanks in advance and best regards,
Saed
Firstly, you could have a class that does the general data calculation on a threaded basis, as you are doing at the moment, but without sending to the clients. I would do something like that for future use, when you want the clients to have an input on the data that you are working with. Keep your client threads and create a send queue and a receive queue within the class that reads from or writes to the client. That would allow you to accept all of the input from all of the clients without them knowing that their data or requests are transferred to a single processing queue.
Then you can keep a list of all the client threads within the class that processes the data and, periodically, before you calculate the stats, read what they want to say to the data-processing thread. When it has finished calculating the data, you can just add the output that you need to send to the clients to their sending queues.
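For illustration, the send-queue idea could be sketched roughly as follows. This is only a sketch under assumptions: it uses `java.util.concurrent` (Java 5 or later), and the class name `ClientSender`, its methods and the queue capacity are hypothetical names that do not appear in the original code. The point is that the data thread only enqueues, so one slow client cannot hold up the others.

```java
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical per-client sender: the data thread only enqueues lines,
// and this client's own thread performs the (possibly slow) socket write.
final class ClientSender implements Runnable {
    private final BlockingQueue<String> queue;
    private final PrintStream out;
    private volatile boolean failed = false;

    ClientSender(OutputStream rawOut, int capacity) {
        this.queue = new ArrayBlockingQueue<String>(capacity);
        this.out = new PrintStream(rawOut, true);
    }

    // Called by the data thread. Never blocks: returns false when the
    // client is too slow (queue full) or its stream has already failed.
    boolean offer(String line) {
        return !failed && queue.offer(line);
    }

    boolean hasFailed() {
        return failed;
    }

    // Runs on the client's own thread until interrupted or the stream fails.
    public void run() {
        try {
            while (!failed) {
                String line = queue.take();   // blocks until a line is queued
                out.println(line);
                if (out.checkError()) {       // PrintStream swallows IOExceptions
                    failed = true;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The server would start one thread per `ClientSender` and call `offer(...)` from the data thread; a `false` return is a reasonable signal to drop that client.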
ASKER
Please send some general code on how you think the classes and threads should interact. I appreciate your help.
Hi,
Look at this example it may help you...
http://www.developer.com/java/other/article.php/3315501
R.K
That would be a nice article to take a look at.
ASKER
Well, actually I think I am using the same thing here; this example creates a separate thread for each connection. The only difference is in the run method of the handler where, instead of getting the data from the client, I get the data from a different source (another data source)!
Yeah, I saw that. However, I would refrain from recalculating the stats the whole time. Rather, have a thread that does that and then just read whatever it has calculated each time you want to send the data to the clients.
ASKER
I am not sure I completely got you here; any code that will help explain your point more will be appreciated. Besides, the data processing isn't really that complicated here.
I know it isn't complicated
====================================================
import java.util.Vector;

public class Process extends Thread {
    private Vector connections = null;

    public Process(Vector connections) {
        this.connections = connections;
    }

    public void run() {
        // Read the input from the clients by looping through the
        // connections list and calling getData() on each ClientHandler.
        // Do your statistical/whatever calculations and store the
        // results in local variables.
        // Send the data back to the clients by looping through them
        // and calling setData(/* PUT DATA HERE */) on each ClientHandler.
    }
}
====================================================
public class ClientHandler extends Thread {
    public void run() {
        // Send and receive data in a loop.
    }

    public void setData(String str) {
        // Add the data to a list.
    }

    public String[] getData() {
        // Return data from a list read at an earlier stage.
        return new String[0]; // placeholder so the skeleton compiles
    }
}
====================================================
import java.util.Vector;

public class Main extends Thread {
    Vector connections = new Vector();

    public Main() {
        // Create an instance of Process and pass the instance of connections to it.
    }

    public void run() {
        // Accept connections and add them to the connections list.
    }
}
====================================================
Something more or less like that.
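To make the interaction between the threads a little more concrete, here is a minimal sketch of the part that reads the data source and hands each line to every registered client. It is an assumption-laden illustration, not the skeleton's actual implementation: `FeedPump` and `LineSink` are hypothetical names, and `CopyOnWriteArrayList` (Java 5+) stands in for the `Vector` so that clients can be added or removed while a broadcast is in progress.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sink: anything that can accept one line of data,
// for example a client handler that queues the line for sending.
interface LineSink {
    void setData(String line);
}

// One thread reads the upstream feed; every line is handed to each
// registered sink. Accepting new clients happens on a different thread.
final class FeedPump implements Runnable {
    private final BufferedReader source;
    private final List<LineSink> sinks = new CopyOnWriteArrayList<LineSink>();

    FeedPump(BufferedReader source) {
        this.source = source;
    }

    void register(LineSink sink) {
        sinks.add(sink);
    }

    void unregister(LineSink sink) {
        sinks.remove(sink);
    }

    public void run() {
        try {
            String line;
            while ((line = source.readLine()) != null) {
                // Iterating a CopyOnWriteArrayList is safe even if another
                // thread registers or unregisters a sink at the same time.
                for (LineSink sink : sinks) {
                    sink.setData(line);
                }
            }
        } catch (IOException e) {
            System.err.println("Feed read failed: " + e);
        }
    }
}
```

With this split, only one thread ever reads from the data source, and the accept loop merely calls `register(...)` for each new client.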
ASKER
Which thread is responsible for listening to the data source and sending the data to all connections in this case?
this one
====================================================
public class Main extends Thread {
    Vector connections = new Vector();

    public Main() {
        // Create an instance of Process and pass the instance of connections to it.
    }

    public void run() {
        // Accept connections and add them to the connections list.
    }
}
====================================================
ASKER
I have made the modifications and am currently testing the results. But I have a question: what is the limit on the number of connected users using this approach, if any?
thanks
The number of connections is limited firstly by the number of ports that you have open on the server and secondly by the amount of memory on the machine. Since you will run out of memory way before you run out of actual sockets, I would say memory is the biggest contributing factor. Then one would have to look at processing power, and that can only be minimised according to your requirements; to my knowledge this is one of the better ways of doing that.
Since you have a common calculation that has to occur on the same data over and over again and stays the same for all users, there is no use in recalculating the data/information every time a user requests it. That is, unless your working data changes; for that reason you should place the calculation in a thread that periodically calculates the data and then just keeps the result for retrieval by the clients at any time. If the data on which you base your calculations does not change, then it is not going to be advantageous to keep on calculating the result, since it will always be the same.
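As a rough illustration of "calculate in one thread, read the stored result from many", the following sketch keeps the latest result in an `AtomicReference`. All names here (`LatestResult`, `PeriodicCalculator`, the `Callable` that stands in for the real calculation, and the interval) are hypothetical and only meant to show the shape of the idea.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder for the most recently computed result.
final class LatestResult {
    private final AtomicReference<String> latest = new AtomicReference<String>("");

    // Called by the single calculation thread.
    void publish(String computed) {
        latest.set(computed);
    }

    // Called by any number of client threads; never recomputes anything.
    String read() {
        return latest.get();
    }
}

// Hypothetical worker: recomputes once per interval and publishes the result.
final class PeriodicCalculator implements Runnable {
    private final LatestResult target;
    private final Callable<String> calculation;
    private final long intervalMillis;

    PeriodicCalculator(LatestResult target, Callable<String> calculation, long intervalMillis) {
        this.target = target;
        this.calculation = calculation;
        this.intervalMillis = intervalMillis;
    }

    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                target.publish(calculation.call());
                Thread.sleep(intervalMillis);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the stop request
        } catch (Exception e) {
            System.err.println("Calculation failed: " + e);
        }
    }
}
```

Client threads would call `read()` whenever they need to send, so the cost of the calculation no longer grows with the number of connected users.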
ASKER
OK, thanks. One more question: is there a way to know when a client has disconnected, so that I can remove it from the connections vector, other than checking whether there was an error when a packet is sent to it? Currently I remove a connection from the vector only when I get an error (using conection.checkError()) after sending a packet to the client inside the clients loop.
thanks
There are a number of ways you could do that but the best would be to have the client send a disconnect command or something to the server. The server should then react to that when it is received.
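A minimal sketch of that idea, assuming the client sends a line containing `QUIT` before it leaves (the command name, the class name `DisconnectWatcher` and the callback are all hypothetical). The same loop also notices an orderly close without any command, because `readLine()` returns `null` once the peer has closed its side of the connection; a connection that silently dies on the network may still go unnoticed until a write fails or a timeout fires.

```java
import java.io.BufferedReader;
import java.io.IOException;

// Hypothetical watcher: blocks on the client's input stream and reports
// when the client says goodbye or the connection is closed.
final class DisconnectWatcher implements Runnable {
    private final BufferedReader fromClient;
    private final Runnable onDisconnect;

    DisconnectWatcher(BufferedReader fromClient, Runnable onDisconnect) {
        this.fromClient = fromClient;
        this.onDisconnect = onDisconnect;
    }

    public void run() {
        try {
            String line;
            // readLine() returns null when the peer closes the connection.
            while ((line = fromClient.readLine()) != null) {
                if ("QUIT".equals(line.trim())) {   // assumed disconnect command
                    break;
                }
            }
        } catch (IOException e) {
            // A reset or timed-out connection also counts as a disconnect.
        } finally {
            onDisconnect.run();   // e.g. remove this client from the connections list
        }
    }
}
```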
ASKER
hi,
I've made some of the modifications you suggested, and I removed the error check I used to do after sending a packet to a connection (while looping through all connections).
I used to do this: if (conection.checkError()) connections.removeElement(conection);
It worked OK, but when I re-implemented the error check the system hung after some time (which could be because one of the connections had been lost)! It's not an important issue, as I need this only to check the number of connections to the system, which can be implemented in another way as you stated. But do you think this could be the problem? I am still testing the system (now without the error check) and all I care about is to make it work fine!
Yes, it could have been your initial problem as well, but I doubt it. Could you post your new code, just out of interest's sake?
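One thing that can go wrong with removing elements inside an index-based loop is that the element after the removed one shifts down and is skipped. A hedged sketch of a broadcast that removes failed streams through the iterator instead is shown below; `Broadcaster` is a hypothetical name and the list is assumed to hold `PrintStream` objects, as in the description above. Note that a write to a stalled client can still block while the lock is held, which is one plausible (but unconfirmed) cause of a freeze.

```java
import java.io.PrintStream;
import java.util.Iterator;
import java.util.List;

final class Broadcaster {
    // Sends the message to every stream and removes the ones that report
    // an error. Returns the number of connections that were removed.
    static int broadcast(List<PrintStream> connections, String message) {
        int removed = 0;
        synchronized (connections) {
            Iterator<PrintStream> it = connections.iterator();
            while (it.hasNext()) {
                PrintStream out = it.next();
                out.println(message);
                if (out.checkError()) {   // flushes and reports any earlier failure
                    it.remove();          // safe removal during iteration
                    out.close();
                    removed++;
                }
            }
        }
        return removed;
    }
}
```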
ASKER
OK, now I have 3 major classes.
server class:
import java.net.*;
import java.io.*;
import java.util.*;

public class pserver2 {
    Timer timer;
    // shared with phandler3.broadcast()
    static Vector connections = new Vector();

    public pserver2 (int port) throws IOException {
        Socket echoSocket = null;
        PrintWriter out = null;
        BufferedReader in = null;
        try {
            echoSocket = new Socket("ipaddress of data source server", portnumber);
            out = new PrintWriter(echoSocket.getOutputStream(), true);
            in = new BufferedReader(new InputStreamReader(
                    echoSocket.getInputStream()));
        } catch (UnknownHostException e) {
            System.err.println("Don't know about host: PSE server");
            System.exit(1);
        } catch (IOException e) {
            System.err.println("Couldn't get I/O for "
                    + "the connection to: PSE server" + e);
            System.exit(1);
        }
        // start listening at the port sent as parameter to this class
        ServerSocket server = new ServerSocket (port);
        DataInputStream i = new DataInputStream (new BufferedInputStream (echoSocket.getInputStream ()));
        // this thread will connect to the datasource, wait for data and send it to all stored connections
        phandler3 p3 = new phandler3(i);
        p3.start ();
        while (true) {
            Socket client = server.accept ();
            // Notify a user has just been accepted
            // this thread will just send the newly connected users some initial data
            phandler2 c = new phandler2(client, echoSocket);
            c.start ();
            r.gc();
            PrintStream o = new PrintStream(client.getOutputStream(), true);
            // I add the connections to the connections vector here!
            synchronized(connections) {
                connections.addElement(o);
            }
        }
    }

    // The main() method creates an instance of the pserver, passing the
    // command-line port as a parameter. This is the port to which clients
    // will connect.
    public static void main (String args[]) throws IOException {
        if (args.length != 1)
            throw new RuntimeException ("Syntax: pserver ");
        new pserver2 (Integer.parseInt (args[0]));
    }
}
the datasource handler class
import java.net.*;
import java.io.*;
import java.util.*;
//import java.sql.*;
import java.sql.DriverManager;
//import java.sql.SQLPermission;
import java.sql.ResultSetMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Connection;

public class phandler3 implements Runnable {
    protected Socket s;
    protected Socket p;
    protected DataInputStream i;
    private PrintStream o = null;
    static Vector handlers = new Vector();
    static String NewsLine = new String();
    static String serverInput, query;
    static String[] MarketTicker;
    static String[] MarketSummary;
    static String[] MarketHistory;
    protected Thread listener;

    public phandler3 (DataInputStream i) throws IOException {
        this.i = i;
    }

    public synchronized void start () {
        if (listener == null) {
            try {
                listener = new Thread (this);
                listener.start ();
            } catch (Exception ex) {
                System.out.println ("couldn't establish connection");
            }
        }
    }

    public synchronized void stop () {
        if (listener != null) {
            try {
                if (listener != Thread.currentThread ())
                    listener.interrupt ();
                listener = null;
            } catch (Exception ignored) {
            }
        }
    }

    public void run () {
        try {
            while ((serverInput = i.readLine()) != null) {
                // ........ process data ....
                // ...........................
                // and send via broadcast
                broadcast (Data + "\n");
            }
        } catch (IOException ex) {
            System.out.println ("Error Reading this msg" + ex);
            ex.printStackTrace ();
        } finally {
        }
    }

    public void broadcast (String message) {
        synchronized (pserver2.connections) {
            int k=0;
            for(int i = 0; i < pserver2.connections.size(); i++) {
                k++;
                synchronized(pserver2.connections) {
                    PrintStream o = (PrintStream)pserver2.connections.elementAt(i);
                    try {
                        o.println(message);
                        o.flush();
                        if (o.checkError())
                        {
                            // HERE I CHECK FOR SOCKET ERRORS
                            //pserver2.connections.removeElement (o);
                        }
                    } catch (Exception ex) {
                        //handler.stop ();
                        System.out.println ("not valid stream" + ex );
                    }
                }
            }
        }
    }
}
And there is the client initialization handler, phandler2, which just sends some initial data when the client first connects.
Does that work a bit better for you?
ASKER
Unfortunately not... I am still having the same problem occasionally: the system stops getting data and updating connected clients. When I restart, things get back to normal! :(
Can you see where the app is hanging?
ASKER
how do I know?
Debug output. Or run it within an IDE, which you could use to step through the code as it is executing.
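One low-effort form of debug output is a thread dump, which shows what every thread is doing at the moment the server appears frozen. The sketch below builds such a report from inside the program using `Thread.getAllStackTraces()` (available since Java 5); the class name `ThreadDump` is hypothetical, and how the dump gets triggered (a timer, a special admin command, and so on) is left open. The JDK's `jstack` tool produces a similar report from outside the process.

```java
import java.util.Map;

final class ThreadDump {
    // Builds a text report of every live thread and its current stack.
    static String dump() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            sb.append('"').append(t.getName()).append("\" state=")
              .append(t.getState()).append('\n');
            for (StackTraceElement frame : e.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }
}
```

A broadcasting thread whose top frames sit in a socket write, or a reader whose top frames sit in a socket read, would point at where the application is stuck.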
ASKER
OK, I have a question here. What will happen if for some reason the connection to the data source is lost, say because of a network problem? The DataInputStream i in phandler3 will no longer be valid, and it will never come back, as the thread doesn't try to reconnect! This could be the problem, right?
That could be a problem, yes. If you lose your connection you will, however, receive an exception when you try to send data.
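Since the data-source thread only reads, it may be worth noting that a read on a connection that has silently died can block for a very long time unless a read timeout is set. Below is a hedged sketch of a feed reader that sets `Socket.setSoTimeout(...)` and reconnects after any failure. The names `ReconnectingFeed`, `Connector`, `LineHandler` and `tcp(...)`, as well as the timeout and retry values, are assumptions made for this illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

final class ReconnectingFeed implements Runnable {
    // Hypothetical hooks so the connection logic can be swapped or tested.
    interface Connector { BufferedReader open() throws IOException; }
    interface LineHandler { void onLine(String line); }

    private final Connector connector;
    private final LineHandler handler;
    private final long retryDelayMillis;

    ReconnectingFeed(Connector connector, LineHandler handler, long retryDelayMillis) {
        this.connector = connector;
        this.handler = handler;
        this.retryDelayMillis = retryDelayMillis;
    }

    // Connector for a real TCP feed with a read timeout.
    static Connector tcp(final String host, final int port, final int readTimeoutMillis) {
        return new Connector() {
            public BufferedReader open() throws IOException {
                Socket socket = new Socket(host, port);
                try {
                    // A read blocked longer than this throws SocketTimeoutException.
                    socket.setSoTimeout(readTimeoutMillis);
                    return new BufferedReader(new InputStreamReader(socket.getInputStream()));
                } catch (IOException e) {
                    socket.close();
                    throw e;
                }
            }
        };
    }

    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            BufferedReader in = null;
            try {
                in = connector.open();
                String line;
                while ((line = in.readLine()) != null) {
                    handler.onLine(line);
                }
                System.err.println("Feed closed by the source, reconnecting");
            } catch (IOException e) {
                // Covers refused connections, resets and read timeouts.
                System.err.println("Feed connection lost: " + e);
            } finally {
                if (in != null) {
                    try { in.close(); } catch (IOException ignored) { }
                }
            }
            try {
                Thread.sleep(retryDelayMillis);   // pause before the next attempt
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

The read timeout only makes sense if the source is expected to send something at least that often; for a quiet feed, a heartbeat line from the source would be needed.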
ASKER
You're right. The reason I did so was that I modified the code from a chat client-server program, and the first intention was that each client could contribute in some way to the main data source (the applet would send some data). In that case I would have to keep a separate thread for each connection, right?
Anyway, for now, since I am only doing data dissemination, I will try to do as you said. Meanwhile I will appreciate any code snippet to implement what you have suggested.
Thanks