Solved

Concurrent proocessing of requests for example ATM Host interface

Posted on 2013-05-20
4
477 Views
Last Modified: 2013-06-05
I have written code for concurrent consumption and processing of ATM requests coming from a switch....I'm new to threads in JAVA.....But after sometime it consumes the messages but does not process it....

public class Server_1 implements Runnable 
{
private static ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(500);

public void start_server() throws UnknownHostException, IOException
	{
		try
		{
                         server = new ServerSocket(9999, 30, InetAddress.getByName("172.16.10.215"));
			new Listener().start();
			System.out.println("Socket Listener Started...");
		}
		catch (Exception e) 
		{
			e.printStackTrace();
		}

public class Listener extends Thread
	{
		public Listener() 
		{
			//NOTHING
		}
		public void run () 
		{
			while(true)
			{
				try 
				{
					sock = server.accept();
					System.out.println("Connection received from " + sock.getInetAddress().getHostName());
					new Thread(new Server_1(),"j8583-server").start();
					System.out.println("Started New Thread");
				} 
				catch (IOException e) 
				{
					e.printStackTrace();
				}
			}
		}
	}

public void run() 
	{
		int count = 0;
		byte[] lenbuf = new byte[4];
		try 
		{
			socket = sock;
			// For high volume apps you will be better off only reading the
			// stream in this thread
			// and then using another thread to parse the buffers and process
			// the requests
			// Otherwise the network buffer might fill up and you can miss a
			// request.
			while (socket != null && socket.isConnected() && Thread.currentThread().isAlive() && !Thread.currentThread().isInterrupted()) 
			{
				if (socket.getInputStream().read(lenbuf) == 4) 
				{
					int size = ((lenbuf[0] & 0xff) << 8) | (lenbuf[1] & 0xff);
					System.out.println("Size: "+size);
					byte[] buf = new byte[size];
					// We're not expecting ETX in this case
					socket.getInputStream().read(buf);
					count++;
					/*log.debug(socket);
					log.debug(buf);*/
					threadPool.schedule(new Processor(buf, socket), 0, TimeUnit.MILLISECONDS);
					//log.debug(threadPool.schedule(new Processor(buf, socket), 0, TimeUnit.MILLISECONDS));
					/*Processor p = new Processor(buf, sock);
					new Thread(p).start();*/
					System.out.println("~~~4~~~");
				}
			}
		} 
		catch (IOException ex) 
		{
			ex.printStackTrace();
		}
		System.out.println(String.format("Exiting after reading %d requests", count));
		try 
		{
			socket.close();
			//System.out.println("~~~5~~~");
			System.out.println("Socket Closed....");
		} 
		catch (IOException ex) 
		{
			ex.printStackTrace();
		}
	}

private class Processor implements Runnable 
	{
		private byte[] msg;
		private Socket sock;

		Processor(byte[] buf, Socket s) 
		{
			msg = buf;
			sock = s;
		} 
		
		public void run() 
		{
//Processing of 8583 messages happen here and response is sent.

}

}

Open in new window

0
Comment
Question by:CCBRONET
4 Comments
 
LVL 26

Expert Comment

by:ksivananth
ID: 39180635
1. move the run method implementation from Server_1 to Processor class
2. update line 32 as below

new Thread(new Processor( new byte[1024], socket ),"j8583-server").start();
0
 
LVL 86

Expert Comment

by:CEHJ
ID: 39180642
Firstly, that doesn't even look compilable. Please post code that compiles. There should only be one public class per compilation unit (source code file)
0
 

Author Comment

by:CCBRONET
ID: 39183268
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hibernate.HibernateException;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
import org.hibernate.cfg.Configuration;
import org.jpos.iso.ISOException;
import org.jpos.iso.ISOHeader;
import org.jpos.iso.ISOMsg;
import org.jpos.iso.packager.GenericPackager;
import org.springframework.util.StringUtils;


/* Server_1.java contains all the business logic to perform
   various transactions on the ATM Interface Host           */
public class Server_1 implements Runnable 
{
	private static ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(500);	
	private static final Log log = LogFactory.getLog(Server_1.class);
	private Socket socket;	
	public static ServerSocket server;	
	public static Socket sock;
		
	/* This method is used to start the socket listener on a
	   designated IP address and Port No. */
	public void start_server() throws UnknownHostException, IOException
	{
		try
		{
			System.out.println("Setting up server socket listener...");
			server = new ServerSocket(9999, 10, InetAddress.getByName("172.16.10.34"));
			new Listener().start();
			System.out.println("Socket Listener Started...");
		}
		catch (HibernateException e) 
		{
			e.printStackTrace();
		}
	} 
	
	/* This class Listener creates a socket listener called
	   "j8583-server" on port no.: 9999                     */
	public class Listener extends Thread
	{
		public Listener() 
		{
			//NOTHING
		}
		public void run () 
		{
			while(true)
			{
				try 
				{
					sock = server.accept();
					System.out.println("Connection received from " + sock.getInetAddress().getHostName());
					new Thread(new Processor(new byte[1024], sock),"j8583-server").start(); 	//The code change suggested
					//new Thread(new Server_1(),"j8583-server").start();
					System.out.println("Started New Thread");
				} 
				catch (IOException e) 
				{
					e.printStackTrace();
				}
			}
		}
	}

	/* This method is to trim the message of it first 4 bytes since
	   it contains only the size of message, which in turn passes
	   the message to Processor(buf, sock) and also start a new
	   thread                                                       */
	public void run() 
	{
		int count = 0;
		byte[] lenbuf = new byte[4];
		try 
		{
			socket = sock;
			// For high volume apps you will be better off only reading the
			// stream in this thread
			// and then using another thread to parse the buffers and process
			// the requests
			// Otherwise the network buffer might fill up and you can miss a
			// request.
			while (socket != null && socket.isConnected() && Thread.currentThread().isAlive() && !Thread.currentThread().isInterrupted()) 
			{
				if (socket.getInputStream().read(lenbuf) == 4) 
				{
					int size = ((lenbuf[0] & 0xff) << 8) | (lenbuf[1] & 0xff);
					System.out.println("Size: "+size);
					byte[] buf = new byte[size];
					// We're not expecting ETX in this case
					socket.getInputStream().read(buf);
					count++;
					threadPool.schedule(new Processor(buf, socket), 0, TimeUnit.MILLISECONDS);
					/*Processor p = new Processor(buf, sock);
					new Thread(p).start();*/
					System.out.println("~~~4~~~");
				}
			}
		} 
		catch (IOException ex) 
		{
			ex.printStackTrace();
		}
		System.out.println(String.format("Exiting after reading %d requests", count));
		try 
		{
			socket.close();
			System.out.println("Socket Closed....");
		} 
		catch (IOException ex) 
		{
			ex.printStackTrace();
		}
	}
	
		
	/* This Runnable Processor contains all business logic for ATM
	   Transactions                                                */
	private class Processor implements Runnable 
	{
		/* This contains the message received from the ATM Host Switch  */
		private byte[] msg;
		/* This contains the socket Port No. */
		private Socket sock;

		Processor(byte[] buf, Socket s) 
		{
			msg = buf;
			sock = s;
		} 
		
		/* This portion of code will run each time a new request come
		   from the ATM Host Switch, such as Balance Inquiry, Mini
		   Statement Inquiry, Cash Withdrawal, Funds Transfer, Cheque
		   Book Request, etc.                                         */
		public void run() 
		{
			try 
			{
				System.out.println("Msg Received");
				//Process the message here and reply
			}
			catch (MyException e) 
			{
				e.printStackTrace();
			}
				
	}
	
	
	public static void main(String[] args) throws Exception 
	{
		Server_1 s = new Server_1();
		s.start_server();
	}
	
	
}

Open in new window


Hope This Helps.....
0
 
LVL 35

Accepted Solution

by:
mccarl earned 500 total points
ID: 39186508
What is the purpose of passing the socket to the Processor class? It's hard to assume as you haven't included what is the Processor class really doing. You do mention that a response is being sent, is that response sent on the "sock" that is passed in? That would be one (of many) problems with this code, as you could have numerous different threads all trying to send their responses to the same socket.

If the underlying protocol is basically like this, receive a request, process it, then send a response to that request, receive the next request, process that request and then send a response, etc, etc, then there is really no need to start the processor in another thread. Theoretically, you should get any more requests until the response has been sent, so you are only ever processing the one request at a time (per connection that is).

Other issues I can see... the .read() method of the inputstreams for the sockets are guaranteed to return the full amount of data requested, ie. you read from the inputstream into "lenbuf" which is of size 4, but you may get the first 3 bytes returned on one call to .read() and need to call .read() again to get the 4th. (And the same problem .read()ing into the "buf" array, you might not get all "size" bytes in that first read call. This will affect the message that is being received when this first occurs, and 99% likely every other message ever sent on a particular connection after that, as you will be out of sync with the length data that is being sent.

Also, the "socket" variable that you are calling these read() methods (amongst others) on, will be overwritten when a new connection is made. If you separate out your classes into their own top level class (ie. a separate .java file) rather than being inner classes of Server_1 will help you to eliminate issues with synchronization among the various threads that you have.
0

Featured Post

Do You Know the 4 Main Threat Actor Types?

Do you know the main threat actor types? Most attackers fall into one of four categories, each with their own favored tactics, techniques, and procedures.

Join & Write a Comment

Suggested Solutions

After being asked a question last year, I went into one of my moods where I did some research and code just for the fun and learning of it all.  Subsequently, from this journey, I put together this article on "Range Searching Using Visual Basic.NET …
Introduction This article is the second of three articles that explain why and how the Experts Exchange QA Team does test automation for our web site. This article covers the basic installation and configuration of the test automation tools used by…
Viewers learn about the “for” loop and how it works in Java. By comparing it to the while loop learned before, viewers can make the transition easily. You will learn about the formatting of the for loop as we write a program that prints even numbers…
Viewers will learn how to properly install Eclipse with the necessary JDK, and will take a look at an introductory Java program. Download Eclipse installation zip file: Extract files from zip file: Download and install JDK 8: Open Eclipse and …

747 members asked questions and received personalized solutions in the past 7 days.

Join the community of 500,000 technology professionals and ask your questions.

Join & Ask a Question

Need Help in Real-Time?

Connect with top rated Experts

9 Experts available now in Live!

Get 1:1 Help Now