Solved

Java Real Time Data process program help

Posted on 2011-02-20
Last Modified: 2012-05-11
Hi All,

I don't have much experience with Java, but I have to develop this real-time data processing program in Java.

I'm receiving a large amount of UDP data on a port (say 23456) of my server. The peak rate would be 1000 lines per second. Each line should be checked against some rules with minimum delay. The rules are a bit complex and need to be implemented in Java code; I cannot give a general format for them. What I'm currently doing is reading each data line and passing it to a thread (MainThread). MainThread then passes it to the rule threads (Rule_001, Rule_002, ...), which do the final job. I have attached the main parts of my code for illustration only (not the real code).

I have the following questions.

1. Is there a better way to do this? Data should be processed as soon as possible after it is received. Currently I'm running only two rules, but after some time I get the error “Exception in thread "Thread-1519897" java.lang.OutOfMemoryError: unable to create new native thread”.

2. What method can I use to dynamically add and remove rules? Currently I'm thinking of writing each rule thread class separately and updating a configuration file or a database table. The main program would read the configuration every hour and dynamically load and run these rule classes. Is this possible?

Thank you.

import java.net.DatagramPacket;
import java.net.DatagramSocket;

public class Test
{
	public static void main(String[] args) throws Exception
	{
		DatagramSocket serverSocket = new DatagramSocket(23456);
		while(true)
		{
			// Fresh buffer for every packet
			byte[] receiveData = new byte[1024];
			DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
			serverSocket.receive(receivePacket);
			// Decode only the bytes actually received, not the whole buffer
			String input = new String(receivePacket.getData(), 0, receivePacket.getLength());
			// One new thread per received line
			Thread MT = new MainThread(input);
			MT.start();
		}
	}
}
class MainThread extends Thread
{
	String input;
	MainThread(String s)
	{
		input=s;
	}
	public void run()
	{
		// One more new thread per rule, for every line
		Thread R001 = new Rule_001(input);
		R001.start();
		Thread R002 = new Rule_002(input);
		R002.start();
		//........etc
	}
}
class Rule_001 extends Thread
{
	String input;
	Rule_001(String s)
	{
		input=s;
	}
	public void run()
	{
		//Process the rule here 
		// if(input.equals("bla bla bla"))
		// {}
	}
}
class Rule_002 extends Thread
{
	String input;
	Rule_002(String s)
	{
		input=s;
	}
	public void run()
	{
		//Process the rule here 
		// if(input.equals("Test Test Test"))
		// {}
	}
}
//....etc

Question by:shanikawm
15 Comments
 
LVL 47

Expert Comment

by:for_yan
Comment Utility
If you haven't done so already, the first thing to do is to increase the memory available to the VM with a command-line argument.
0
 
LVL 47

Expert Comment

by:for_yan
Comment Utility
And there is no reason why you cannot read the rules from a database. Maybe it would be
better to have some coding language for them rather than reading the classes themselves; I don't know for sure,
but it seems more natural to me this way, although I believe you can store classes and read them as well.
It may be better to serialize them as binary objects and store them as BLOBs.
0
 
LVL 8

Author Comment

by:shanikawm
Comment Utility
What is the default memory size of the VM, and how can I find it?

About loading rules: I meant just storing the rule class name in the config table (maybe with the time period when the rule should run). The actual byte code would be in a file.

I'm also thinking of running all the rules sequentially within MainThread instead of using lots of threads. In that case, how can I dynamically add and remove rules? As explained, my rules cannot be expressed in a general format. I must use some code.

thanks.

0
 
LVL 47

Assisted Solution

by:for_yan
for_yan earned 200 total points
Comment Utility

What comes to my mind is that you can serialize rules as objects that contain methods, so
that they all implement the same interface while the actual contents of the methods
differ. You can then retrieve these objects, cast them
to this interface, and call methods that have the same name but different contents. I don't
think such a fancy system would be better than something simpler, but in general I believe it should be doable.

You can of course also store the class name, but do you mean these names would not be known to the code beforehand,
so that you'd load the class just by the name you read from the database? I'm pretty sure
that is doable with something like Class.forName(...)
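
A minimal sketch of the Class.forName(...) idea, assuming every rule implements a common interface. The names Rule, BlaRule and RuleLoader are hypothetical and do not come from the original code; the class names passed to load() would come from the configuration file or database table, and the compiled rule classes are assumed to be on the classpath already.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface that every rule class would implement.
interface Rule {
    boolean matches(String input);
}

// Example rule; in practice it would be compiled separately and put on the classpath.
class BlaRule implements Rule {
    @Override
    public boolean matches(String input) {
        return input.equals("bla bla bla");
    }
}

class RuleLoader {
    // Instantiate each named class reflectively and cast it to Rule.
    static List<Rule> load(List<String> classNames) throws ReflectiveOperationException {
        List<Rule> rules = new ArrayList<>();
        for (String name : classNames) {
            Class<?> cls = Class.forName(name);
            rules.add((Rule) cls.getDeclaredConstructor().newInstance());
        }
        return rules;
    }
}
```

Re-running load() with the current list of names (for example once an hour) and swapping in the returned list would add and remove rules. Note that Class.forName() will not pick up a changed version of a class that is already loaded; that would need a separate class loader.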
0
 
LVL 47

Expert Comment

by:for_yan
Comment Utility

This is the description of the java command for Unix:

http://download.oracle.com/javase/1.5.0/docs/tooldocs/solaris/java.html

The -Xmx switch near the bottom specifies the maximum memory; it is 64 MB by default for Java 1.5 on Unix.
Of course it may depend on the version, and there are also different virtual machines,
although most people use the one from Sun/Oracle.

This is the general page; you can get to the Windows version from it:
http://download.oracle.com/javase/1.5.0/docs/tooldocs/#basic
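
To see what limit a running VM actually has, something like the following can be used. It is only a sketch; HeapInfo is a made-up class name, and Runtime.maxMemory() reports the maximum heap the VM will attempt to use, which is what -Xmx controls.

```java
class HeapInfo {
    // Maximum heap the VM will try to use, in megabytes.
    static long maxHeapMegabytes() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    public static void main(String[] args) {
        System.out.println("Max heap: " + maxHeapMegabytes() + " MB");
    }
}
```

Running it with, say, java -Xmx512m HeapInfo shows the effect of the switch.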
0
 
LVL 8

Author Comment

by:shanikawm
Comment Utility
I'll give object serialization a try. I'm going to read up on it.
0
 
LVL 47

Expert Comment

by:for_yan
Comment Utility
Yes, I did use object serialization - at that time I used LONG RAW in Oracle to store them.
Now it is probably better to use BLOBs. You can write serialized objects to file, or store them in the database.
0

 
LVL 47

Expert Comment

by:for_yan
Comment Utility
Maybe I'm wrong. I did use serialized objects, but my main purpose was still the data
contained inside them, not the methods. So when I retrieved them from Oracle and cast them to
the type they represent, I already had this type loaded in my program, so I'm not sure it would work
this way with an object just out of the blue. But I think with Class.forName(...) you can
load any new class dynamically. Check the API documentation and examples for Class.forName().

Still, I like things simpler. I'd prefer to find a way to parametrize the variability of your handling methods, and then store
rules in the database rather than classes.
0
 
LVL 14

Expert Comment

by:ddsh79
Comment Utility
What will the final result be? It seems that you are passing the string to all the threads. Where are the rules writing their output?

      public void run()
      {
            Thread R001 = new Rule_001(input);
            R001.start();
            Thread R002 = new Rule_002(input);
            R002.start();
            //........etc
      }
Also, spawning so many threads is not a good idea. You should think of creating something more generic.

MainThread does not appear to do anything useful, so I guess you can eliminate it.

If the data input rate is high and processing is slow, the system will crash again. So you'll have to build some pause mechanism that puts things on hold until the queued threads are under control.

Do some raw tests to find the number of threads that can be spawned at a given time, and then use a queue to push data into processing in a controlled manner.
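
One possible shape for such a controlled queue, sketched with the standard java.util.concurrent classes. BoundedPool is a made-up name, and the thread count and queue capacity are placeholders to be replaced by the numbers from the raw tests.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPool {
    // Fixed number of worker threads plus a bounded queue. When the queue is
    // full, CallerRunsPolicy makes the submitting thread run the task itself,
    // which slows the producer down instead of creating more threads.
    static ThreadPoolExecutor create(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```

The receive loop would then call something like pool.execute(task) for each line. While the receiving thread is busy running a task itself, incoming UDP packets wait in the operating system's socket buffer and may be dropped if that fills up, so this trades possible data loss for a bounded number of threads.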
0
 
LVL 8

Author Comment

by:shanikawm
Comment Utility
@ddsh79,

Yes. I decided to process all the rules sequentially inside the main thread instead of starting separate threads for each rule. I'll do some raw tests for that thread as well.

Now my only worry is adding and removing rules. I'm going to try Class.forName().
0
 
LVL 8

Author Comment

by:shanikawm
Comment Utility
I did the thread testing. If a rule takes more than 2 seconds to process, everything goes wrong.
I get the thread out-of-memory error even with 1 GB of VM memory and the reduced number of threads explained above.
:-(
0
 
LVL 14

Expert Comment

by:ddsh79
Comment Utility
As I stated above:

<<If the datainput rate is high and processing is low then again the system will crash. so you'll have to make some pause mechanism to put things on halt until it sees that the queued threads are under control.
0
 
LVL 10

Accepted Solution

by:
gordon_vt02 earned 300 total points
Comment Utility
You should look into the java.util.concurrent libraries and use an ExecutorService to manage your threads instead of creating a new Thread for every request.  You can use the static methods of the Executors class to create thread pools with various options.  I think Executors.newCachedThreadPool() would probably meet your needs, although you may want to go with Executors.newFixedThreadPool() if you find that you are still running out of memory by creating too many threads at peak times.

You can then write a helper class that implements either Runnable or Callable<>, depending on whether or not you need a return value, and ask the ExecutorService to run that task for you.  The system will handle caching the threads and managing your processes for you.

It is probably also not a bad idea to move the socket code into an instance method of your Test class instead of putting it into the main method.  This way you can add methods to signal the server to stop so you can shut down the ExecutorService gracefully.  The "executors.submit(...)" calls all return a Future<> object that you can use to monitor the state of the task if you need to.

java.util.concurrent API: http://download.oracle.com/javase/6/docs/api/java/util/concurrent/package-summary.html
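import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {
    public static void main(String[] args) throws IOException
    {
        ExecutorService executors = Executors.newCachedThreadPool();
        DatagramSocket serverSocket = new DatagramSocket(23456);
        try
        {
            while(true)
            {
                byte[] receiveData = new byte[1024];
                DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
                serverSocket.receive(receivePacket);
                // Decode only the bytes actually received, not the whole buffer
                String input = new String(receivePacket.getData(), 0, receivePacket.getLength());
                // Hand the line to a pooled thread instead of creating a new one
                executors.submit(new RuleProcessor(input));
            }
        }
        finally
        {
            // Reached only when receive() throws, since the loop never exits normally
            serverSocket.close();
            executors.shutdown();
        }
    }

    private static class RuleProcessor implements Runnable {
        private final String input;

        public RuleProcessor(String input) {
            this.input = input;
        }
        
        @Override
        public void run() {
            // process input with rules
        }
    }
}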
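
If a rule needs to hand a result back, the Callable<> variant mentioned above might look roughly like this. It is a sketch only: RuleCheck and CallableDemo are made-up names, and the rule body reuses the placeholder comparison from the question.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class RuleCheck implements Callable<Boolean> {
    private final String input;

    RuleCheck(String input) {
        this.input = input;
    }

    @Override
    public Boolean call() {
        // Placeholder rule taken from the question's commented-out example
        return input.equals("Test Test Test");
    }
}

class CallableDemo {
    static boolean evaluate(String input) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Boolean> result = pool.submit(new RuleCheck(input));
            return result.get();   // blocks until the task has finished
        } finally {
            pool.shutdown();
        }
    }
}
```

In a real server the pool would be created once and shared; it is created inside evaluate() here only to keep the example self-contained.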

0
 
LVL 8

Author Comment

by:shanikawm
Comment Utility
@gordon_vt02,

Interesting suggestions. Actually I want a graceful shutdown method as well, since there are some database connections in this program.

One clarification needed: why is the class RuleProcessor static? What happens if 2 inputs are processed at the same time by different threads?
0
 
LVL 10

Expert Comment

by:gordon_vt02
Comment Utility
Marking an inner class static will make your application perform better because it does not need to include or instantiate a reference to its wrapping object.  Since the RuleProcessor does not need to access any of the members of its enclosing Test object, there is no need to make it a non-static class.  Each instantiation of the RuleProcessor will be its own object and there will be no thread contention between them.

I'd recommend using an AtomicBoolean as a member of the Test class to control the stop state.  You can have a method that checks and sets it to true when you want the server to shutdown.  Check that variable in your while loop instead of saying "while (true)," and then handle the socket and JDBC shutdown once the loop exits.

The executors.shutdown() method will attempt to allow all queued processes to finish before it stops.  There are other methods that will allow you to wait for that to complete before stopping your DB connections or you can use the shutdownNow() method that will try to stop all running processes and will not execute any that are waiting in the queue.
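
A rough sketch of the stop flag and shutdown sequence described above. StoppableServer is a made-up name, the pool size and the 30-second wait are arbitrary, and the loop over an Iterable stands in for the socket receive loop so that the example stays self-contained.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class StoppableServer {
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final ExecutorService executors = Executors.newFixedThreadPool(4);

    // Called from another thread (or a shutdown hook) to request a stop.
    void requestStop() {
        stopped.set(true);
    }

    // Stand-in for the receive loop; a real server would call receive() here.
    void serve(Iterable<String> lines, Runnable perLineWork) throws InterruptedException {
        for (String line : lines) {
            if (stopped.get()) {
                break;
            }
            executors.submit(perLineWork);
        }
        executors.shutdown();                      // accept no new tasks
        if (!executors.awaitTermination(30, TimeUnit.SECONDS)) {
            executors.shutdownNow();               // interrupt tasks still running
        }
        // From here on it is safe to close the socket and database connections.
    }
}
```

With a real DatagramSocket, receive() blocks until a packet arrives, so the flag is only checked between packets. Setting a timeout with setSoTimeout(...) and catching SocketTimeoutException in the loop is one way to make the check happen regularly.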
0
