Solved

Concurrent proocessing of requests for example ATM Host interface

Posted on 2013-05-20
4
517 Views
Last Modified: 2013-06-05
I have written code for concurrent consumption and processing of ATM requests coming from a switch....I'm new to threads in JAVA.....But after sometime it consumes the messages but does not process it....

public class Server_1 implements Runnable 
{
private static ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(500);

public void start_server() throws UnknownHostException, IOException
	{
		try
		{
                         server = new ServerSocket(9999, 30, InetAddress.getByName("172.16.10.215"));
			new Listener().start();
			System.out.println("Socket Listener Started...");
		}
		catch (Exception e) 
		{
			e.printStackTrace();
		}

public class Listener extends Thread
	{
		public Listener() 
		{
			//NOTHING
		}
		public void run () 
		{
			while(true)
			{
				try 
				{
					sock = server.accept();
					System.out.println("Connection received from " + sock.getInetAddress().getHostName());
					new Thread(new Server_1(),"j8583-server").start();
					System.out.println("Started New Thread");
				} 
				catch (IOException e) 
				{
					e.printStackTrace();
				}
			}
		}
	}

public void run() 
	{
		int count = 0;
		byte[] lenbuf = new byte[4];
		try 
		{
			socket = sock;
			// For high volume apps you will be better off only reading the
			// stream in this thread
			// and then using another thread to parse the buffers and process
			// the requests
			// Otherwise the network buffer might fill up and you can miss a
			// request.
			while (socket != null && socket.isConnected() && Thread.currentThread().isAlive() && !Thread.currentThread().isInterrupted()) 
			{
				if (socket.getInputStream().read(lenbuf) == 4) 
				{
					int size = ((lenbuf[0] & 0xff) << 8) | (lenbuf[1] & 0xff);
					System.out.println("Size: "+size);
					byte[] buf = new byte[size];
					// We're not expecting ETX in this case
					socket.getInputStream().read(buf);
					count++;
					/*log.debug(socket);
					log.debug(buf);*/
					threadPool.schedule(new Processor(buf, socket), 0, TimeUnit.MILLISECONDS);
					//log.debug(threadPool.schedule(new Processor(buf, socket), 0, TimeUnit.MILLISECONDS));
					/*Processor p = new Processor(buf, sock);
					new Thread(p).start();*/
					System.out.println("~~~4~~~");
				}
			}
		} 
		catch (IOException ex) 
		{
			ex.printStackTrace();
		}
		System.out.println(String.format("Exiting after reading %d requests", count));
		try 
		{
			socket.close();
			//System.out.println("~~~5~~~");
			System.out.println("Socket Closed....");
		} 
		catch (IOException ex) 
		{
			ex.printStackTrace();
		}
	}

private class Processor implements Runnable 
	{
		private byte[] msg;
		private Socket sock;

		Processor(byte[] buf, Socket s) 
		{
			msg = buf;
			sock = s;
		} 
		
		public void run() 
		{
//Processing of 8583 messages happen here and response is sent.

}

}

Open in new window

0
Comment
Question by:CCBRONET
[X]
Welcome to Experts Exchange

Add your voice to the tech community where 5M+ people just like you are talking about what matters.

  • Help others & share knowledge
  • Earn cash & points
  • Learn & ask questions
4 Comments
 
LVL 26

Expert Comment

by:ksivananth
ID: 39180635
1. move the run method implementation from Server_1 to Processor class
2. update line 32 as below

new Thread(new Processor( new byte[1024], socket ),"j8583-server").start();
0
 
LVL 86

Expert Comment

by:CEHJ
ID: 39180642
Firstly, that doesn't even look compilable. Please post code that compiles. There should only be one public class per compilation unit (source code file)
0
 

Author Comment

by:CCBRONET
ID: 39183268
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hibernate.HibernateException;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
import org.hibernate.cfg.Configuration;
import org.jpos.iso.ISOException;
import org.jpos.iso.ISOHeader;
import org.jpos.iso.ISOMsg;
import org.jpos.iso.packager.GenericPackager;
import org.springframework.util.StringUtils;


/* Server_1.java contains all the business logic to perform
   various transactions on the ATM Interface Host           */
public class Server_1 implements Runnable 
{
	private static ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(500);	
	private static final Log log = LogFactory.getLog(Server_1.class);
	private Socket socket;	
	public static ServerSocket server;	
	public static Socket sock;
		
	/* This method is used to start the socket listener on a
	   designated IP address and Port No. */
	public void start_server() throws UnknownHostException, IOException
	{
		try
		{
			System.out.println("Setting up server socket listener...");
			server = new ServerSocket(9999, 10, InetAddress.getByName("172.16.10.34"));
			new Listener().start();
			System.out.println("Socket Listener Started...");
		}
		catch (HibernateException e) 
		{
			e.printStackTrace();
		}
	} 
	
	/* This class Listener creates a socket listener called
	   "j8583-server" on port no.: 9999                     */
	public class Listener extends Thread
	{
		public Listener() 
		{
			//NOTHING
		}
		public void run () 
		{
			while(true)
			{
				try 
				{
					sock = server.accept();
					System.out.println("Connection received from " + sock.getInetAddress().getHostName());
					new Thread(new Processor(new byte[1024], sock),"j8583-server").start(); 	//The code change suggested
					//new Thread(new Server_1(),"j8583-server").start();
					System.out.println("Started New Thread");
				} 
				catch (IOException e) 
				{
					e.printStackTrace();
				}
			}
		}
	}

	/* This method is to trim the message of it first 4 bytes since
	   it contains only the size of message, which in turn passes
	   the message to Processor(buf, sock) and also start a new
	   thread                                                       */
	public void run() 
	{
		int count = 0;
		byte[] lenbuf = new byte[4];
		try 
		{
			socket = sock;
			// For high volume apps you will be better off only reading the
			// stream in this thread
			// and then using another thread to parse the buffers and process
			// the requests
			// Otherwise the network buffer might fill up and you can miss a
			// request.
			while (socket != null && socket.isConnected() && Thread.currentThread().isAlive() && !Thread.currentThread().isInterrupted()) 
			{
				if (socket.getInputStream().read(lenbuf) == 4) 
				{
					int size = ((lenbuf[0] & 0xff) << 8) | (lenbuf[1] & 0xff);
					System.out.println("Size: "+size);
					byte[] buf = new byte[size];
					// We're not expecting ETX in this case
					socket.getInputStream().read(buf);
					count++;
					threadPool.schedule(new Processor(buf, socket), 0, TimeUnit.MILLISECONDS);
					/*Processor p = new Processor(buf, sock);
					new Thread(p).start();*/
					System.out.println("~~~4~~~");
				}
			}
		} 
		catch (IOException ex) 
		{
			ex.printStackTrace();
		}
		System.out.println(String.format("Exiting after reading %d requests", count));
		try 
		{
			socket.close();
			System.out.println("Socket Closed....");
		} 
		catch (IOException ex) 
		{
			ex.printStackTrace();
		}
	}
	
		
	/* This Runnable Processor contains all business logic for ATM
	   Transactions                                                */
	private class Processor implements Runnable 
	{
		/* This contains the message received from the ATM Host Switch  */
		private byte[] msg;
		/* This contains the socket Port No. */
		private Socket sock;

		Processor(byte[] buf, Socket s) 
		{
			msg = buf;
			sock = s;
		} 
		
		/* This portion of code will run each time a new request come
		   from the ATM Host Switch, such as Balance Inquiry, Mini
		   Statement Inquiry, Cash Withdrawal, Funds Transfer, Cheque
		   Book Request, etc.                                         */
		public void run() 
		{
			try 
			{
				System.out.println("Msg Received");
				//Process the message here and reply
			}
			catch (MyException e) 
			{
				e.printStackTrace();
			}
				
	}
	
	
	public static void main(String[] args) throws Exception 
	{
		Server_1 s = new Server_1();
		s.start_server();
	}
	
	
}

Open in new window


Hope This Helps.....
0
 
LVL 36

Accepted Solution

by:
mccarl earned 500 total points
ID: 39186508
What is the purpose of passing the socket to the Processor class? It's hard to assume as you haven't included what is the Processor class really doing. You do mention that a response is being sent, is that response sent on the "sock" that is passed in? That would be one (of many) problems with this code, as you could have numerous different threads all trying to send their responses to the same socket.

If the underlying protocol is basically like this, receive a request, process it, then send a response to that request, receive the next request, process that request and then send a response, etc, etc, then there is really no need to start the processor in another thread. Theoretically, you should get any more requests until the response has been sent, so you are only ever processing the one request at a time (per connection that is).

Other issues I can see... the .read() method of the inputstreams for the sockets are guaranteed to return the full amount of data requested, ie. you read from the inputstream into "lenbuf" which is of size 4, but you may get the first 3 bytes returned on one call to .read() and need to call .read() again to get the 4th. (And the same problem .read()ing into the "buf" array, you might not get all "size" bytes in that first read call. This will affect the message that is being received when this first occurs, and 99% likely every other message ever sent on a particular connection after that, as you will be out of sync with the length data that is being sent.

Also, the "socket" variable that you are calling these read() methods (amongst others) on, will be overwritten when a new connection is made. If you separate out your classes into their own top level class (ie. a separate .java file) rather than being inner classes of Server_1 will help you to eliminate issues with synchronization among the various threads that you have.
0

Featured Post

Online Training Solution

Drastically shorten your training time with WalkMe's advanced online training solution that Guides your trainees to action. Forget about retraining and skyrocket knowledge retention rates.

Question has a verified solution.

If you are experiencing a similar issue, please ask a related question

By the end of 1980s, object oriented programming using languages like C++, Simula69 and ObjectPascal gained momentum. It looked like programmers finally found the perfect language. C++ successfully combined the object oriented principles of Simula w…
Java functions are among the best things for programmers to work with as Java sites can be very easy to read and prepare. Java especially simplifies many processes in the coding industry as it helps integrate many forms of technology and different d…
Viewers will learn about if statements in Java and their use The if statement: The condition required to create an if statement: Variations of if statements: An example using if statements:
Viewers will learn about basic arrays, how to declare them, and how to use them. Introduction and definition: Declare an array and cover the syntax of declaring them: Initialize every index in the created array: Example/Features of a basic arr…

728 members asked questions and received personalized solutions in the past 7 days.

Join the community of 500,000 technology professionals and ask your questions.

Join & Ask a Question