Learn how to a build a cloud-first strategyRegister Now

x
?
Solved

Issues with NIO Selection

Posted on 2006-04-02
6
Medium Priority
?
266 Views
Last Modified: 2013-12-29
Hello,
I am writing a proggie that uses socketchannels and selectors to deliver SMTP email to remote connections. It seems to work fine on windows, but for some reason on Linux, the key selection fails after the first iteration of the thread.

The code is a bit complex, but I will post it. I know the issue is something to do with selection as when I execute it on windows it works fine, but in linux it goes through the first read iteration on each of the socket channels once, and then never returns any more keys when I select.

Other various datatypes you'll see are just supporting classes which all have been tested heavily and work. I will not include them l as there are tons of them.

//Initialization code - this code is what initializes and starts the outboundconnections, and the selector, and initializes and starts the delivery thread.

                              // Init the outbound connection manager and the Thread Pool
                              conn_out = new outboundconnections(cfg);
                              
                              //start the delivery thread
                              deliveryThread dT = new deliveryThread(conn_out,new ConnectionManager(),mailq,mtastats);
                              dT.start();


//Outbound Connections class

import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.io.IOException;
import java.util.Iterator;

public class outboundconnections {
      
      int maxconnections = 0;
      Selector connections;
      
      public outboundconnections(config c)
      {
            //set the maxconnections
            maxconnections = Integer.parseInt(c.getValue("max_smtp_out").toString());
            
            //create the selector
            try
            {
                  connections = Selector.open();
            
                  //create the new socketchannels
                  for (int i=0; i < maxconnections; i++)
                  {
                          openSocketChannel("192.168.1.115",25);                  
                  }
            }
            catch(IOException ie)
            {
                  //Error on selector open, socketchannel open, or socketchannel blocking
                  System.out.println("ERROR creating channel.");
            }
            
      }
      
      Iterator getReadyConnections()
      {
            //this function returns an iterator of the socket channels ready to be processed
            //it blocks until there is something to return
            int i = 0;
            try
            {
                  i = connections.select();
            }
            catch (IOException ie)
            {
                  System.out.println("ERROR While attempting select of connections.");
            }
            
            //System.out.println("I=" + i);
            
            if (i > 0)
                  return connections.selectedKeys().iterator();
            else
                  return null;
            
      }
      
      public void openSocketChannel(String host, int port)
      {
            //register the socketchannel, and control object with the selector
            try
            {
                  //try to open a connection
                  SocketChannel sc = SocketChannel.open(new InetSocketAddress(host,port));
      
                  //set blocking to no
                  sc.configureBlocking(false);
                  
                  //register the socketchannel with the selector, and register the control object with the channel
                  sc.register(connections, SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT, new ConnStatus());
            }
            catch (IOException ie)
            {
                  System.out.println("Error While Attempting to Open a Connection.");
            }
      }
}
      


//delivery thread class

import java.util.*;
import java.nio.*;
import java.nio.channels.*;

public class deliveryThread extends Thread{

      private smtpQueue mailQ;
      private SelectionKey key;
      private mtastatus mtastats;
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      SMTPHandler smtp = new SMTPHandler();
      private outboundconnections conn_out;
      
      public deliveryThread(outboundconnections oc, ConnectionManager c, smtpQueue s, mtastatus m)
      {
            super();
            mailQ = s;
            mtastats = m;
            conn_out = oc;
      }
      
