We help IT Professionals succeed at work.

We've partnered with Certified Experts, Carl Webster and Richard Faulkner, to bring you two Citrix podcasts. Learn about 2020 trends and get answers to your biggest Citrix questions!Listen Now

x

Issues with NIO Selection

richardsimnett
on
Medium Priority
310 Views
Last Modified: 2013-12-29
Hello,
I am writing a proggie that uses socketchannels and selectors to deliver SMTP email to remote connections. It seems to work fine on windows, but for some reason on Linux, the key selection fails after the first iteration of the thread.

The code is a bit complex, but I will post it. I know the issue is something to do with selection as when I execute it on windows it works fine, but in linux it goes through the first read iteration on each of the socket channels once, and then never returns any more keys when I select.

Other various datatypes you'll see are just supporting classes which all have been tested heavily and work. I will not include them l as there are tons of them.

//Initialization code - this code is what initializes and starts the outboundconnections, and the selector, and initializes and starts the delivery thread.

                              // Init the outbound connection manager and the Thread Pool
                              conn_out = new outboundconnections(cfg);
                              
                              //start the delivery thread
                              deliveryThread dT = new deliveryThread(conn_out,new ConnectionManager(),mailq,mtastats);
                              dT.start();


//Outbound Connections class

import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.io.IOException;
import java.util.Iterator;

public class outboundconnections {
      
      int maxconnections = 0;
      Selector connections;
      
      public outboundconnections(config c)
      {
            //set the maxconnections
            maxconnections = Integer.parseInt(c.getValue("max_smtp_out").toString());
            
            //create the selector
            try
            {
                  connections = Selector.open();
            
                  //create the new socketchannels
                  for (int i=0; i < maxconnections; i++)
                  {
                          openSocketChannel("192.168.1.115",25);                  
                  }
            }
            catch(IOException ie)
            {
                  //Error on selector open, socketchannel open, or socketchannel blocking
                  System.out.println("ERROR creating channel.");
            }
            
      }
      
      Iterator getReadyConnections()
      {
            //this function returns an iterator of the socket channels ready to be processed
            //it blocks until there is something to return
            int i = 0;
            try
            {
                  i = connections.select();
            }
            catch (IOException ie)
            {
                  System.out.println("ERROR While attempting select of connections.");
            }
            
            //System.out.println("I=" + i);
            
            if (i > 0)
                  return connections.selectedKeys().iterator();
            else
                  return null;
            
      }
      
      public void openSocketChannel(String host, int port)
      {
            //register the socketchannel, and control object with the selector
            try
            {
                  //try to open a connection
                  SocketChannel sc = SocketChannel.open(new InetSocketAddress(host,port));
      
                  //set blocking to no
                  sc.configureBlocking(false);
                  
                  //register the socketchannel with the selector, and register the control object with the channel
                  sc.register(connections, SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT, new ConnStatus());
            }
            catch (IOException ie)
            {
                  System.out.println("Error While Attempting to Open a Connection.");
            }
      }
}
      


//delivery thread class

import java.util.*;
import java.nio.*;
import java.nio.channels.*;

public class deliveryThread extends Thread{

      private smtpQueue mailQ;
      private SelectionKey key;
      private mtastatus mtastats;
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      SMTPHandler smtp = new SMTPHandler();
      private outboundconnections conn_out;
      
      public deliveryThread(outboundconnections oc, ConnectionManager c, smtpQueue s, mtastatus m)
      {
            super();
            mailQ = s;
            mtastats = m;
            conn_out = oc;
      }
      
      public synchronized void run()
      {
            //while there is not a system_exit event
            while (mtastats.system_exit() == false)
            {
                  //get any available keys... this will block until there is a connection to service
                  //System.out.println("Searching for connections to service.");
                  Iterator it = conn_out.getReadyConnections();
                  
                  if (it != null)
                  {
                        while(it.hasNext())
                        {
                        System.out.println("Found A Key To Service");
                        //get the next key
                        key = (SelectionKey) it.next();
                  
                        //reset the operations
                        key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)& (~SelectionKey.OP_CONNECT));
                        
                        //service the channel
                        if (key != null)
                        {
                              //first determine the type of operation required
                              System.out.println("Get the socketchannel");
                              SocketChannel channel = (SocketChannel) key.channel();
                      
                              //is this channel connected?
                              if(channel.isConnected())
                              {
                                    System.out.println("Channel Connected.");
                                    if(key.isReadable() || key.isWritable())
                                    {
                                          System.out.println("SMTP Read Write");
                                          //call the smtpEvent handler
                                          smtp.processSMTP(key,mailQ);
                                    }
                              }      
                              else
                                    System.out.println("No Connection.");
                        }      
                        else
                              System.out.println("Bad Key.");
                  
                        //reset the key
                        key = null;
                        }
                  }
                  else
                  {
                        //failed to find any keys so sleep for a short bit
                        try
                        {
                              //System.out.println("No Keys so Sleeping.");
                              sleep(100);
                        }
                        catch(InterruptedException ie)
                        {
                              System.out.println("error while no key sleep");
                        }
                  }
            }
      }
            
}


//smtphandler class
import java.io.IOException;
import java.nio.channels.*;
import java.nio.ByteBuffer;

public class SMTPHandler {
      
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      
      public SMTPHandler()
      {      
      }
      

