Learn how to a build a cloud-first strategyRegister Now

x
  • Status: Solved
  • Priority: Medium
  • Security: Public
  • Views: 967
  • Last Modified:

Reading from ObjectInputStream

I am using NIO in my program. On Receiver side i am getting multiple messages in same buffer and want to read all using ByteArrayInput Stream and ObjectInputStream . It is working for fixed size.

Code Snippet:
SerializedData is my data class.
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
offset =0;
while( offset < limit)
{
bStream = new ByteArrayInputStream(byteArray,<offset>,limit);
ObjectInputStream iStream = new ObjectInputStream( bStream );
serializeData = (SerializedData) iStream.readObject();
System.out.println("Msg: " + serializeData.getData());

offset+= <received_msg_size>
}

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

This logic is working for fixed size message as in that case i know the <received_msg_size>.

here my logic is to use get the number of bytes read from byte[] equals to offset for next msg.
 
My Query is that how should i know the number of bytes reads from bStream. iStream.available() methos always returns zero.

What are the alternatives? My understading is that as i am able to retrieve the first message then i should be able to calculate the  length  read ( in bytes ) also.
0
srinitin
Asked:
srinitin
  • 4
  • 4
1 Solution
 
mkatmonkeyCommented:
You could wrap an ByteArray that's the same size as your message.

byte [] myBytes = byte[MessageSize];
ByteBuffer myByteBuffer = ByteBuffer.wrap(myBytes);
ByteArrayInputStream myBAIS = ByteArrayInputStream(myBytes);
//Code to read message...
//Code to read objects ...
myByteBuffer.flip();
myBAIS.reset();

This is a simplified alternative to the one I recommended before in your open question.
0
 
srinitinAuthor Commented:
Hi,

Thanks for the suggested approach. A mix of my approach and ur soles the problem.
 
Current working code snippet for receiving multiple messages are:

=================================================

  int count =0;

  while( count < limit)
  {
  ByteArrayInputStream bStream1 = new ByteArrayInputStream(byteArray,count,limit);
  ObjectInputStream iStream = new ObjectInputStream( bStream1 );g
  serializeData = (SerializedData) iStream.readObject();
  count+=limit-bStream1.available();
  bStream1.reset();
 }

====================================================

Still have few problems:

1. On receiver side:
                     numRead =n socketChannel.read(bytebuffer);
sometimes i am getting numRead == -1 , what exactly it indicates and what should be my logic on getting -1. right now i am closing the socket and recreating a new one.

2. On Sender side:
on write(), sometimes i am getting following exception:
                          java.io.IOException: Broken pipe

3. as i am sending messages in infinite loop .. my code snippet work fine for 3-4 thousands msg of variable length after that i am getting StremCorrupted exception. what should be a reason of this? and how to handle this.?

4. i am not using selector ib my program only soketChannel in blocking mode using ByteBuffer ( as it suites my requirement). Is use of selector is necessary?

5. Right now i have not introduce the logic of receiving incomplete message. How to handle that situation?

 
0
 
mkatmonkeyCommented:
1) when read returns -1 your sender sent an End of Stream (EOS) message and probably disconnected shortly after. The only thing to do in this case, is to disconnect and optionally attempt to reconnect.

2) When writing to a socket throws that IOException, you are disconnected.

3) You are probably getting this because the bytes that you are de-serializing your objects from somehow got mangled. I suspect that this may be due to framing issues. You may be trying to read an incomplete message. Professionally, it's best to implement some way to ensure that you have the message properly framed before you try to deserialize it into an object. IE, if you are dealing with variable message sizes, then wrap your message with a header that contains the length (in bytes) and a trailer that contains a checksum. If a message fails the checksum, you will want to have some way to request the sender retransmit.

4) Using a selector is generally a good idea. However, you can get by without it in certain cases. If your using the blocking mode, you may not need it. IE if your receiver is only receiving messages and your sender is only sending. Any case where you want only one end point to use the wire at a time would work. This is called "synchronous" messaging.

5) To handle incomplete messages, you can block (wait for the message to complete), balk (throw an exception), or wait for the message to complete before balking.

0
Technology Partners: We Want Your Opinion!

We value your feedback.

Take our survey and automatically be enter to win anyone of the following:
Yeti Cooler, Amazon eGift Card, and Movie eGift Card!

 
srinitinAuthor Commented:
Thanks mkatmonkey for the detailed response.

