Link to home
Create AccountLog in
Avatar of CCBRONET
CCBRONET

asked on

Concurrent proocessing of requests for example ATM Host interface

I have written code for concurrent consumption and processing of ATM requests coming from a switch....I'm new to threads in JAVA.....But after sometime it consumes the messages but does not process it....

public class Server_1 implements Runnable 
{
private static ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(500);

public void start_server() throws UnknownHostException, IOException
	{
		try
		{
                         server = new ServerSocket(9999, 30, InetAddress.getByName("172.16.10.215"));
			new Listener().start();
			System.out.println("Socket Listener Started...");
		}
		catch (Exception e) 
		{
			e.printStackTrace();
		}

public class Listener extends Thread
	{
		public Listener() 
		{
			//NOTHING
		}
		public void run () 
		{
			while(true)
			{
				try 
				{
					sock = server.accept();
					System.out.println("Connection received from " + sock.getInetAddress().getHostName());
					new Thread(new Server_1(),"j8583-server").start();
					System.out.println("Started New Thread");
				} 
				catch (IOException e) 
				{
					e.printStackTrace();
				}
			}
		}
	}

public void run() 
	{
		int count = 0;
		byte[] lenbuf = new byte[4];
		try 
		{
			socket = sock;
			// For high volume apps you will be better off only reading the
			// stream in this thread
			// and then using another thread to parse the buffers and process
			// the requests
			// Otherwise the network buffer might fill up and you can miss a
			// request.
			while (socket != null && socket.isConnected() && Thread.currentThread().isAlive() && !Thread.currentThread().isInterrupted()) 
			{
				if (socket.getInputStream().read(lenbuf) == 4) 
				{
					int size = ((lenbuf[0] & 0xff) << 8) | (lenbuf[1] & 0xff);
					System.out.println("Size: "+size);
					byte[] buf = new byte[size];
					// We're not expecting ETX in this case
					socket.getInputStream().read(buf);
					count++;
					/*log.debug(socket);
					log.debug(buf);*/
					threadPool.schedule(new Processor(buf, socket), 0, TimeUnit.MILLISECONDS);
					//log.debug(threadPool.schedule(new Processor(buf, socket), 0, TimeUnit.MILLISECONDS));
					/*Processor p = new Processor(buf, sock);
					new Thread(p).start();*/
					System.out.println("~~~4~~~");
				}
			}
		} 
		catch (IOException ex) 
		{
			ex.printStackTrace();
		}
		System.out.println(String.format("Exiting after reading %d requests", count));
		try 
		{
			socket.close();
			//System.out.println("~~~5~~~");
			System.out.println("Socket Closed....");
		} 
		catch (IOException ex) 
		{
			ex.printStackTrace();
		}
	}

private class Processor implements Runnable 
	{
		private byte[] msg;
		private Socket sock;

		Processor(byte[] buf, Socket s) 
		{
			msg = buf;
			sock = s;
		} 
		
		public void run() 
		{
//Processing of 8583 messages happen here and response is sent.

}

}

Open in new window

Avatar of ksivananth
ksivananth
Flag of United States of America image

1. move the run method implementation from Server_1 to Processor class
2. update line 32 as below

new Thread(new Processor( new byte[1024], socket ),"j8583-server").start();
Avatar of CEHJ
Firstly, that doesn't even look compilable. Please post code that compiles. There should only be one public class per compilation unit (source code file)
Avatar of CCBRONET
CCBRONET

ASKER

import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hibernate.HibernateException;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
import org.hibernate.cfg.Configuration;
import org.jpos.iso.ISOException;
import org.jpos.iso.ISOHeader;
import org.jpos.iso.ISOMsg;
import org.jpos.iso.packager.GenericPackager;
import org.springframework.util.StringUtils;


/* Server_1.java contains all the business logic to perform
   various transactions on the ATM Interface Host           */
public class Server_1 implements Runnable 
{
	private static ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(500);	
	private static final Log log = LogFactory.getLog(Server_1.class);
	private Socket socket;	
	public static ServerSocket server;	
	public static Socket sock;
		
	/* This method is used to start the socket listener on a
	   designated IP address and Port No. */
	public void start_server() throws UnknownHostException, IOException
	{
		try
		{
			System.out.println("Setting up server socket listener...");
			server = new ServerSocket(9999, 10, InetAddress.getByName("172.16.10.34"));
			new Listener().start();
			System.out.println("Socket Listener Started...");
		}
		catch (HibernateException e) 
		{
			e.printStackTrace();
		}
	} 
	
	/* This class Listener creates a socket listener called
	   "j8583-server" on port no.: 9999                     */
	public class Listener extends Thread
	{
		public Listener() 
		{
			//NOTHING
		}
		public void run () 
		{
			while(true)
			{
				try 
				{
					sock = server.accept();
					System.out.println("Connection received from " + sock.getInetAddress().getHostName());
					new Thread(new Processor(new byte[1024], sock),"j8583-server").start(); 	//The code change suggested
					//new Thread(new Server_1(),"j8583-server").start();
					System.out.println("Started New Thread");
				} 
				catch (IOException e) 
				{
					e.printStackTrace();
				}
			}
		}
	}

	/* This method is to trim the message of it first 4 bytes since
	   it contains only the size of message, which in turn passes
	   the message to Processor(buf, sock) and also start a new
	   thread                                                       */
	public void run() 
	{
		int count = 0;
		byte[] lenbuf = new byte[4];
		try 
		{
			socket = sock;
			// For high volume apps you will be better off only reading the
			// stream in this thread
			// and then using another thread to parse the buffers and process
			// the requests
			// Otherwise the network buffer might fill up and you can miss a
			// request.
			while (socket != null && socket.isConnected() && Thread.currentThread().isAlive() && !Thread.currentThread().isInterrupted()) 
			{
				if (socket.getInputStream().read(lenbuf) == 4) 
				{
					int size = ((lenbuf[0] & 0xff) << 8) | (lenbuf[1] & 0xff);
					System.out.println("Size: "+size);
					byte[] buf = new byte[size];
					// We're not expecting ETX in this case
					socket.getInputStream().read(buf);
					count++;
					threadPool.schedule(new Processor(buf, socket), 0, TimeUnit.MILLISECONDS);
					/*Processor p = new Processor(buf, sock);
					new Thread(p).start();*/
					System.out.println("~~~4~~~");
				}
			}
		} 
		catch (IOException ex) 
		{
			ex.printStackTrace();
		}
		System.out.println(String.format("Exiting after reading %d requests", count));
		try 
		{
			socket.close();
			System.out.println("Socket Closed....");
		} 
		catch (IOException ex) 
		{
			ex.printStackTrace();
		}
	}
	
		
	/* This Runnable Processor contains all business logic for ATM
	   Transactions                                                */
	private class Processor implements Runnable 
	{
		/* This contains the message received from the ATM Host Switch  */
		private byte[] msg;
		/* This contains the socket Port No. */
		private Socket sock;

		Processor(byte[] buf, Socket s) 
		{
			msg = buf;
			sock = s;
		} 
		
		/* This portion of code will run each time a new request come
		   from the ATM Host Switch, such as Balance Inquiry, Mini
		   Statement Inquiry, Cash Withdrawal, Funds Transfer, Cheque
		   Book Request, etc.                                         */
		public void run() 
		{
			try 
			{
				System.out.println("Msg Received");
				//Process the message here and reply
			}
			catch (MyException e) 
			{
				e.printStackTrace();
			}
				
	}
	
	
	public static void main(String[] args) throws Exception 
	{
		Server_1 s = new Server_1();
		s.start_server();
	}
	
	
}

Open in new window


Hope This Helps.....
ASKER CERTIFIED SOLUTION
Avatar of mccarl
mccarl
Flag of Australia image

Link to home
membership
Create an account to see this answer
Signing up is free. No credit card required.
Create Account