      int processSMTP(SelectionKey key,smtpQueue mailQ)
      {
            //gets the selectionkey and processes the next step in message delivery
            
            //first get the connstatus object from the key
            
            ConnStatus cs = (ConnStatus) key.attachment();
            
            //Step 0 = Connection Just Established -- Send the HELO/EHLO
            //Step 1 = Send The MAIL FROM:
            //Step 2 = Send the RCPT TO:
            //Step 3 = Send DATA
            //Step 4 = Send Msg Body to remote
            //Step 5 = Send <crlf>.<crlf>
            //Step 6 = Receive OK
            
            //set the step
            int step = cs.getStep();
            
            //get the envelope
            envelope env = cs.getEnvelope();
            
            if ((env == null) && (mailQ.spool.getQueueSize() > 0))
            {
                  //System.out.println("Getting an envelope.");
                  //the envelope is null so there is no envelope attached to this channel
                  
                  //grab an envelope from the mailQ
                  env = (envelope) mailQ.spool.pop();
                  
                  //assign it to the connection status object
                  if (env != null)
                  {
                        cs.attachEnvelope(env);
                        //System.out.println("Envelope Attached.");
                  }
            }
            
            if (env != null)
            {
                  System.out.println("Reading from Channel.");
                  //read string
                  String input = "";
            
                  //Perform the read
                  try
                  {
                        input = readFromChannel(key);
                  }
                  catch(IOException e)
                  {
                        System.out.println("There was a problem reading from the channel.");
                  }
            
                  if (input.indexOf('\n') > 0)
                        input = input.substring(0, input.indexOf('\n'));
                  
                  //detect error state
                  System.out.println("SMTP STEP " + step + ":" + input);
            
                  String command = "";
            
                  if (step == 0)
                  {
                        //Connection Established send the HELO
                        command = "HELO localhost.localdomain ";
                        cs.setStep(1);
                  }
                  else if (step == 1)
                  {
                        command = "MAIL FROM: <" +env.from+">";
                        cs.setStep(2);
                  }
                  else if (step == 2)
                  {
                        command = "RCPT TO: <" +env.to+">";
                        cs.setStep(3);
                  }
                  else if (step == 3)
                  {
                        command = "DATA";
                        cs.setStep(4);
                  }
                  else if (step == 4)
                  {
                        command=env.messagebody + (char)13 + (char) 10 + ".";
                        cs.attachEnvelope(null);
                        cs.setStep(1);
                  }
                  else if (step == 5)
                  {
                        //message has been sent so set env
                        cs.attachEnvelope(null);
                        cs.setStep(1);
                  }                  
                  //now write the response
                  if (command != "")
                  {
                        try
                        {
                              System.out.println(command);
                              writeToChannel(key,command);
                        }
                        catch (IOException ile)
                        {
                              System.out.println("Error Writing to Channel.");
                        }
                  }
            }
            //attach the connection status object
            key.attach(cs);
            
            //reset the interestOps
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT);
            
            return 0;
      }
      
      String readFromChannel(SelectionKey k) throws IOException
      {
            SelectionKey key = k;
            String retVal = "";
            
            SocketChannel sc = (SocketChannel) key.channel();
            
            buffer.clear();
            
            while(sc.read(buffer) > 0)
            {
                  buffer.flip();            
                  retVal = retVal + new String(buffer.array());
            }
            
            buffer.clear();
            
            return retVal.trim();
      }
      
      void writeToChannel(SelectionKey k, String output) throws IOException
      {
            SelectionKey key = k;
            
            //add the crlf
            output = output + '\n';//(char)13 + (char) 10;
            
            //convert the String to a ByteBuffer
            buffer.put(output.getBytes());
            
            SocketChannel sc = (SocketChannel) key.channel();
            
            buffer.flip();
            while(buffer.hasRemaining())
            {
                  sc.write(buffer);
            }
            
            buffer.clear();
      }
      
      
}

A working fix is worth 500 points.

Thanks,
Rick
Comment
Watch Question

CERTIFIED EXPERT
Top Expert 2016
Commented:
Aren't you saying you're not interested in READing anymore here?

>>
// reset the operations
                              key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)
                                          & (~SelectionKey.OP_CONNECT));
>>

Not the solution you were looking for? Getting a personalized solution is easy.

Ask the Experts

Author

Commented:
CEHJ,
Yes I deregister the interests once the key is grabbed, but I did this in the event I chose to use multiple threads, but if you look I reset the interests at the end of the SMTPHandler. I did this to stop the possibility of multiple threads attempting to select the same keys.

Thanks,
Rick
CERTIFIED EXPERT
Top Expert 2016

Commented:
>>I did this to stop the possibility of multiple threads attempting to select the same keys.

Well that possibility doesn't exist as there can only be one thread through the method at one time - your run method is synchronized. I think you should get rid of that stuff. Shall try to look at this later - very late here. Would be better if i could run it though.

Author

Commented:
CEHJ,
Amazingly enough taking out the deregistration fixed it. Im not sure why but this code now works on linux (it always worked on windows). I am now running into another problem however, please see the Question titled: Issues Reading and Writing with NIO.

Thanks,
Rick
CERTIFIED EXPERT
Top Expert 2016

Commented:
:-)

Maybe i missed your other one - possibly answered now

Author

Commented:
CEHJ,
Sorry I never posted the follow up as I realized what I thougt worked (cuz it actually made it through about 10 messages) really didnt. I still have the same problem. I have opened a new question regarding this issue at : http://www.experts-exchange.com/Programming/Programming_Languages/Java/Q_21800722.html

Thanks,
Rick
Access more of Experts Exchange with a free account
Thanks for using Experts Exchange.

Create a free account to continue.

Limited access with a free account allows you to:

  • View three pieces of content (articles, solutions, posts, and videos)
  • Ask the experts questions (counted toward content limit)
  • Customize your dashboard and profile

*This site is protected by reCAPTCHA and the Google Privacy Policy and Terms of Service apply.

OR

Please enter a first name

Please enter a last name

8+ characters (letters, numbers, and a symbol)

By clicking, you agree to the Terms of Use and Privacy Policy.