richardsimnett

Issues with NIO Selection continued

This is a continuation of a previous question found at https://www.experts-exchange.com/questions/21798808/Issues-with-NIO-Selection.html.

I am having the same problem as in the previous post; I got a bit ahead of myself. Here is the new code I am working with (I have only changed a few things around to make it easier to read). The code stops again after the first write over the socket channel. It keeps checking for a key, and the selector reports having a key registered, but after the first iteration the selector never reports the key as ready for any operation. The write seems to complete successfully, so either A) the selector isn't properly detecting operation events, or B) the command never reaches its destination (or not completely), so a response is never sent back. Hopefully someone can tell me what I am doing wrong here, because I have exhausted every idea I could come up with. There is no point in including all the code, as A) the interface for data is proprietary, and B) there are tons of classes that have all been tested thoroughly and work. I have included only the classes in question.

//Program Output
Found A Key To Service
SMTP Read Write
Getting an envelope.
Envelope Attached.
Reading from Channel.
220 localhost.localdomain ESMTP service ready
Error Level:220
Performing Step 0
Writing:HELO localhost.localdomain

Write Completed.

//Initialization code
// Init the outbound connection manager and the delivery thread
                              conn_out = new outboundconnections(cfg);
                              
                              //start the delivery thread
                              deliveryThread dT = new deliveryThread(conn_out,new ConnectionManager(),mailq,mtastats,cfg);
                              dT.start();

//Outbound Connections Code
import java.net.InetSocketAddress;

import java.nio.channels.*;

import java.io.IOException;

import java.util.Iterator;



public class outboundconnections {

      

      int maxconnections = 0;

      Selector connections;

      

      public outboundconnections(config c)

      {

            //set the maxconnections

            maxconnections = Integer.parseInt(c.getValue("max_smtp_out").toString());

            

            //create the selector

            try

            {

                  connections = Selector.open();

            

                  //create the new socketchannels

                  for (int i=0; i < maxconnections; i++)

                  {

                          openSocketChannel(c.getValue("mta_ip").toString(),25);                  

                  }

            }

            catch(IOException ie)

            {

                  //Error on selector open, socketchannel open, or socketchannel blocking

                  System.out.println("ERROR creating channel.");

            }

            

      }

      

      Iterator getReadyConnections()

      {

            //this function returns an iterator of the socket channels ready to be processed

            //it blocks until there is something to return

            int i = 0;

            

            //System.out.println("Keys in selector: " + connections.keys().size());

            try

            {

                  i = connections.select();

            }

            catch (IOException ie)

            {

                  System.out.println("ERROR While attempting select of connections.");

            }

            

            //System.out.println("I=" + i);

            

            if (i > 0)

                  return connections.selectedKeys().iterator();

            else

                  return null;

            

      }

      

      public void openSocketChannel(String host, int port)

      {

            //register the socketchannel, and control object with the selector

            try

            {

                  //try to open a connection

                  SocketChannel sc = SocketChannel.open(new InetSocketAddress(host,port));

      

                  //set blocking to no

                  sc.configureBlocking(false);

                  

                  //register the socketchannel with the selector, and register the control object with the channel

                  sc.register(connections, SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT, new ConnStatus());

            }

            catch (IOException ie)

            {

                  System.out.println("Error While Attempting to Open a Connection.");

            }

      }

}

      




//Delivery Thread Code
import java.util.*;

import java.nio.channels.*;



public class deliveryThread extends Thread{



      private smtpQueue mailQ;

      private SelectionKey key;

      private mtastatus mtastats;

      SMTPHandler smtp = new SMTPHandler();

      private outboundconnections conn_out;

      config cfg;

      

      public deliveryThread(outboundconnections oc, ConnectionManager c, smtpQueue s, mtastatus m, config cf)

      {

            super();

            mailQ = s;

            mtastats = m;

            conn_out = oc;

            cfg = cf;

      }

      

      public synchronized void run()

      {

            //while there is not a system_exit event

            while (mtastats.system_exit() == false)

            {

                  //get any available keys... this will block until there is a connection to service

                  //System.out.println("Searching for connections to service.");

                  Iterator it = conn_out.getReadyConnections();

                  

                  if (it != null)

                  {

                        while(it.hasNext())

                        {

                        System.out.println("Found A Key To Service");

                        //get the next key

                        key = (SelectionKey) it.next();

                  

                        //service the channel

                        if (((key != null) && (key.isValid())) && (key.isReadable() || key.isWritable()))

                        {

                              //first determine the type of operation required

                              System.out.println("SMTP Read Write");

                              

                              //call the smtpEvent handler

                              smtp.processSMTP(key,mailQ,mtastats,cfg,conn_out);                  

                        }      

                        else

                              System.out.println("Bad Key.");

                  

                        //reset the key

                        key = null;

                        }

                  }

                  else

                  {

                        try

                        {

                              sleep(10);

                        }

                        catch(InterruptedException ie)

                        {

                        }

                  }

            }

      }

}


//SMTP handler code
import java.io.*;

import java.nio.channels.*;

import java.nio.ByteBuffer;



public class SMTPHandler {

      

      public SMTPHandler()

      {      

      }

      



      void processSMTP(SelectionKey key,smtpQueue mailQ,mtastatus mtastats, config cfg, outboundconnections conn_out)

      {

            String command = "";



            //processSMTP - handles all read/write/error operations for SMTP

            

            //first get the connstatus object from the key

            ConnStatus cs = (ConnStatus) key.attachment();

            

            //Step 0 = Connection Just Established -- Send the HELO/EHLO

            //Step 1 = Send The MAIL FROM:

            //Step 2 = Send the RCPT TO:

            //Step 3 = Send DATA

            //Step 4 = Send Msg Body to remote

            //Step 5 = Send <crlf>.<crlf>

            //Step 6 = Receive OK

            

            int error = 0;

            

            //get the envelope

            envelope env = cs.getEnvelope();

            

            //if the envelope is null, then get a new envelope and assign it to ConnStatus

            if ((env == null) && (mailQ.spool.getQueueSize() > 0))

                  env = getEnvelope(mailQ, cs);

            

            

            //if there is an envelope to process

            if (env != null)

            {

                  //read from the channel to get status codes      

                  error = smtpRead(key,cs);

                  

                  //evaluate the SMTP error level

                  System.out.println("Error Level:" + error);

                  

                  if ((error == 250) || (error == 220))

                  {

                        //there isnt an SMTP error so send a response

                        command = smtpStep(cs,env,mtastats);

                        

                        //write the response to the channel

                        writeToChannel(key,command);

                  }

                  else

                  {

                        System.out.println("Bad Error Code.");

                  }

                  

            }

            

            //reset the keys interests

            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT);                  

      }

      

      String smtpStep(ConnStatus cs, envelope env, mtastatus mtastats)

      {

                  String command="";

            

                  System.out.println("Performing Step " + cs.step);

                  

                  if (cs.step == 0)

                  {

                        //Connection Established send the HELO

                        command = "HELO localhost.localdomain ";

                        cs.setStep(1);

                  }

                  else if (cs.step == 1)

                  {

                        command = "MAIL FROM: <" +env.from+">";

                        cs.setStep(2);

                  }

                  else if (cs.step == 2)

                  {

                        command = "RCPT TO: <" +env.to+">";

                        cs.setStep(3);

                  }

                  else if (cs.step == 3)

                  {

                        command = "DATA";

                        cs.setStep(4);

                  }

                  else if (cs.step == 4)

                  {

                        command=env.messagebody + (char) 13 + (char) 10 + ".";

                        cs.setStep(5);

                  }

                  else if (cs.step == 5)

                  {

                        //message has been sent so set env

                        //System.out.println(input);

                        if (cs.lastResponse.startsWith("250"))

                        {

                              System.out.println("Message Delivered.");



                                    //increment delivery counters

                              mtastats.incDelivered();

                              cs.sessionmessages++;

                              

                              cs.attachEnvelope(null);

                              

                              //write the next command                        

                              command = "";

                              cs.setStep(1);                  

                        }

                  }                  

            return command;

      }

      

      int smtpRead(SelectionKey k, ConnStatus cs)

      {

            System.out.println("Reading from Channel.");

            //read string

            String input = "";

            

            //Perform the read

            input = readFromChannel(k);

            

            //if (input.indexOf('\n') > 0)

            //      input = input.substring(0, input.indexOf('\n'));

                  

            System.out.println(input);

                  

            //set the lastResponse

            cs.setLastResponse(input);

            

            return Integer.parseInt(input.substring(0,3));

      }

      

      envelope getEnvelope(smtpQueue mailQ, ConnStatus cs)

      {

            System.out.println("Getting an envelope.");

            //the envelope is null so there is no envelope attached to this channel

            

            //grab an envelope from the mailQ

            envelope env = (envelope) mailQ.spool.pop();

            

            //assign it to the connection status object

            if (env != null)

            {

                  cs.attachEnvelope(env);

                  System.out.println("Envelope Attached.");

            }



            return env;      

      }

      

      void closeChannel(SelectionKey k)

      {

            try

            {

                  SocketChannel sc = (SocketChannel) k.channel();

                  sc.close();

            }

            catch(IOException ie)

            {

                  System.out.println("Error closing channel.");

            }

      }

      

      String readFromChannel(SelectionKey k)

      {

            String retVal = "";

            try

            {

                  ByteBuffer readBuffer = ByteBuffer.allocate(1024);

            

                  SelectionKey key = k;

            

                  SocketChannel sc = (SocketChannel) key.channel();

            

                  while(sc.read(readBuffer) > 0)

                  {

                        readBuffer.flip();            

                        retVal = retVal + new String(readBuffer.array());

                  }

            

                  readBuffer.clear();

            }

            catch(IOException ie)

            {

                  

            }

            return retVal.trim();

      }

      

      void writeToChannel(SelectionKey k, String output)

      {

            try

            {

                  SelectionKey key = k;

            

                  //add the crlf

                  output = output + (char) 13 + (char) 10 + (char) 0;

            

                  //initialize the buffer

                  ByteBuffer buffer = ByteBuffer.allocate(output.getBytes().length);

            

                  //convert the String to a ByteBuffer

                  buffer.put(output.getBytes());

            

                  SocketChannel sc = (SocketChannel) key.channel();

            

                  System.out.println("Writing:" + output);

                  

                  buffer.flip();

                  

                  while(buffer.hasRemaining())

                  {

                        sc.write(buffer);

                  }

                  buffer.clear();

                  System.out.println("Write Completed.");

            }

            catch (IOException ie)

            {

                  System.out.println("error performing SMTP write.");

            }

      }

      

      

}




//connstatus code


public class ConnStatus {



      int step = 0;

      private envelope e = null;

      int sessionmessages = 0;

      String lastResponse = "";

      int type = 0;

      

      public ConnStatus()

      {      

      }

      

      void attachEnvelope(envelope t)

      {

            //keep the envelope on this connection so getEnvelope() can return it

            e = t;

            lastResponse = "";

      }

      

      envelope getEnvelope()

      {

         return e;      

      }

      

      void setStep(int s)

      {

            step = s;

      }

      

      int getStep()

      {

            return step;

      }

      

      void setLastResponse(String s)

      {

            lastResponse = s;

      }

}



//envelope code

public class envelope {

      public String to="";
      public String from="";
      public String filename="";
      public int retries = 0;
      public int status = 0;
      public String error = "";
      public String messagebody="";
      public String domain = "";
      
      public envelope(String t, String From, String fn, String dom)
      {
            //create the new envelope
            to = t;
            from = From;
            filename = fn;
            
            //set the domain
            domain = dom;
      }
}


A working fix is worth 500 points.

Thanks,
Rick
richardsimnett

ASKER

Oh yeah, one other thing I'm noticing. In the getReadyConnections function in the outboundconnections class, the select() call on the selector doesn't block as it should. According to the API, select() should block indefinitely until at least one key is ready; instead it returns immediately, behaving like the non-blocking selectNow(). I would appreciate it if someone could tell me how to fix this as well.

Thanks,
Rick
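One likely reason select() appears not to block: a connected socket with free space in its send buffer is almost always writable, so a key whose interest set includes OP_WRITE is ready on nearly every call. The sketch below is an illustration under assumptions, not the code from the post: the class WriteInterestDemo and the method readyCount are hypothetical names, and a throwaway loopback server stands in for the SMTP host. It compares what select reports with and without OP_WRITE in the interest set.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class WriteInterestDemo {

    // Connects to a throwaway loopback server, registers the client with the
    // given interest set, and returns what select(timeout) reports.
    static int readyCount(int interestOps, long timeoutMillis) {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel peer = server.accept()) {
                // The peer never sends anything, so the client is never readable.
                client.configureBlocking(false);
                client.register(selector, interestOps);
                return selector.select(timeoutMillis);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int both = readyCount(SelectionKey.OP_READ | SelectionKey.OP_WRITE, 200);
        int readOnly = readyCount(SelectionKey.OP_READ, 200);
        System.out.println("ready with OP_READ | OP_WRITE: " + both);
        System.out.println("ready with OP_READ only: " + readOnly);
    }
}
```

With OP_WRITE in the interest set the key is typically reported at once; with OP_READ alone the call waits out the timeout because the peer is silent. Registering OP_WRITE only while there is data waiting to be sent lets select() block until a reply arrives.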
CEHJ
Try getting rid of

>>private SelectionKey key;

CEHJ,
Nope same problem. Stops at the end of the first write.

Thanks,
Rick
Remove the resetting too - it's not necessary
OK, I just wrote a small server program that emulates an SMTP server in order to see where this thing is stopping. Here's what's happening: the program in question connects to the SMTP emulator, the emulator sends its greeting, the program responds by writing HELO localhost.localdomain, and the emulator responds with 250 ok, but the program never sees it and the selector never reports the key as ready for SelectionKey.OP_READ. Based on what I am seeing, this has to do with my interest-op settings. I'm not sure where to go next. I am no longer resetting the interest ops, as per your suggestion, but I still have the same problem.

I am registering the new socket as follows now:
sc.register(connections,  SelectionKey.OP_WRITE | SelectionKey.OP_READ  | SelectionKey.OP_CONNECT , new ConnStatus());

I am not resetting nor reregistering anything at this point... that is all commented out.
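A point worth checking in the delivery loop posted earlier: a Selector adds keys to its selected-key set but never removes them. Per the Selector documentation, removal is up to the caller, usually through Iterator.remove(). If a key stays in the set, a later select() counts it only when its ready set gains a new operation, which can look like select() returning 0 without blocking. A minimal sketch, assuming hypothetical names (SelectLoopSketch, KeyHandler, drainSelectedKeys, leftoverAfterDrain) and a loopback connection in place of the real server:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

final class SelectLoopSketch {

    // Hypothetical callback type; stands in for a call such as smtp.processSMTP(...).
    interface KeyHandler {
        void handle(SelectionKey key) throws IOException;
    }

    // Services every selected key and removes it from the selected-key set.
    static int drainSelectedKeys(Selector selector, KeyHandler handler) throws IOException {
        int handled = 0;
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove(); // the selector never removes selected keys itself
            if (key.isValid()) {
                handler.handle(key);
                handled++;
            }
        }
        return handled;
    }

    // Selects once on a loopback connection, drains the selected keys, and
    // returns how many keys remain in the selected-key set afterwards.
    static int leftoverAfterDrain() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel peer = server.accept()) {
                client.configureBlocking(false);
                client.register(selector, SelectionKey.OP_WRITE);
                selector.select(200);
                drainSelectedKeys(selector,
                        key -> System.out.println("ready ops: " + key.readyOps()));
                return selector.selectedKeys().size();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("keys left after drain: " + leftoverAfterDrain());
    }
}
```

Removing each key as it is taken from the iterator means the next select() starts from an empty selected-key set, so its return value reflects only fresh readiness.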

ASKER CERTIFIED SOLUTION
CEHJ
btw, good logging practice is *particularly* to be recommended for multi-threaded apps, preferably log4j
CEHJ,
I'm not following you. What kind of debug code would I put in?

As for the logging, I have my own logging module that I will be using. I just haven't incorporated it into my code yet.

Thanks,
Rick
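On the question of what debug code to add: one option is to print each key's interest set and ready set on every pass of the loop, which shows quickly whether the selector ever reports OP_READ. A small sketch follows; KeyDebug, describeOps and describe are hypothetical helper names, and plain System.out stands in for whatever logging module is used.

```java
import java.nio.channels.SelectionKey;

final class KeyDebug {

    // Turns an operation bit mask into a readable string such as "READ|WRITE".
    static String describeOps(int ops) {
        StringBuilder sb = new StringBuilder();
        if ((ops & SelectionKey.OP_READ) != 0) sb.append("READ|");
        if ((ops & SelectionKey.OP_WRITE) != 0) sb.append("WRITE|");
        if ((ops & SelectionKey.OP_CONNECT) != 0) sb.append("CONNECT|");
        if ((ops & SelectionKey.OP_ACCEPT) != 0) sb.append("ACCEPT|");
        if (sb.length() == 0) {
            return "NONE";
        }
        sb.setLength(sb.length() - 1); // drop the trailing separator
        return sb.toString();
    }

    // Summarises what a key is waiting for and what the selector last reported.
    static String describe(SelectionKey key) {
        if (!key.isValid()) {
            return "key is cancelled or its channel is closed";
        }
        return "interest=" + describeOps(key.interestOps())
                + " ready=" + describeOps(key.readyOps());
    }

    public static void main(String[] args) {
        System.out.println(describeOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
        System.out.println(describeOps(0));
    }
}
```

Calling describe(key) just before the handler runs makes it visible on each iteration whether the read bit ever appears in the ready set.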
CEHJ,
I got it! It took all day of screwing around, but I finally figured it out. I had to rewrite the way the whole thing worked. It turns out that you cannot stack multiple I/O operations on a single loop of the key. I was trying to do a read and a write each time. If you simply divide the operations up into separate iterations and set the key's interest ops properly, the whole thing comes together... and I have to say it FLIES! Here's the modified code:

//Delivery Thread
import java.util.*;

import java.nio.channels.*;



public class deliveryThread extends Thread{



      SMTPHandler smtp = new SMTPHandler();

      smtpQueue mailQ;

      SelectionKey key;

      mtastatus mtastats = null;

      outboundconnections conn_out;

      config cfg;

      

      public deliveryThread(outboundconnections oc, ConnectionManager c, smtpQueue s, mtastatus m, config cf)

      {

            super();

            mailQ = s;

            mtastats = m;

            conn_out = oc;

            cfg = cf;

      }

      

      public synchronized void run()

      {

            //while there is not a system_exit event

            while (mtastats.system_exit() == false)

            {

                  //get any available keys... this will block until there is a connection to service

                  //System.out.println("Searching for connections to service.");

                  Iterator it = conn_out.getReadyConnections();

                  

                  if (it != null)

                  {

                        while(it.hasNext())

                        {

                        //System.out.println("Found A Key To Service");

                        //get the next key

                        key = (SelectionKey) it.next();

                  

                      //service the channel

                        if (((key != null) && (key.isValid())))

                        {

                              //call the smtpEvent handler

                              smtp.processSMTP(key,mailQ,mtastats,cfg,conn_out);                  

                        }      

                        else

                              System.out.println("Bad Key.");

                  

                        //reset the key

                        key = null;

                        }

                  }

            }

      }

}



//SMTP Handler
import java.io.*;

import java.nio.channels.*;

import java.nio.ByteBuffer;



public class SMTPHandler {

      

      public SMTPHandler()

      {      

      }

      



      void processSMTP(SelectionKey key,smtpQueue mailQ,mtastatus mtastats, config cfg, outboundconnections conn_out)

      {

            String command = "";



            //processSMTP - handles all read/write/error operations for SMTP

            

            //first get the connstatus object from the key

            ConnStatus cs = (ConnStatus) key.attachment();

                  

            int error = 0;

            

            //get the envelope

            envelope env = cs.getEnvelope();

            

            //if the envelope is null, then get a new envelope and assign it to ConnStatus

            if ((env == null) && (mailQ.spool.getQueueSize() > 0))

                  env = getEnvelope(mailQ, cs);

            

            //if this is a read cycle

            if (key.isReadable())

            {

                  //read and output the result

                  error = smtpRead(key,cs);

                  //System.out.println("From Server: " + cs.lastResponse + '\n');

                  key.attach(cs);

                  key.interestOps(SelectionKey.OP_WRITE);

            }

            

            //if this is a write cycle

            if (key.isWritable())

            {

                  //write the command to the screen and the channel

                  command = smtpStep(cs,env,mtastats,mailQ);

                  

                  //write the response to the channel

                  writeToChannel(key,command);

                  key.attach(cs);

                  key.interestOps(SelectionKey.OP_READ);

            }

      }

      

      /*//if there is an envelope to process

            if (env != null)

            {

                  //read from the channel to get status codes      

                  error = smtpRead(key,cs);

                  

                  //evaluate the SMTP error level

                  System.out.println("Error Level:" + error);

                  

                  if ((error == 250) || (error == 220) || (error == 354))

                  {

                        //there isnt an SMTP error so send a response

                        command = smtpStep(cs,env,mtastats,mailQ);

                        

                        //write the response to the channel

                        writeToChannel(key,command);

                  }

                  else

                  {

                        System.out.println("Bad Error Code.");

                  }

                  

            }

            else

            {

                  System.out.println("no envelope.");

            }

            */

      String smtpStep(ConnStatus cs, envelope env, mtastatus mtastats,smtpQueue mailQ)

      {

            //Step 0 = Connection Just Established -- Send the HELO/EHLO

            //Step 1 = Send The MAIL FROM:

            //Step 2 = Send the RCPT TO:

            //Step 3 = Send DATA

            //Step 4 = Send Msg Body to remote

            //Step 5 = Send <crlf>.<crlf>

            //Step 6 = Receive OK

            String command="";

            //try

            //{

                  //System.out.println("Performing Step " + cs.step);

                  

                  if (cs.step == 0)

                  {

                        //Connection Established send the HELO

                        command = "HELO localhost.localdomain";

                        cs.setStep(1);

                  }

                  else if (cs.step == 1)

                  {

                        command = "MAIL FROM: <" +env.from+">";

                        cs.setStep(2);

                  }

                  else if (cs.step == 2)

                  {

                        command = "RCPT TO: <" +env.to+">";

                        cs.setStep(3);

                  }

                  else if (cs.step == 3)

                  {

                        command = "DATA";

                        cs.setStep(4);

                  }

                  else if (cs.step == 4)

                  {

                        command=env.messagebody + (char) 13 + (char) 10 + ".";

                        cs.setStep(5);

                  }

                  else if (cs.step == 5)

                  {

                        //message has been sent so set env

                        //System.out.println(input);

                        //if (cs.lastResponse.startsWith("250"))

                        //{

                              //System.out.println("Message Delivered.");



                                    //increment delivery counters

                              mtastats.incDelivered();

                              cs.sessionmessages++;

                              

                              env = getEnvelope(mailQ,cs);

                              cs.attachEnvelope(env);

                              //write the next command                        

                              command = "MAIL FROM: <" +env.from+">";

                              cs.setStep(2);                  

                        //}

                  }

            /*}catch (NullPointerException npe)

                  {

                        //a null pointer was found.... so....

                        System.out.println(npe.getMessage());

                        npe.printStackTrace();

                        //get a new envelope

                        env = getEnvelope(mailQ,cs);

                        cs.attachEnvelope(env);

                        command = "rset";

                        cs.setStep(1);

                  }*/

            return command;

      }

      

      int smtpRead(SelectionKey k, ConnStatus cs)

      {

            //System.out.println("Reading from Channel.");

            //read string

            String input = "";

            

            //Perform the read

            input = readFromChannel(k);

            

            //if (input.indexOf('\n') > 0)

            //      input = input.substring(0, input.indexOf('\n'));

                  

            //set the lastResponse

            cs.setLastResponse(input);

            

            //an empty or truncated read has no status code to parse

            if (input.length() < 3)

                  return 0;

            

            return Integer.parseInt(input.substring(0,3));

      }

      

      envelope getEnvelope(smtpQueue mailQ, ConnStatus cs)

      {

            //System.out.println("Getting an envelope.");

            //the envelope is null so there is no envelope attached to this channel

            

            //grab an envelope from the mailQ

            envelope env;

            

            env = (envelope) mailQ.spool.pop();

            

            //assign it to the connection status object

            if (env != null)

            {

                  cs.attachEnvelope(env);

                  //System.out.println("Envelope Attached.");

            }



            return env;      

      }

      

      void closeChannel(SelectionKey k)

      {

            try

            {

                  SocketChannel sc = (SocketChannel) k.channel();

                  sc.close();

            }

            catch(IOException ie)

            {

                  System.out.println("Error closing channel.");

            }

      }

      

      String readFromChannel(SelectionKey k)

      {

            String retVal = "";

            try

            {

                  ByteBuffer readBuffer = ByteBuffer.allocate(1024);

            

                  SelectionKey key = k;

            

                  SocketChannel sc = (SocketChannel) key.channel();

            

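                  //read whatever is available, decoding only the bytes actually received

                  while(sc.read(readBuffer) > 0)

                  {

                        readBuffer.flip();            

                        retVal = retVal + new String(readBuffer.array(), 0, readBuffer.limit());

                        //make room for the next read

                        readBuffer.clear();

                  }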

            }

            catch(IOException ie)

            {

                  System.out.println("error reading from channel");

            }

            return retVal.trim();

      }

      

      void writeToChannel(SelectionKey k, String output)

      {

            try

            {

                  SelectionKey key = k;

            

                  //add the crlf

                  output = output + (char) 13 + (char) 10;

            

                  //initialize the buffer

                  ByteBuffer buffer = ByteBuffer.allocate(output.getBytes().length);

            

                  buffer.clear();

                  

                  //convert the String to a ByteBuffer

                  buffer.put(output.getBytes());

            

                  SocketChannel sc = (SocketChannel) key.channel();

            

                  //System.out.println("To Server:" + output);

                  

                  

                  buffer.flip();

                  

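                  //a non-blocking write may send only part of the buffer, so repeat until it is drained

                  while(buffer.hasRemaining())

                  {

                        sc.write(buffer);

                  }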

                  

                  buffer.clear();

                  //System.out.println("Write Completed.");

            }

            catch (IOException ie)

            {

                  System.out.println("error performing SMTP write.");

            }

      }

      

      

}



Thanks for the help.

-Rick
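The pattern described above, one operation per pass with the interest set switched after each, can be reduced to a short self-contained sketch. Everything here is an assumption for illustration: InterestToggleSketch and exchange are hypothetical names, and a blocking loopback peer in the same thread stands in for the SMTP server by answering with a canned reply.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

final class InterestToggleSketch {

    // Sends one command and returns the peer's reply, switching the key's
    // interest set between OP_WRITE and OP_READ as the conversation advances.
    static String exchange(String command, String cannedReply) {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel peer = server.accept()) {
                client.configureBlocking(false);
                client.register(selector, SelectionKey.OP_WRITE); // we speak first here
                StringBuilder reply = new StringBuilder();
                while (reply.length() == 0 && selector.select(2000) > 0) {
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isWritable()) {
                            ByteBuffer out = ByteBuffer.wrap(
                                    (command + "\r\n").getBytes(StandardCharsets.US_ASCII));
                            while (out.hasRemaining()) {
                                client.write(out);
                            }
                            key.interestOps(SelectionKey.OP_READ); // now wait for the reply
                            // Stand-in server: blocking read of the command, then answer.
                            peer.read(ByteBuffer.allocate(256));
                            peer.write(ByteBuffer.wrap(
                                    (cannedReply + "\r\n").getBytes(StandardCharsets.US_ASCII)));
                        } else if (key.isReadable()) {
                            ByteBuffer in = ByteBuffer.allocate(256);
                            int n = client.read(in);
                            if (n > 0) {
                                reply.append(new String(in.array(), 0, n, StandardCharsets.US_ASCII));
                            }
                            key.interestOps(SelectionKey.OP_WRITE); // ready for the next command
                        }
                    }
                }
                return reply.toString().trim();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(exchange("HELO localhost.localdomain", "250 ok"));
    }
}
```

Because OP_WRITE is only in the interest set while a command is pending, select() blocks between the write and the arrival of the reply instead of spinning on a channel that is always writable.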
Glad you got it working

>>As for the logging, I have my own logging module that I will be using.

That's OK - be sure to implement it with log4j ;-)

I haven't done enough work for 500 points so feel free to get them back
CEHJ,
actually they are yours. I'm so happy right now... I have this thing pushing a constant 163,000 emails per minute with one thread and 2 connections.

Thanks,
Rick
:-)

>>I have this thing pushing with one thread and 2 connections a constant 163,000 emails per minute.

Just so long as you don't point it in my direction .. ;-)
Actually, I had to clarify with the customer what this was to be used for before I accepted the project. It's not for outgoing email; it's for routing large amounts of company email to various independent locations. :-)