Concurrent proocessing of requests for example ATM Host interface

I have written code for concurrent consumption and processing of ATM requests coming from a switch....I'm new to threads in JAVA.....But after sometime it consumes the messages but does not process it....

public class Server_1 implements Runnable 
{
private static ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(500);

public void start_server() throws UnknownHostException, IOException
	{
		try
		{
                         server = new ServerSocket(9999, 30, InetAddress.getByName("172.16.10.215"));
			new Listener().start();
			System.out.println("Socket Listener Started...");
		}
		catch (Exception e) 
		{
			e.printStackTrace();
		}

public class Listener extends Thread
	{
		public Listener() 
		{
			//NOTHING
		}
		public void run () 
		{
			while(true)
			{
				try 
				{
					sock = server.accept();
					System.out.println("Connection received from " + sock.getInetAddress().getHostName());
					new Thread(new Server_1(),"j8583-server").start();
					System.out.println("Started New Thread");
				} 
				catch (IOException e) 
				{
					e.printStackTrace();
				}
			}
		}
	}

public void run() 
	{
		int count = 0;
		byte[] lenbuf = new byte[4];
		try 
		{
			socket = sock;
			// For high volume apps you will be better off only reading the
			// stream in this thread
			// and then using another thread to parse the buffers and process
			// the requests
			// Otherwise the network buffer might fill up and you can miss a
			// request.
			while (socket != null && socket.isConnected() && Thread.currentThread().isAlive() && !Thread.currentThread().isInterrupted()) 
			{
				if (socket.getInputStream().read(lenbuf) == 4) 
				{
					int size = ((lenbuf[0] & 0xff) << 8) | (lenbuf[1] & 0xff);
					System.out.println("Size: "+size);
					byte[] buf = new byte[size];
					// We're not expecting ETX in this case
					socket.getInputStream().read(buf);
					count++;
					/*log.debug(socket);
					log.debug(buf);*/
					threadPool.schedule(new Processor(buf, socket), 0, TimeUnit.MILLISECONDS);
					//log.debug(threadPool.schedule(new Processor(buf, socket), 0, TimeUnit.MILLISECONDS));
					/*Processor p = new Processor(buf, sock);
					new Thread(p).start();*/
					System.out.println("~~~4~~~");
				}
			}
		} 
		catch (IOException ex) 
		{
			ex.printStackTrace();
		}
		System.out.println(String.format("Exiting after reading %d requests", count));
		try 
		{
			socket.close();
			//System.out.println("~~~5~~~");
			System.out.println("Socket Closed....");
		} 
		catch (IOException ex) 
		{
			ex.printStackTrace();
		}
	}

private class Processor implements Runnable 
	{
		private byte[] msg;
		private Socket sock;

		Processor(byte[] buf, Socket s) 
		{
			msg = buf;
			sock = s;
		} 
		
		public void run() 
		{
//Processing of 8583 messages happen here and response is sent.

}

}

Open in new window

CCBRONETAsked:
Who is Participating?
 
mccarlIT Business Systems Analyst / Software DeveloperCommented:
What is the purpose of passing the socket to the Processor class? It's hard to assume as you haven't included what is the Processor class really doing. You do mention that a response is being sent, is that response sent on the "sock" that is passed in? That would be one (of many) problems with this code, as you could have numerous different threads all trying to send their responses to the same socket.

If the underlying protocol is basically like this, receive a request, process it, then send a response to that request, receive the next request, process that request and then send a response, etc, etc, then there is really no need to start the processor in another thread. Theoretically, you should get any more requests until the response has been sent, so you are only ever processing the one request at a time (per connection that is).

Other issues I can see... the .read() method of the inputstreams for the sockets are guaranteed to return the full amount of data requested, ie. you read from the inputstream into "lenbuf" which is of size 4, but you may get the first 3 bytes returned on one call to .read() and need to call .read() again to get the 4th. (And the same problem .read()ing into the "buf" array, you might not get all "size" bytes in that first read call. This will affect the message that is being received when this first occurs, and 99% likely every other message ever sent on a particular connection after that, as you will be out of sync with the length data that is being sent.

Also, the "socket" variable that you are calling these read() methods (amongst others) on, will be overwritten when a new connection is made. If you separate out your classes into their own top level class (ie. a separate .java file) rather than being inner classes of Server_1 will help you to eliminate issues with synchronization among the various threads that you have.
0
 
ksivananthCommented:
1. move the run method implementation from Server_1 to Processor class
2. update line 32 as below

new Thread(new Processor( new byte[1024], socket ),"j8583-server").start();
0
 
CEHJCommented:
Firstly, that doesn't even look compilable. Please post code that compiles. There should only be one public class per compilation unit (source code file)
0
 
CCBRONETAuthor Commented:
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hibernate.HibernateException;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
import org.hibernate.cfg.Configuration;
import org.jpos.iso.ISOException;
import org.jpos.iso.ISOHeader;
import org.jpos.iso.ISOMsg;
import org.jpos.iso.packager.GenericPackager;
import org.springframework.util.StringUtils;


/* Server_1.java contains all the business logic to perform
   various transactions on the ATM Interface Host           */
public class Server_1 implements Runnable 
{
	private static ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(500);	
	private static final Log log = LogFactory.getLog(Server_1.class);
	private Socket socket;	
	public static ServerSocket server;	
	public static Socket sock;
		
	/* This method is used to start the socket listener on a
	   designated IP address and Port No. */
	public void start_server() throws UnknownHostException, IOException
	{
		try
		{
			System.out.println("Setting up server socket listener...");
			server = new ServerSocket(9999, 10, InetAddress.getByName("172.16.10.34"));
			new Listener().start();
			System.out.println("Socket Listener Started...");
		}
		catch (HibernateException e) 
		{
			e.printStackTrace();
		}
	} 
	
	/* This class Listener creates a socket listener called
	   "j8583-server" on port no.: 9999                     */
	public class Listener extends Thread
	{
		public Listener() 
		{
			//NOTHING
		}
		public void run () 
		{
			while(true)
			{
				try 
				{
					sock = server.accept();
					System.out.println("Connection received from " + sock.getInetAddress().getHostName());
					new Thread(new Processor(new byte[1024], sock),"j8583-server").start(); 	//The code change suggested
					//new Thread(new Server_1(),"j8583-server").start();
					System.out.println("Started New Thread");
				} 
				catch (IOException e) 
				{
					e.printStackTrace();
				}
			}
		}
	}

	/* This method is to trim the message of it first 4 bytes since
	   it contains only the size of message, which in turn passes
	   the message to Processor(buf, sock) and also start a new
	   thread                                                       */
	public void run() 
	{
		int count = 0;
		byte[] lenbuf = new byte[4];
		try 
		{
			socket = sock;
			// For high volume apps you will be better off only reading the
			// stream in this thread
			// and then using another thread to parse the buffers and process
			// the requests
			// Otherwise the network buffer might fill up and you can miss a
			// request.
			while (socket != null && socket.isConnected() && Thread.currentThread().isAlive() && !Thread.currentThread().isInterrupted()) 
			{
				if (socket.getInputStream().read(lenbuf) == 4) 
				{
					int size = ((lenbuf[0] & 0xff) << 8) | (lenbuf[1] & 0xff);
					System.out.println("Size: "+size);
					byte[] buf = new byte[size];
					// We're not expecting ETX in this case
					socket.getInputStream().read(buf);
					count++;
					threadPool.schedule(new Processor(buf, socket), 0, TimeUnit.MILLISECONDS);
					/*Processor p = new Processor(buf, sock);
					new Thread(p).start();*/
					System.out.println("~~~4~~~");
				}
			}
		} 
		catch (IOException ex) 
		{
			ex.printStackTrace();
		}
		System.out.println(String.format("Exiting after reading %d requests", count));
		try 
		{
			socket.close();
			System.out.println("Socket Closed....");
		} 
		catch (IOException ex) 
		{
			ex.printStackTrace();
		}
	}
	
		
	/* This Runnable Processor contains all business logic for ATM
	   Transactions                                                */
	private class Processor implements Runnable 
	{
		/* This contains the message received from the ATM Host Switch  */
		private byte[] msg;
		/* This contains the socket Port No. */
		private Socket sock;

		Processor(byte[] buf, Socket s) 
		{
			msg = buf;
			sock = s;
		} 
		
		/* This portion of code will run each time a new request come
		   from the ATM Host Switch, such as Balance Inquiry, Mini
		   Statement Inquiry, Cash Withdrawal, Funds Transfer, Cheque
		   Book Request, etc.                                         */
		public void run() 
		{
			try 
			{
				System.out.println("Msg Received");
				//Process the message here and reply
			}
			catch (MyException e) 
			{
				e.printStackTrace();
			}
				
	}
	
	
	public static void main(String[] args) throws Exception 
	{
		Server_1 s = new Server_1();
		s.start_server();
	}
	
	
}

Open in new window


Hope This Helps.....
0
Question has a verified solution.

Are you are experiencing a similar issue? Get a personalized answer when you ask a related question.

Have a better answer? Share it in a comment.

All Courses

From novice to tech pro — start learning today.