      public synchronized void run()
      {
            //while there is not a system_exit event
            while (mtastats.system_exit() == false)
            {
                  //get any available keys... this will block until there is a connection to service
                  //System.out.println("Searching for connections to service.");
                  Iterator it = conn_out.getReadyConnections();
                  
                  if (it != null)
                  {
                        while(it.hasNext())
                        {
                        System.out.println("Found A Key To Service");
                        //get the next key
                        key = (SelectionKey) it.next();
                  
                        //reset the operations
                        key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)& (~SelectionKey.OP_CONNECT));
                        
                        //service the channel
                        if (key != null)
                        {
                              //first determine the type of operation required
                              System.out.println("Get the socketchannel");
                              SocketChannel channel = (SocketChannel) key.channel();
                      
                              //is this channel connected?
                              if(channel.isConnected())
                              {
                                    System.out.println("Channel Connected.");
                                    if(key.isReadable() || key.isWritable())
                                    {
                                          System.out.println("SMTP Read Write");
                                          //call the smtpEvent handler
                                          smtp.processSMTP(key,mailQ);
                                    }
                              }      
                              else
                                    System.out.println("No Connection.");
                        }      
                        else
                              System.out.println("Bad Key.");
                  
                        //reset the key
                        key = null;
                        }
                  }
                  else
                  {
                        //failed to find any keys so sleep for a short bit
                        try
                        {
                              //System.out.println("No Keys so Sleeping.");
                              sleep(100);
                        }
                        catch(InterruptedException ie)
                        {
                              System.out.println("error while no key sleep");
                        }
                  }
            }
      }
            
}


//smtphandler class
import java.io.IOException;
import java.nio.channels.*;
import java.nio.ByteBuffer;

public class SMTPHandler {
      
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      
      public SMTPHandler()
      {      
      }
      

      int processSMTP(SelectionKey key,smtpQueue mailQ)
      {
            //gets the selectionkey and processes the next step in message delivery
            
            //first get the connstatus object from the key
            
            ConnStatus cs = (ConnStatus) key.attachment();
            
            //Step 0 = Connection Just Established -- Send the HELO/EHLO
            //Step 1 = Send The MAIL FROM:
            //Step 2 = Send the RCPT TO:
            //Step 3 = Send DATA
            //Step 4 = Send Msg Body to remote
            //Step 5 = Send <crlf>.<crlf>
            //Step 6 = Receive OK
            
            //set the step
            int step = cs.getStep();
            
            //get the envelope
            envelope env = cs.getEnvelope();
            
            if ((env == null) && (mailQ.spool.getQueueSize() > 0))
            {
                  //System.out.println("Getting an envelope.");
                  //the envelope is null so there is no envelope attached to this channel
                  
                  //grab an envelope from the mailQ
                  env = (envelope) mailQ.spool.pop();
                  
                  //assign it to the connection status object
                  if (env != null)
                  {
                        cs.attachEnvelope(env);
                        //System.out.println("Envelope Attached.");
                  }
            }
            
            if (env != null)
            {
                  System.out.println("Reading from Channel.");
                  //read string
                  String input = "";
            
                  //Perform the read
                  try
                  {
                        input = readFromChannel(key);
                  }
                  catch(IOException e)
                  {
                        System.out.println("There was a problem reading from the channel.");
                  }
            
                  if (input.indexOf('\n') > 0)
                        input = input.substring(0, input.indexOf('\n'));
                  
                  //detect error state
                  System.out.println("SMTP STEP " + step + ":" + input);
            
                  String command = "";
            
                  if (step == 0)
                  {
                        //Connection Established send the HELO
                        command = "HELO localhost.localdomain ";
                        cs.setStep(1);
                  }
                  else if (step == 1)
                  {
                        command = "MAIL FROM: <" +env.from+">";
                        cs.setStep(2);
                  }
                  else if (step == 2)
                  {
                        command = "RCPT TO: <" +env.to+">";
                        cs.setStep(3);
                  }
                  else if (step == 3)
                  {
                        command = "DATA";
                        cs.setStep(4);
                  }
                  else if (step == 4)
                  {
                        command=env.messagebody + (char)13 + (char) 10 + ".";
                        cs.attachEnvelope(null);
                        cs.setStep(1);
                  }
                  else if (step == 5)
                  {
                        //message has been sent so set env
                        cs.attachEnvelope(null);
                        cs.setStep(1);
                  }                  
                  //now write the response
                  if (command != "")
                  {
                        try
                        {
                              System.out.println(command);
                              writeToChannel(key,command);
                        }
                        catch (IOException ile)
                        {
                              System.out.println("Error Writing to Channel.");
                        }
                  }
            }
            //attach the connection status object
            key.attach(cs);
            
            //reset the interestOps
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT);
            