1) Already doing the same, as suggested.

2) True.

3) Thanks for the valuable input. This may be the reason that i am getting the partial message.

4) In my program  my both end can read and write on the same socket channel.  I have used synchronization to handle the same. Actually i have a application built on old io and the design is as per blocking mode. That is the reason i have used the mode as blocking. i want to port it on NIO to speedup the communication.
Does use of selector is must in such application? My view is no need to use selector until asynchronous.

5) Thanks for the valuable input.

Apart from this one strange thing i have noticed that my NIO prototype has decresed  the performnace of the system rather than inceasing. I mean say using the old sockets my application takes 34 sec to send 10000 messages. Using my prototype code it is taking around 50 secs .

What could be the reason of this? Wrong implementation or something else.
I kept Buffer size(25000) used to write data  < buffer size used to read the data(100000)
and also incresed socketChannel.socket().setReceiveBufferSize(100000).

Is it right or it impacts the performance negatively?
0
 
srinitinAuthor Commented:
one update here:

In my Sample program posted here:
One Sender , one receiver : Slight performance improvement of NIO over old IO

On my Application using prototype code:
Both end acting as a sender and receiver:
Negative performance impact of using NIO over old IO
0
 
mkatmonkeyCommented:
It's interesting that performance degrades with NIO.

I doubt buffers are slowing things down. Buffers are meant to help keep performance consistent during bursts of activity. If they are the cause, then increasing the size should help, but I doubt it because that means your messaging rate is higher then your network can bear.

Using an NIO selector may speed things up. Selectors allow for low level interrupts (read: native hardware interupt) to trigger your code to read the incoming message. Your code is looping on a read call which has your code executing pooling loops it need not execute.

Given the scheduling mechanism of your computer/java runtime, this useless polling done by your program could theoretically have as much priority as your sending program assuming that they're running on the same computer.
0
 
srinitinAuthor Commented:
Now my program is able to handle message but as i shared earlier that overall performance is not as compare to simple sockets :)

One of the reason for the same is that each time for sending message i am creating a new instance if ByteArrayOutputStream and ObjectOutputStream so it may degrade performance. Similar at receiver side for each message using new instance of ByteArrayinputStream and ObjectInputStream.

// Sending message in loop
private static ByteBuffer objOutput;
objOutput = ByteBuffer.allocate(BUFFER_SIZE);
for(;;)
{
          ByteArrayOutputStream   bStream = new ByteArrayOutputStream();
         ObjectOutputStream oStream = new ObjectOutputStream( bStream );
         
         serializedata = new SerializedData(str1);

        oStream.writeObject (serializedata);
        byteArray = bStream.toByteArray();

        objOutput.clear();
        objOutput.put(byteArray);
        objOutput.flip();

        sc.write(objOutput);
        System.out.println("Sender: Msg[" + count + "]: " + serializedata.getData());
}

For speedup when i try to use same objects of both the streams i am getting StreamCorruptedException at the receiver side after receiving first message.
Not able to understand the exact reason.

Sample code snippet for using the same stream object ( oStrem.flush() and resetting both streams after each use):

/////////////////////////////////////////////////////////////////////////////////////

    ByteArrayOutputStream   bStream = new ByteArrayOutputStream();
    ObjectOutputStream oStream = new ObjectOutputStream( bStream );

for(;;)
{
         serializedata = new SerializedData(str1);

        oStream.writeObject (serializedata);
        oStream.flush();
        byteArray = bStream.toByteArray();

        objOutput.clear();
        objOutput.put(byteArray);
        objOutput.flip();

        sc.write(objOutput);
        System.out.println("Sender: Msg[" + count + "]: " + serializedata.getData());

       oStream.reset();
       bStream.reset();
}

Any input why i am getting StreamCorruptedException each time on receiver side. For new streams all the things are working fine.
0
 
mkatmonkeyCommented:
Sounds like you may have multiple connections to your receiver. If all these connections share the same objects, those objects will easily get corrupted.
0
 
Computer101Commented:
Forced accept.

Computer101
EE Admin
0

Featured Post

Industry Leaders: We Want Your Opinion!

We value your feedback.

Take our survey and automatically be enter to win anyone of the following:
Yeti Cooler, Amazon eGift Card, and Movie eGift Card!

  • 4
  • 4
Tackle projects and never again get stuck behind a technical roadblock.
Join Now