richardsimnett
asked on
Issues with NIO Selection
Hello,
I am writing a proggie that uses socketchannels and selectors to deliver SMTP email to remote connections. It seems to work fine on windows, but for some reason on Linux, the key selection fails after the first iteration of the thread.
The code is a bit complex, but I will post it. I know the issue is something to do with selection as when I execute it on windows it works fine, but in linux it goes through the first read iteration on each of the socket channels once, and then never returns any more keys when I select.
Other various datatypes you'll see are just supporting classes which all have been tested heavily and work. I will not include them l as there are tons of them.
//Initialization code - this code is what initializes and starts the outboundconnections, and the selector, and initializes and starts the delivery thread.
// Init the outbound connection manager and the Thread Pool
conn_out = new outboundconnections(cfg);
//start the delivery thread
deliveryThread dT = new deliveryThread(conn_out,ne w ConnectionManager(),mailq, mtastats);
dT.start();
//Outbound Connections class
import java.net.InetSocketAddress ;
import java.nio.channels.*;
import java.io.IOException;
import java.util.Iterator;
public class outboundconnections {
int maxconnections = 0;
Selector connections;
public outboundconnections(config c)
{
//set the maxconnections
maxconnections = Integer.parseInt(c.getValu e("max_smt p_out").to String());
//create the selector
try
{
connections = Selector.open();
//create the new socketchannels
for (int i=0; i < maxconnections; i++)
{
openSocketChannel("192.168 .1.115",25 );
}
}
catch(IOException ie)
{
//Error on selector open, socketchannel open, or socketchannel blocking
System.out.println("ERROR creating channel.");
}
}
Iterator getReadyConnections()
{
//this function returns an iterator of the socket channels ready to be processed
//it blocks until there is something to return
int i = 0;
try
{
i = connections.select();
}
catch (IOException ie)
{
System.out.println("ERROR While attempting select of connections.");
}
//System.out.println("I=" + i);
if (i > 0)
return connections.selectedKeys() .iterator( );
else
return null;
}
public void openSocketChannel(String host, int port)
{
//register the socketchannel, and control object with the selector
try
{
//try to open a connection
SocketChannel sc = SocketChannel.open(new InetSocketAddress(host,por t));
//set blocking to no
sc.configureBlocking(false );
//register the socketchannel with the selector, and register the control object with the channel
sc.register(connections, SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT, new ConnStatus());
}
catch (IOException ie)
{
System.out.println("Error While Attempting to Open a Connection.");
}
}
}
//delivery thread class
import java.util.*;
import java.nio.*;
import java.nio.channels.*;
public class deliveryThread extends Thread{
private smtpQueue mailQ;
private SelectionKey key;
private mtastatus mtastats;
ByteBuffer buffer = ByteBuffer.allocate(1024);
SMTPHandler smtp = new SMTPHandler();
private outboundconnections conn_out;
public deliveryThread(outboundcon nections oc, ConnectionManager c, smtpQueue s, mtastatus m)
{
super();
mailQ = s;
mtastats = m;
conn_out = oc;
}
public synchronized void run()
{
//while there is not a system_exit event
while (mtastats.system_exit() == false)
{
//get any available keys... this will block until there is a connection to service
//System.out.println("Sear ching for connections to service.");
Iterator it = conn_out.getReadyConnectio ns();
if (it != null)
{
while(it.hasNext())
{
System.out.println("Found A Key To Service");
//get the next key
key = (SelectionKey) it.next();
//reset the operations
key.interestOps(key.intere stOps() & (~SelectionKey.OP_READ)& (~SelectionKey.OP_CONNECT) );
//service the channel
if (key != null)
{
//first determine the type of operation required
System.out.println("Get the socketchannel");
SocketChannel channel = (SocketChannel) key.channel();
//is this channel connected?
if(channel.isConnected())
{
System.out.println("Channe l Connected.");
if(key.isReadable() || key.isWritable())
{
System.out.println("SMTP Read Write");
//call the smtpEvent handler
smtp.processSMTP(key,mailQ );
}
}
else
System.out.println("No Connection.");
}
else
System.out.println("Bad Key.");
//reset the key
key = null;
}
}
else
{
//failed to find any keys so sleep for a short bit
try
{
//System.out.println("No Keys so Sleeping.");
sleep(100);
}
catch(InterruptedException ie)
{
System.out.println("error while no key sleep");
}
}
}
}
}
//smtphandler class
import java.io.IOException;
import java.nio.channels.*;
import java.nio.ByteBuffer;
public class SMTPHandler {
ByteBuffer buffer = ByteBuffer.allocate(1024);
public SMTPHandler()
{
}
int processSMTP(SelectionKey key,smtpQueue mailQ)
{
//gets the selectionkey and processes the next step in message delivery
//first get the connstatus object from the key
ConnStatus cs = (ConnStatus) key.attachment();
//Step 0 = Connection Just Established -- Send the HELO/EHLO
//Step 1 = Send The MAIL FROM:
//Step 2 = Send the RCPT TO:
//Step 3 = Send DATA
//Step 4 = Send Msg Body to remote
//Step 5 = Send <crlf>.<crlf>
//Step 6 = Receive OK
//set the step
int step = cs.getStep();
//get the envelope
envelope env = cs.getEnvelope();
if ((env == null) && (mailQ.spool.getQueueSize( ) > 0))
{
//System.out.println("Gett ing an envelope.");
//the envelope is null so there is no envelope attached to this channel
//grab an envelope from the mailQ
env = (envelope) mailQ.spool.pop();
//assign it to the connection status object
if (env != null)
{
cs.attachEnvelope(env);
//System.out.println("Enve lope Attached.");
}
}
if (env != null)
{
System.out.println("Readin g from Channel.");
//read string
String input = "";
//Perform the read
try
{
input = readFromChannel(key);
}
catch(IOException e)
{
System.out.println("There was a problem reading from the channel.");
}
if (input.indexOf('\n') > 0)
input = input.substring(0, input.indexOf('\n'));
//detect error state
System.out.println("SMTP STEP " + step + ":" + input);
String command = "";
if (step == 0)
{
//Connection Established send the HELO
command = "HELO localhost.localdomain ";
cs.setStep(1);
}
else if (step == 1)
{
command = "MAIL FROM: <" +env.from+">";
cs.setStep(2);
}
else if (step == 2)
{
command = "RCPT TO: <" +env.to+">";
cs.setStep(3);
}
else if (step == 3)
{
command = "DATA";
cs.setStep(4);
}
else if (step == 4)
{
command=env.messagebody + (char)13 + (char) 10 + ".";
cs.attachEnvelope(null);
cs.setStep(1);
}
else if (step == 5)
{
//message has been sent so set env
cs.attachEnvelope(null);
cs.setStep(1);
}
//now write the response
if (command != "")
{
try
{
System.out.println(command );
writeToChannel(key,command );
}
catch (IOException ile)
{
System.out.println("Error Writing to Channel.");
}
}
}
//attach the connection status object
key.attach(cs);
//reset the interestOps
key.interestOps(SelectionK ey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT);
return 0;
}
String readFromChannel(SelectionK ey k) throws IOException
{
SelectionKey key = k;
String retVal = "";
SocketChannel sc = (SocketChannel) key.channel();
buffer.clear();
while(sc.read(buffer) > 0)
{
buffer.flip();
retVal = retVal + new String(buffer.array());
}
buffer.clear();
return retVal.trim();
}
void writeToChannel(SelectionKe y k, String output) throws IOException
{
SelectionKey key = k;
//add the crlf
output = output + '\n';//(char)13 + (char) 10;
//convert the String to a ByteBuffer
buffer.put(output.getBytes ());
SocketChannel sc = (SocketChannel) key.channel();
buffer.flip();
while(buffer.hasRemaining( ))
{
sc.write(buffer);
}
buffer.clear();
}
}
A working fix is worth 500 points.
Thanks,
Rick
I am writing a proggie that uses socketchannels and selectors to deliver SMTP email to remote connections. It seems to work fine on windows, but for some reason on Linux, the key selection fails after the first iteration of the thread.
The code is a bit complex, but I will post it. I know the issue is something to do with selection as when I execute it on windows it works fine, but in linux it goes through the first read iteration on each of the socket channels once, and then never returns any more keys when I select.
Other various datatypes you'll see are just supporting classes which all have been tested heavily and work. I will not include them l as there are tons of them.
//Initialization code - this code is what initializes and starts the outboundconnections, and the selector, and initializes and starts the delivery thread.
// Init the outbound connection manager and the Thread Pool
conn_out = new outboundconnections(cfg);
//start the delivery thread
deliveryThread dT = new deliveryThread(conn_out,ne
dT.start();
//Outbound Connections class
import java.net.InetSocketAddress
import java.nio.channels.*;
import java.io.IOException;
import java.util.Iterator;
public class outboundconnections {
int maxconnections = 0;
Selector connections;
public outboundconnections(config
{
//set the maxconnections
maxconnections = Integer.parseInt(c.getValu
//create the selector
try
{
connections = Selector.open();
//create the new socketchannels
for (int i=0; i < maxconnections; i++)
{
openSocketChannel("192.168
}
}
catch(IOException ie)
{
//Error on selector open, socketchannel open, or socketchannel blocking
System.out.println("ERROR creating channel.");
}
}
Iterator getReadyConnections()
{
//this function returns an iterator of the socket channels ready to be processed
//it blocks until there is something to return
int i = 0;
try
{
i = connections.select();
}
catch (IOException ie)
{
System.out.println("ERROR While attempting select of connections.");
}
//System.out.println("I=" + i);
if (i > 0)
return connections.selectedKeys()
else
return null;
}
public void openSocketChannel(String host, int port)
{
//register the socketchannel, and control object with the selector
try
{
//try to open a connection
SocketChannel sc = SocketChannel.open(new InetSocketAddress(host,por
//set blocking to no
sc.configureBlocking(false
//register the socketchannel with the selector, and register the control object with the channel
sc.register(connections, SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT, new ConnStatus());
}
catch (IOException ie)
{
System.out.println("Error While Attempting to Open a Connection.");
}
}
}
//delivery thread class
import java.util.*;
import java.nio.*;
import java.nio.channels.*;
public class deliveryThread extends Thread{
private smtpQueue mailQ;
private SelectionKey key;
private mtastatus mtastats;
ByteBuffer buffer = ByteBuffer.allocate(1024);
SMTPHandler smtp = new SMTPHandler();
private outboundconnections conn_out;
public deliveryThread(outboundcon
{
super();
mailQ = s;
mtastats = m;
conn_out = oc;
}
public synchronized void run()
{
//while there is not a system_exit event
while (mtastats.system_exit() == false)
{
//get any available keys... this will block until there is a connection to service
//System.out.println("Sear
Iterator it = conn_out.getReadyConnectio
if (it != null)
{
while(it.hasNext())
{
System.out.println("Found A Key To Service");
//get the next key
key = (SelectionKey) it.next();
//reset the operations
key.interestOps(key.intere
//service the channel
if (key != null)
{
//first determine the type of operation required
System.out.println("Get the socketchannel");
SocketChannel channel = (SocketChannel) key.channel();
//is this channel connected?
if(channel.isConnected())
{
System.out.println("Channe
if(key.isReadable() || key.isWritable())
{
System.out.println("SMTP Read Write");
//call the smtpEvent handler
smtp.processSMTP(key,mailQ
}
}
else
System.out.println("No Connection.");
}
else
System.out.println("Bad Key.");
//reset the key
key = null;
}
}
else
{
//failed to find any keys so sleep for a short bit
try
{
//System.out.println("No Keys so Sleeping.");
sleep(100);
}
catch(InterruptedException
{
System.out.println("error while no key sleep");
}
}
}
}
}
//smtphandler class
import java.io.IOException;
import java.nio.channels.*;
import java.nio.ByteBuffer;
public class SMTPHandler {
ByteBuffer buffer = ByteBuffer.allocate(1024);
public SMTPHandler()
{
}
int processSMTP(SelectionKey key,smtpQueue mailQ)
{
//gets the selectionkey and processes the next step in message delivery
//first get the connstatus object from the key
ConnStatus cs = (ConnStatus) key.attachment();
//Step 0 = Connection Just Established -- Send the HELO/EHLO
//Step 1 = Send The MAIL FROM:
//Step 2 = Send the RCPT TO:
//Step 3 = Send DATA
//Step 4 = Send Msg Body to remote
//Step 5 = Send <crlf>.<crlf>
//Step 6 = Receive OK
//set the step
int step = cs.getStep();
//get the envelope
envelope env = cs.getEnvelope();
if ((env == null) && (mailQ.spool.getQueueSize(
{
//System.out.println("Gett
//the envelope is null so there is no envelope attached to this channel
//grab an envelope from the mailQ
env = (envelope) mailQ.spool.pop();
//assign it to the connection status object
if (env != null)
{
cs.attachEnvelope(env);
//System.out.println("Enve
}
}
if (env != null)
{
System.out.println("Readin
//read string
String input = "";
//Perform the read
try
{
input = readFromChannel(key);
}
catch(IOException e)
{
System.out.println("There was a problem reading from the channel.");
}
if (input.indexOf('\n') > 0)
input = input.substring(0, input.indexOf('\n'));
//detect error state
System.out.println("SMTP STEP " + step + ":" + input);
String command = "";
if (step == 0)
{
//Connection Established send the HELO
command = "HELO localhost.localdomain ";
cs.setStep(1);
}
else if (step == 1)
{
command = "MAIL FROM: <" +env.from+">";
cs.setStep(2);
}
else if (step == 2)
{
command = "RCPT TO: <" +env.to+">";
cs.setStep(3);
}
else if (step == 3)
{
command = "DATA";
cs.setStep(4);
}
else if (step == 4)
{
command=env.messagebody + (char)13 + (char) 10 + ".";
cs.attachEnvelope(null);
cs.setStep(1);
}
else if (step == 5)
{
//message has been sent so set env
cs.attachEnvelope(null);
cs.setStep(1);
}
//now write the response
if (command != "")
{
try
{
System.out.println(command
writeToChannel(key,command
}
catch (IOException ile)
{
System.out.println("Error Writing to Channel.");
}
}
}
//attach the connection status object
key.attach(cs);
//reset the interestOps
key.interestOps(SelectionK
return 0;
}
String readFromChannel(SelectionK
{
SelectionKey key = k;
String retVal = "";
SocketChannel sc = (SocketChannel) key.channel();
buffer.clear();
while(sc.read(buffer) > 0)
{
buffer.flip();
retVal = retVal + new String(buffer.array());
}
buffer.clear();
return retVal.trim();
}
void writeToChannel(SelectionKe
{
SelectionKey key = k;
//add the crlf
output = output + '\n';//(char)13 + (char) 10;
//convert the String to a ByteBuffer
buffer.put(output.getBytes
SocketChannel sc = (SocketChannel) key.channel();
buffer.flip();
while(buffer.hasRemaining(
{
sc.write(buffer);
}
buffer.clear();
}
}
A working fix is worth 500 points.
Thanks,
Rick
ASKER CERTIFIED SOLUTION
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
>>I did this to stop the possibility of multiple threads attempting to select the same keys.
Well that possibility doesn't exist as there can only be one thread through the method at one time - your run method is synchronized. I think you should get rid of that stuff. Shall try to look at this later - very late here. Would be better if i could run it though.
Well that possibility doesn't exist as there can only be one thread through the method at one time - your run method is synchronized. I think you should get rid of that stuff. Shall try to look at this later - very late here. Would be better if i could run it though.
ASKER
CEHJ,
Amazingly enough taking out the deregistration fixed it. Im not sure why but this code now works on linux (it always worked on windows). I am now running into another problem however, please see the Question titled: Issues Reading and Writing with NIO.
Thanks,
Rick
Amazingly enough taking out the deregistration fixed it. Im not sure why but this code now works on linux (it always worked on windows). I am now running into another problem however, please see the Question titled: Issues Reading and Writing with NIO.
Thanks,
Rick
:-)
Maybe i missed your other one - possibly answered now
Maybe i missed your other one - possibly answered now
ASKER
CEHJ,
Sorry I never posted the follow up as I realized what I thougt worked (cuz it actually made it through about 10 messages) really didnt. I still have the same problem. I have opened a new question regarding this issue at : https://www.experts-exchange.com/questions/21800722/Issues-with-NIO-Selection-continued.html
Thanks,
Rick
Sorry I never posted the follow up as I realized what I thougt worked (cuz it actually made it through about 10 messages) really didnt. I still have the same problem. I have opened a new question regarding this issue at : https://www.experts-exchange.com/questions/21800722/Issues-with-NIO-Selection-continued.html
Thanks,
Rick
ASKER
Yes I deregister the interests once the key is grabbed, but I did this in the event I chose to use multiple threads, but if you look I reset the interests at the end of the SMTPHandler. I did this to stop the possibility of multiple threads attempting to select the same keys.
Thanks,
Rick