            return 0;
      }
      
      String readFromChannel(SelectionKey k) throws IOException
      {
            SelectionKey key = k;
            String retVal = "";
            
            SocketChannel sc = (SocketChannel) key.channel();
            
            buffer.clear();
            
            while(sc.read(buffer) > 0)
            {
                  buffer.flip();            
                  retVal = retVal + new String(buffer.array());
            }
            
            buffer.clear();
            
            return retVal.trim();
      }
      
      void writeToChannel(SelectionKey k, String output) throws IOException
      {
            SelectionKey key = k;
            
            //add the crlf
            output = output + '\n';//(char)13 + (char) 10;
            
            //convert the String to a ByteBuffer
            buffer.put(output.getBytes());
            
            SocketChannel sc = (SocketChannel) key.channel();
            
            buffer.flip();
            while(buffer.hasRemaining())
            {
                  sc.write(buffer);
            }
            
            buffer.clear();
      }
      
      
}

A working fix is worth 500 points.

Thanks,
Rick
0
Comment
Question by:richardsimnett
  • 3
  • 3
6 Comments
 
LVL 86

Accepted Solution

by:
CEHJ earned 2000 total points
ID: 16366230
Aren't you saying you're not interested in READing anymore here?

>>
// reset the operations
                              key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)
                                          & (~SelectionKey.OP_CONNECT));
>>
0
 

Author Comment

by:richardsimnett
ID: 16366572
CEHJ,
Yes I deregister the interests once the key is grabbed, but I did this in the event I chose to use multiple threads, but if you look I reset the interests at the end of the SMTPHandler. I did this to stop the possibility of multiple threads attempting to select the same keys.

Thanks,
Rick
0
 
LVL 86

Expert Comment

by:CEHJ
ID: 16366675
>>I did this to stop the possibility of multiple threads attempting to select the same keys.

Well that possibility doesn't exist as there can only be one thread through the method at one time - your run method is synchronized. I think you should get rid of that stuff. Shall try to look at this later - very late here. Would be better if i could run it though.
0
Concerto Cloud for Software Providers & ISVs

Can Concerto Cloud Services help you focus on evolving your application offerings, while delivering the best cloud experience to your customers? From DevOps to revenue models and customer support, the answer is yes!

Learn how Concerto can help you.

 

Author Comment

by:richardsimnett
ID: 16369145
CEHJ,
Amazingly enough taking out the deregistration fixed it. Im not sure why but this code now works on linux (it always worked on windows). I am now running into another problem however, please see the Question titled: Issues Reading and Writing with NIO.

Thanks,
Rick
0
 
LVL 86

Expert Comment

by:CEHJ
ID: 16369663
:-)

Maybe i missed your other one - possibly answered now
0
 

Author Comment

by:richardsimnett
ID: 16369837
CEHJ,
Sorry I never posted the follow up as I realized what I thougt worked (cuz it actually made it through about 10 messages) really didnt. I still have the same problem. I have opened a new question regarding this issue at : http://www.experts-exchange.com/Programming/Programming_Languages/Java/Q_21800722.html

Thanks,
Rick
0

Featured Post

[Webinar] Cloud and Mobile-First Strategy

Maybe you’ve fully adopted the cloud since the beginning. Or maybe you started with on-prem resources but are pursuing a “cloud and mobile first” strategy. Getting to that end state has its challenges. Discover how to build out a 100% cloud and mobile IT strategy in this webinar.

Question has a verified solution.

If you are experiencing a similar issue, please ask a related question

Java had always been an easily readable and understandable language.  Some relatively recent changes in the language seem to be changing this pretty fast, and anyone that had not seen any Java code for the last 5 years will possibly have issues unde…
Basic understanding on "OO- Object Orientation" is needed for designing a logical solution to solve a problem. Basic OOAD is a prerequisite for a coder to ensure that they follow the basic design of OO. This would help developers to understand the b…
Viewers learn how to read error messages and identify possible mistakes that could cause hours of frustration. Coding is as much about debugging your code as it is about writing it. Define Error Message: Line Numbers: Type of Error: Break Down…
This tutorial covers a practical example of lazy loading technique and early loading technique in a Singleton Design Pattern.
Suggested Courses
Course of the Month20 days, 19 hours left to enroll

810 members asked questions and received personalized solutions in the past 7 days.

Join the community of 500,000 technology professionals and ask your questions.

Join & Ask a Question