richardsimnett
asked on
Issues with NIO Selection continued
This is a continuation of a previous question found at https://www.experts-exchange.com/questions/21798808/Issues-with-NIO-Selection.html.
I am having the identical problem as in the previous post. I got a bit ahead of myself. Here is the new code I am working with (I have only changed a few things around to make the code a bit easier to read). The code stops again after the first write over socket channel. It seems to keep checking for a key, and the selector reports having a key registered, but the selector never reports the key as having a valid operation after the first iteration. The write seems to complete successfully, although for some reason either A) the selector isnt properly detecting operation events or B) the response never gets to its destination (or maybe not completely) so a response is never sent back. Hopefully someone can tell me what I am doing wrong here cuz I have exhausted every idea I could come up with. There is no point in including all the code, as A) the interface for data is proprietary, and B) there are tons of classes that all have been tested thoroughly and work, I have included the classes that are in question.
//Program Output
Found A Key To Service
SMTP Read Write
Getting an envelope.
Envelope Attached.
Reading from Channel.
220 localhost.localdomain ESMTP service ready
Error Level:220
Performing Step 0
Writing:HELO localhost.localdomain
Write Completed.
//Initialization code
// Init the outbound connection manager and the delivery thread
conn_out = new outboundconnections(cfg);
//start the delivery thread
deliveryThread dT = new deliveryThread(conn_out,ne w ConnectionManager(),mailq, mtastats,c fg);
dT.start();
//Outbound Connections Code
import java.net.InetSocketAddress ;
import java.nio.channels.*;
import java.io.IOException;
import java.util.Iterator;
public class outboundconnections {
int maxconnections = 0;
Selector connections;
public outboundconnections(config c)
{
//set the maxconnections
maxconnections = Integer.parseInt(c.getValu e("max_smt p_out").to String());
//create the selector
try
{
connections = Selector.open();
//create the new socketchannels
for (int i=0; i < maxconnections; i++)
{
openSocketChannel(c.getVal ue("mta_ip ").toStrin g(),25);
}
}
catch(IOException ie)
{
//Error on selector open, socketchannel open, or socketchannel blocking
System.out.println("ERROR creating channel.");
}
}
Iterator getReadyConnections()
{
//this function returns an iterator of the socket channels ready to be processed
//it blocks until there is something to return
int i = 0;
//System.out.println("Keys in selector: " + connections.keys().size()) ;
try
{
i = connections.select();
}
catch (IOException ie)
{
System.out.println("ERROR While attempting select of connections.");
}
//System.out.println("I=" + i);
if (i > 0)
return connections.selectedKeys() .iterator( );
else
return null;
}
public void openSocketChannel(String host, int port)
{
//register the socketchannel, and control object with the selector
try
{
//try to open a connection
SocketChannel sc = SocketChannel.open(new InetSocketAddress(host,por t));
//set blocking to no
sc.configureBlocking(false );
//register the socketchannel with the selector, and register the control object with the channel
sc.register(connections, SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT, new ConnStatus());
}
catch (IOException ie)
{
System.out.println("Error While Attempting to Open a Connection.");
}
}
}
//Delivery Thread Code
import java.util.*;
import java.nio.channels.*;
public class deliveryThread extends Thread{
private smtpQueue mailQ;
private SelectionKey key;
private mtastatus mtastats;
SMTPHandler smtp = new SMTPHandler();
private outboundconnections conn_out;
config cfg;
public deliveryThread(outboundcon nections oc, ConnectionManager c, smtpQueue s, mtastatus m, config cf)
{
super();
mailQ = s;
mtastats = m;
conn_out = oc;
cfg = cf;
}
public synchronized void run()
{
//while there is not a system_exit event
while (mtastats.system_exit() == false)
{
//get any available keys... this will block until there is a connection to service
//System.out.println("Sear ching for connections to service.");
Iterator it = conn_out.getReadyConnectio ns();
if (it != null)
{
while(it.hasNext())
{
System.out.println("Found A Key To Service");
//get the next key
key = (SelectionKey) it.next();
//service the channel
if (((key != null) && (key.isValid())) && (key.isReadable() || key.isWritable()))
{
//first determine the type of operation required
System.out.println("SMTP Read Write");
//call the smtpEvent handler
smtp.processSMTP(key,mailQ ,mtastats, cfg,conn_o ut);
}
else
System.out.println("Bad Key.");
//reset the key
key = null;
}
}
else
{
try
{
sleep(10);
}
catch(InterruptedException ie)
{
}
}
}
}
}
//SMTP handler code
import java.io.*;
import java.nio.channels.*;
import java.nio.ByteBuffer;
public class SMTPHandler {
public SMTPHandler()
{
}
void processSMTP(SelectionKey key,smtpQueue mailQ,mtastatus mtastats, config cfg, outboundconnections conn_out)
{
String command = "";
//processSMTP - handles all read/write/error operations for SMTP
//first get the connstatus object from the key
ConnStatus cs = (ConnStatus) key.attachment();
//Step 0 = Connection Just Established -- Send the HELO/EHLO
//Step 1 = Send The MAIL FROM:
//Step 2 = Send the RCPT TO:
//Step 3 = Send DATA
//Step 4 = Send Msg Body to remote
//Step 5 = Send <crlf>.<crlf>
//Step 6 = Receive OK
int error = 0;
//get the envelope
envelope env = cs.getEnvelope();
//if the envelope is null, then get a new envelope and assign it to ConnStatus
if ((env == null) && (mailQ.spool.getQueueSize( ) > 0))
env = getEnvelope(mailQ, cs);
//if there is an envelope to process
if (env != null)
{
//read from the channel to get status codes
error = smtpRead(key,cs);
//evaluate the SMTP error level
System.out.println("Error Level:" + error);
if ((error == 250) || (error == 220))
{
//there isnt an SMTP error so send a response
command = smtpStep(cs,env,mtastats);
//write the response to the channel
writeToChannel(key,command );
}
else
{
System.out.println("Bad Error Code.");
}
}
//reset the keys interests
key.interestOps(SelectionK ey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT);
}
String smtpStep(ConnStatus cs, envelope env, mtastatus mtastats)
{
String command="";
System.out.println("Perfor ming Step " + cs.step);
if (cs.step == 0)
{
//Connection Established send the HELO
command = "HELO localhost.localdomain ";
cs.setStep(1);
}
else if (cs.step == 1)
{
command = "MAIL FROM: <" +env.from+">";
cs.setStep(2);
}
else if (cs.step == 2)
{
command = "RCPT TO: <" +env.to+">";
cs.setStep(3);
}
else if (cs.step == 3)
{
command = "DATA";
cs.setStep(4);
}
else if (cs.step == 4)
{
command=env.messagebody + (char) 13 + (char) 10 + ".";
cs.setStep(5);
}
else if (cs.step == 5)
{
//message has been sent so set env
//System.out.println(input );
if (cs.lastResponse.startsWit h("250"))
{
System.out.println("Messag e Delivered.");
//increment delivery counters
mtastats.incDelivered();
cs.sessionmessages++;
cs.attachEnvelope(null);
//write the next command
command = "";
cs.setStep(1);
}
}
return command;
}
int smtpRead(SelectionKey k, ConnStatus cs)
{
System.out.println("Readin g from Channel.");
//read string
String input = "";
//Perform the read
input = readFromChannel(k);
//if (input.indexOf('\n') > 0)
// input = input.substring(0, input.indexOf('\n'));
System.out.println(input);
//set the lastResponse
cs.setLastResponse(input);
return Integer.parseInt(input.sub string(0,3 ));
}
envelope getEnvelope(smtpQueue mailQ, ConnStatus cs)
{
System.out.println("Gettin g an envelope.");
//the envelope is null so there is no envelope attached to this channel
//grab an envelope from the mailQ
envelope env = (envelope) mailQ.spool.pop();
//assign it to the connection status object
if (env != null)
{
cs.attachEnvelope(env);
System.out.println("Envelo pe Attached.");
}
return env;
}
void closeChannel(SelectionKey k)
{
try
{
SocketChannel sc = (SocketChannel) k.channel();
sc.close();
}
catch(IOException ie)
{
System.out.println("Error closing channel.");
}
}
String readFromChannel(SelectionK ey k)
{
String retVal = "";
try
{
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
SelectionKey key = k;
SocketChannel sc = (SocketChannel) key.channel();
while(sc.read(readBuffer) > 0)
{
readBuffer.flip();
retVal = retVal + new String(readBuffer.array()) ;
}
readBuffer.clear();
}
catch(IOException ie)
{
}
return retVal.trim();
}
void writeToChannel(SelectionKe y k, String output)
{
try
{
SelectionKey key = k;
//add the crlf
output = output + (char) 13 + (char) 10 + (char) 0;
//initialize the buffer
ByteBuffer buffer = ByteBuffer.allocate(output .getBytes( ).length);
//convert the String to a ByteBuffer
buffer.put(output.getBytes ());
SocketChannel sc = (SocketChannel) key.channel();
System.out.println("Writin g:" + output);
buffer.flip();
while(buffer.hasRemaining( ))
{
sc.write(buffer);
}
buffer.clear();
System.out.println("Write Completed.");
}
catch (IOException ie)
{
System.out.println("error performing SMTP write.");
}
}
}
//connstatus code
public class ConnStatus {
int step = 0;
private envelope e = null;
int sessionmessages = 0;
String lastResponse = "";
int type = 0;
public ConnStatus()
{
}
void attachEnvelope(envelope t)
{
t = e;
lastResponse = "";
}
envelope getEnvelope()
{
return e;
}
void setStep(int s)
{
step = s;
}
int getStep()
{
return step;
}
void setLastResponse(String s)
{
lastResponse = s;
}
}
//envelope code
public class envelope {
public String to="";
public String from="";
public String filename="";
public int retries = 0;
public int status = 0;
public String error = "";
public String messagebody="";
public String domain = "";
public envelope(String t, String From, String fn, String dom)
{
//create the new envelope
to = t;
from = From;
filename = fn;
//set the domain
domain = dom;
}
}
A working fix is worth 500 points.
Thanks,
Rick
I am having the identical problem as in the previous post. I got a bit ahead of myself. Here is the new code I am working with (I have only changed a few things around to make the code a bit easier to read). The code stops again after the first write over socket channel. It seems to keep checking for a key, and the selector reports having a key registered, but the selector never reports the key as having a valid operation after the first iteration. The write seems to complete successfully, although for some reason either A) the selector isnt properly detecting operation events or B) the response never gets to its destination (or maybe not completely) so a response is never sent back. Hopefully someone can tell me what I am doing wrong here cuz I have exhausted every idea I could come up with. There is no point in including all the code, as A) the interface for data is proprietary, and B) there are tons of classes that all have been tested thoroughly and work, I have included the classes that are in question.
//Program Output
Found A Key To Service
SMTP Read Write
Getting an envelope.
Envelope Attached.
Reading from Channel.
220 localhost.localdomain ESMTP service ready
Error Level:220
Performing Step 0
Writing:HELO localhost.localdomain
Write Completed.
//Initialization code
// Init the outbound connection manager and the delivery thread
conn_out = new outboundconnections(cfg);
//start the delivery thread
deliveryThread dT = new deliveryThread(conn_out,ne
dT.start();
//Outbound Connections Code
import java.net.InetSocketAddress
import java.nio.channels.*;
import java.io.IOException;
import java.util.Iterator;
public class outboundconnections {
int maxconnections = 0;
Selector connections;
public outboundconnections(config
{
//set the maxconnections
maxconnections = Integer.parseInt(c.getValu
//create the selector
try
{
connections = Selector.open();
//create the new socketchannels
for (int i=0; i < maxconnections; i++)
{
openSocketChannel(c.getVal
}
}
catch(IOException ie)
{
//Error on selector open, socketchannel open, or socketchannel blocking
System.out.println("ERROR creating channel.");
}
}
Iterator getReadyConnections()
{
//this function returns an iterator of the socket channels ready to be processed
//it blocks until there is something to return
int i = 0;
//System.out.println("Keys
try
{
i = connections.select();
}
catch (IOException ie)
{
System.out.println("ERROR While attempting select of connections.");
}
//System.out.println("I=" + i);
if (i > 0)
return connections.selectedKeys()
else
return null;
}
public void openSocketChannel(String host, int port)
{
//register the socketchannel, and control object with the selector
try
{
//try to open a connection
SocketChannel sc = SocketChannel.open(new InetSocketAddress(host,por
//set blocking to no
sc.configureBlocking(false
//register the socketchannel with the selector, and register the control object with the channel
sc.register(connections, SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT, new ConnStatus());
}
catch (IOException ie)
{
System.out.println("Error While Attempting to Open a Connection.");
}
}
}
//Delivery Thread Code
import java.util.*;
import java.nio.channels.*;
public class deliveryThread extends Thread{
private smtpQueue mailQ;
private SelectionKey key;
private mtastatus mtastats;
SMTPHandler smtp = new SMTPHandler();
private outboundconnections conn_out;
config cfg;
public deliveryThread(outboundcon
{
super();
mailQ = s;
mtastats = m;
conn_out = oc;
cfg = cf;
}
public synchronized void run()
{
//while there is not a system_exit event
while (mtastats.system_exit() == false)
{
//get any available keys... this will block until there is a connection to service
//System.out.println("Sear
Iterator it = conn_out.getReadyConnectio
if (it != null)
{
while(it.hasNext())
{
System.out.println("Found A Key To Service");
//get the next key
key = (SelectionKey) it.next();
//service the channel
if (((key != null) && (key.isValid())) && (key.isReadable() || key.isWritable()))
{
//first determine the type of operation required
System.out.println("SMTP Read Write");
//call the smtpEvent handler
smtp.processSMTP(key,mailQ
}
else
System.out.println("Bad Key.");
//reset the key
key = null;
}
}
else
{
try
{
sleep(10);
}
catch(InterruptedException
{
}
}
}
}
}
//SMTP handler code
import java.io.*;
import java.nio.channels.*;
import java.nio.ByteBuffer;
public class SMTPHandler {
public SMTPHandler()
{
}
void processSMTP(SelectionKey key,smtpQueue mailQ,mtastatus mtastats, config cfg, outboundconnections conn_out)
{
String command = "";
//processSMTP - handles all read/write/error operations for SMTP
//first get the connstatus object from the key
ConnStatus cs = (ConnStatus) key.attachment();
//Step 0 = Connection Just Established -- Send the HELO/EHLO
//Step 1 = Send The MAIL FROM:
//Step 2 = Send the RCPT TO:
//Step 3 = Send DATA
//Step 4 = Send Msg Body to remote
//Step 5 = Send <crlf>.<crlf>
//Step 6 = Receive OK
int error = 0;
//get the envelope
envelope env = cs.getEnvelope();
//if the envelope is null, then get a new envelope and assign it to ConnStatus
if ((env == null) && (mailQ.spool.getQueueSize(
env = getEnvelope(mailQ, cs);
//if there is an envelope to process
if (env != null)
{
//read from the channel to get status codes
error = smtpRead(key,cs);
//evaluate the SMTP error level
System.out.println("Error Level:" + error);
if ((error == 250) || (error == 220))
{
//there isnt an SMTP error so send a response
command = smtpStep(cs,env,mtastats);
//write the response to the channel
writeToChannel(key,command
}
else
{
System.out.println("Bad Error Code.");
}
}
//reset the keys interests
key.interestOps(SelectionK
}
String smtpStep(ConnStatus cs, envelope env, mtastatus mtastats)
{
String command="";
System.out.println("Perfor
if (cs.step == 0)
{
//Connection Established send the HELO
command = "HELO localhost.localdomain ";
cs.setStep(1);
}
else if (cs.step == 1)
{
command = "MAIL FROM: <" +env.from+">";
cs.setStep(2);
}
else if (cs.step == 2)
{
command = "RCPT TO: <" +env.to+">";
cs.setStep(3);
}
else if (cs.step == 3)
{
command = "DATA";
cs.setStep(4);
}
else if (cs.step == 4)
{
command=env.messagebody + (char) 13 + (char) 10 + ".";
cs.setStep(5);
}
else if (cs.step == 5)
{
//message has been sent so set env
//System.out.println(input
if (cs.lastResponse.startsWit
{
System.out.println("Messag
//increment delivery counters
mtastats.incDelivered();
cs.sessionmessages++;
cs.attachEnvelope(null);
//write the next command
command = "";
cs.setStep(1);
}
}
return command;
}
int smtpRead(SelectionKey k, ConnStatus cs)
{
System.out.println("Readin
//read string
String input = "";
//Perform the read
input = readFromChannel(k);
//if (input.indexOf('\n') > 0)
// input = input.substring(0, input.indexOf('\n'));
System.out.println(input);
//set the lastResponse
cs.setLastResponse(input);
return Integer.parseInt(input.sub
}
envelope getEnvelope(smtpQueue mailQ, ConnStatus cs)
{
System.out.println("Gettin
//the envelope is null so there is no envelope attached to this channel
//grab an envelope from the mailQ
envelope env = (envelope) mailQ.spool.pop();
//assign it to the connection status object
if (env != null)
{
cs.attachEnvelope(env);
System.out.println("Envelo
}
return env;
}
void closeChannel(SelectionKey k)
{
try
{
SocketChannel sc = (SocketChannel) k.channel();
sc.close();
}
catch(IOException ie)
{
System.out.println("Error closing channel.");
}
}
String readFromChannel(SelectionK
{
String retVal = "";
try
{
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
SelectionKey key = k;
SocketChannel sc = (SocketChannel) key.channel();
while(sc.read(readBuffer) > 0)
{
readBuffer.flip();
retVal = retVal + new String(readBuffer.array())
}
readBuffer.clear();
}
catch(IOException ie)
{
}
return retVal.trim();
}
void writeToChannel(SelectionKe
{
try
{
SelectionKey key = k;
//add the crlf
output = output + (char) 13 + (char) 10 + (char) 0;
//initialize the buffer
ByteBuffer buffer = ByteBuffer.allocate(output
//convert the String to a ByteBuffer
buffer.put(output.getBytes
SocketChannel sc = (SocketChannel) key.channel();
System.out.println("Writin
buffer.flip();
while(buffer.hasRemaining(
{
sc.write(buffer);
}
buffer.clear();
System.out.println("Write Completed.");
}
catch (IOException ie)
{
System.out.println("error performing SMTP write.");
}
}
}
//connstatus code
public class ConnStatus {
int step = 0;
private envelope e = null;
int sessionmessages = 0;
String lastResponse = "";
int type = 0;
public ConnStatus()
{
}
void attachEnvelope(envelope t)
{
t = e;
lastResponse = "";
}
envelope getEnvelope()
{
return e;
}
void setStep(int s)
{
step = s;
}
int getStep()
{
return step;
}
void setLastResponse(String s)
{
lastResponse = s;
}
}
//envelope code
public class envelope {
public String to="";
public String from="";
public String filename="";
public int retries = 0;
public int status = 0;
public String error = "";
public String messagebody="";
public String domain = "";
public envelope(String t, String From, String fn, String dom)
{
//create the new envelope
to = t;
from = From;
filename = fn;
//set the domain
domain = dom;
}
}
A working fix is worth 500 points.
Thanks,
Rick
Try getting rid of
>>private SelectionKey key;
>>private SelectionKey key;
ASKER
CEHJ,
Nope same problem. Stops at the end of the first write.
Thanks,
Rick
Nope same problem. Stops at the end of the first write.
Thanks,
Rick
Remove the resetting too - it's not necessary
ASKER
ok I just wrote a server proggie the emulates an SMTP server in order to see where this thing is stopping. Her's whats happening. The program in question connects to this smtp emulator, the emulator sends a helo statment, the proggie responds by writing HELO localhost.localdomain, the emulator responds with 250 ok, but the proggie never sees it and never throws the SelectionKey.OP_READ event. Based on what I am seeing this has to do with my interest operator settings. Im not sure where to go next. I am not deregistering the operators as per your suggestion but I still have the same problem.
I am registering th enew socket as follows now:
sc.register(connections, SelectionKey.OP_WRITE | SelectionKey.OP_READ | SelectionKey.OP_CONNECT , new ConnStatus());
I am not resetting nor reregistering anything at this point... that is all commented out.
I am registering th enew socket as follows now:
sc.register(connections, SelectionKey.OP_WRITE | SelectionKey.OP_READ | SelectionKey.OP_CONNECT , new ConnStatus());
I am not resetting nor reregistering anything at this point... that is all commented out.
ASKER CERTIFIED SOLUTION
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
btw, good logging practice is *particularly* to be recommended for multi-threaded apps, preferably log4j
ASKER
cehj,
im not following you what kind of debug code would I put in?
As for the logging, I have my own logging module that I will be using. I just havent incorporated it into my code yet.
Thanks,
Rick
im not following you what kind of debug code would I put in?
As for the logging, I have my own logging module that I will be using. I just havent incorporated it into my code yet.
Thanks,
Rick
ASKER
CEHJ,
I got it! Took all day of screwing around but I finally figured it out. I had to rewrite the way the whole thing worked... it turns out that you cannot stack multiple I/O operations on a single loop of the key. I was trying to do a read & a write each time. If you simply divide the operations up into separate iterations and set the keys properly the whole things comes together... and I have to say it FLIES! Here's the modified code:
//Delivery Thread
import java.util.*;
import java.nio.channels.*;
public class deliveryThread extends Thread{
SMTPHandler smtp = new SMTPHandler();
smtpQueue mailQ;
SelectionKey key;
mtastatus mtastats = null;
outboundconnections conn_out;
config cfg;
public deliveryThread(outboundcon nections oc, ConnectionManager c, smtpQueue s, mtastatus m, config cf)
{
super();
mailQ = s;
mtastats = m;
conn_out = oc;
cfg = cf;
}
public synchronized void run()
{
//while there is not a system_exit event
while (mtastats.system_exit() == false)
{
//get any available keys... this will block until there is a connection to service
//System.out.println("Sear ching for connections to service.");
Iterator it = conn_out.getReadyConnectio ns();
if (it != null)
{
while(it.hasNext())
{
//System.out.println("Foun d A Key To Service");
//get the next key
key = (SelectionKey) it.next();
//service the channel
if (((key != null) && (key.isValid())))
{
//call the smtpEvent handler
smtp.processSMTP(key,mailQ ,mtastats, cfg,conn_o ut);
}
else
System.out.println("Bad Key.");
//reset the key
key = null;
}
}
}
}
}
//SMTP Handler
import java.io.*;
import java.nio.channels.*;
import java.nio.ByteBuffer;
public class SMTPHandler {
public SMTPHandler()
{
}
void processSMTP(SelectionKey key,smtpQueue mailQ,mtastatus mtastats, config cfg, outboundconnections conn_out)
{
String command = "";
//processSMTP - handles all read/write/error operations for SMTP
//first get the connstatus object from the key
ConnStatus cs = (ConnStatus) key.attachment();
int error = 0;
//get the envelope
envelope env = cs.getEnvelope();
//if the envelope is null, then get a new envelope and assign it to ConnStatus
if ((env == null) && (mailQ.spool.getQueueSize( ) > 0))
env = getEnvelope(mailQ, cs);
//if this is a read cycle
if (key.isReadable())
{
//read and output the result
error = smtpRead(key,cs);
//System.out.println("From Server: " + cs.lastResponse + '\n');
key.attach(cs);
key.interestOps(SelectionK ey.OP_WRIT E);
}
//if this is a write cycle
if (key.isWritable())
{
//write the command to the screen and the channel
command = smtpStep(cs,env,mtastats,m ailQ);
//write the response to the channel
writeToChannel(key,command );
key.attach(cs);
key.interestOps(SelectionK ey.OP_READ );
}
}
/*//if there is an envelope to process
if (env != null)
{
//read from the channel to get status codes
error = smtpRead(key,cs);
//evaluate the SMTP error level
System.out.println("Error Level:" + error);
if ((error == 250) || (error == 220) || (error == 354))
{
//there isnt an SMTP error so send a response
command = smtpStep(cs,env,mtastats,m ailQ);
//write the response to the channel
writeToChannel(key,command );
}
else
{
System.out.println("Bad Error Code.");
}
}
else
{
System.out.println("no envelope.");
}
*/
String smtpStep(ConnStatus cs, envelope env, mtastatus mtastats,smtpQueue mailQ)
{
//Step 0 = Connection Just Established -- Send the HELO/EHLO
//Step 1 = Send The MAIL FROM:
//Step 2 = Send the RCPT TO:
//Step 3 = Send DATA
//Step 4 = Send Msg Body to remote
//Step 5 = Send <crlf>.<crlf>
//Step 6 = Receive OK
String command="";
//try
//{
//System.out.println("Perf orming Step " + cs.step);
if (cs.step == 0)
{
//Connection Established send the HELO
command = "HELO localhost.localdomain";
cs.setStep(1);
}
else if (cs.step == 1)
{
command = "MAIL FROM: <" +env.from+">";
cs.setStep(2);
}
else if (cs.step == 2)
{
command = "RCPT TO: <" +env.to+">";
cs.setStep(3);
}
else if (cs.step == 3)
{
command = "DATA";
cs.setStep(4);
}
else if (cs.step == 4)
{
command=env.messagebody + (char) 13 + (char) 10 + ".";
cs.setStep(5);
}
else if (cs.step == 5)
{
//message has been sent so set env
//System.out.println(input );
//if (cs.lastResponse.startsWit h("250"))
//{
//System.out.println("Mess age Delivered.");
//increment delivery counters
mtastats.incDelivered();
cs.sessionmessages++;
env = getEnvelope(mailQ,cs);
cs.attachEnvelope(env);
//write the next command
command = "MAIL FROM: <" +env.from+">";;
cs.setStep(2);
//}
}
/*}catch (NullPointerException npe)
{
//a null pointer was found.... so....
System.out.println(npe.get Message()) ;
npe.printStackTrace();
//get a new envelope
env = getEnvelope(mailQ,cs);
cs.attachEnvelope(env);
command = "rset";
cs.setStep(1);
}*/
return command;
}
int smtpRead(SelectionKey k, ConnStatus cs)
{
//System.out.println("Read ing from Channel.");
//read string
String input = "";
//Perform the read
input = readFromChannel(k);
//if (input.indexOf('\n') > 0)
// input = input.substring(0, input.indexOf('\n'));
//set the lastResponse
cs.setLastResponse(input);
return Integer.parseInt(input.sub string(0,3 ));
}
envelope getEnvelope(smtpQueue mailQ, ConnStatus cs)
{
//System.out.println("Gett ing an envelope.");
//the envelope is null so there is no envelope attached to this channel
//grab an envelope from the mailQ
envelope env;
env = (envelope) mailQ.spool.pop();
//assign it to the connection status object
if (env != null)
{
cs.attachEnvelope(env);
//System.out.println("Enve lope Attached.");
}
return env;
}
void closeChannel(SelectionKey k)
{
try
{
SocketChannel sc = (SocketChannel) k.channel();
sc.close();
}
catch(IOException ie)
{
System.out.println("Error closing channel.");
}
}
String readFromChannel(SelectionK ey k)
{
String retVal = "";
try
{
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
SelectionKey key = k;
SocketChannel sc = (SocketChannel) key.channel();
while(sc.read(readBuffer) > 0)
{
readBuffer.flip();
retVal = retVal + new String(readBuffer.array()) ;
}
readBuffer.clear();
}
catch(IOException ie)
{
System.out.println("error reading from channel");
}
return retVal.trim();
}
void writeToChannel(SelectionKe y k, String output)
{
try
{
SelectionKey key = k;
//add the crlf
output = output + (char) 13 + (char) 10;
//initialize the buffer
ByteBuffer buffer = ByteBuffer.allocate(output .getBytes( ).length);
buffer.clear();
//convert the String to a ByteBuffer
buffer.put(output.getBytes ());
SocketChannel sc = (SocketChannel) key.channel();
//System.out.println("To Server:" + output);
buffer.flip();
sc.write(buffer);
buffer.clear();
//System.out.println("Writ e Completed.");
}
catch (IOException ie)
{
System.out.println("error performing SMTP write.");
}
}
}
Thanks for the help.
-Rick
I got it! Took all day of screwing around but I finally figured it out. I had to rewrite the way the whole thing worked... it turns out that you cannot stack multiple I/O operations on a single loop of the key. I was trying to do a read & a write each time. If you simply divide the operations up into separate iterations and set the keys properly the whole things comes together... and I have to say it FLIES! Here's the modified code:
//Delivery Thread
import java.util.*;
import java.nio.channels.*;
public class deliveryThread extends Thread{
SMTPHandler smtp = new SMTPHandler();
smtpQueue mailQ;
SelectionKey key;
mtastatus mtastats = null;
outboundconnections conn_out;
config cfg;
public deliveryThread(outboundcon
{
super();
mailQ = s;
mtastats = m;
conn_out = oc;
cfg = cf;
}
public synchronized void run()
{
//while there is not a system_exit event
while (mtastats.system_exit() == false)
{
//get any available keys... this will block until there is a connection to service
//System.out.println("Sear
Iterator it = conn_out.getReadyConnectio
if (it != null)
{
while(it.hasNext())
{
//System.out.println("Foun
//get the next key
key = (SelectionKey) it.next();
//service the channel
if (((key != null) && (key.isValid())))
{
//call the smtpEvent handler
smtp.processSMTP(key,mailQ
}
else
System.out.println("Bad Key.");
//reset the key
key = null;
}
}
}
}
}
//SMTP Handler
import java.io.*;
import java.nio.channels.*;
import java.nio.ByteBuffer;
public class SMTPHandler {
public SMTPHandler()
{
}
void processSMTP(SelectionKey key,smtpQueue mailQ,mtastatus mtastats, config cfg, outboundconnections conn_out)
{
String command = "";
//processSMTP - handles all read/write/error operations for SMTP
//first get the connstatus object from the key
ConnStatus cs = (ConnStatus) key.attachment();
int error = 0;
//get the envelope
envelope env = cs.getEnvelope();
//if the envelope is null, then get a new envelope and assign it to ConnStatus
if ((env == null) && (mailQ.spool.getQueueSize(
env = getEnvelope(mailQ, cs);
//if this is a read cycle
if (key.isReadable())
{
//read and output the result
error = smtpRead(key,cs);
//System.out.println("From
key.attach(cs);
key.interestOps(SelectionK
}
//if this is a write cycle
if (key.isWritable())
{
//write the command to the screen and the channel
command = smtpStep(cs,env,mtastats,m
//write the response to the channel
writeToChannel(key,command
key.attach(cs);
key.interestOps(SelectionK
}
}
/*//if there is an envelope to process
if (env != null)
{
//read from the channel to get status codes
error = smtpRead(key,cs);
//evaluate the SMTP error level
System.out.println("Error Level:" + error);
if ((error == 250) || (error == 220) || (error == 354))
{
//there isnt an SMTP error so send a response
command = smtpStep(cs,env,mtastats,m
//write the response to the channel
writeToChannel(key,command
}
else
{
System.out.println("Bad Error Code.");
}
}
else
{
System.out.println("no envelope.");
}
*/
String smtpStep(ConnStatus cs, envelope env, mtastatus mtastats,smtpQueue mailQ)
{
//Step 0 = Connection Just Established -- Send the HELO/EHLO
//Step 1 = Send The MAIL FROM:
//Step 2 = Send the RCPT TO:
//Step 3 = Send DATA
//Step 4 = Send Msg Body to remote
//Step 5 = Send <crlf>.<crlf>
//Step 6 = Receive OK
String command="";
//try
//{
//System.out.println("Perf
if (cs.step == 0)
{
//Connection Established send the HELO
command = "HELO localhost.localdomain";
cs.setStep(1);
}
else if (cs.step == 1)
{
command = "MAIL FROM: <" +env.from+">";
cs.setStep(2);
}
else if (cs.step == 2)
{
command = "RCPT TO: <" +env.to+">";
cs.setStep(3);
}
else if (cs.step == 3)
{
command = "DATA";
cs.setStep(4);
}
else if (cs.step == 4)
{
command=env.messagebody + (char) 13 + (char) 10 + ".";
cs.setStep(5);
}
else if (cs.step == 5)
{
//message has been sent so set env
//System.out.println(input
//if (cs.lastResponse.startsWit
//{
//System.out.println("Mess
//increment delivery counters
mtastats.incDelivered();
cs.sessionmessages++;
env = getEnvelope(mailQ,cs);
cs.attachEnvelope(env);
//write the next command
command = "MAIL FROM: <" +env.from+">";;
cs.setStep(2);
//}
}
/*}catch (NullPointerException npe)
{
//a null pointer was found.... so....
System.out.println(npe.get
npe.printStackTrace();
//get a new envelope
env = getEnvelope(mailQ,cs);
cs.attachEnvelope(env);
command = "rset";
cs.setStep(1);
}*/
return command;
}
int smtpRead(SelectionKey k, ConnStatus cs)
{
//System.out.println("Read
//read string
String input = "";
//Perform the read
input = readFromChannel(k);
//if (input.indexOf('\n') > 0)
// input = input.substring(0, input.indexOf('\n'));
//set the lastResponse
cs.setLastResponse(input);
return Integer.parseInt(input.sub
}
envelope getEnvelope(smtpQueue mailQ, ConnStatus cs)
{
//System.out.println("Gett
//the envelope is null so there is no envelope attached to this channel
//grab an envelope from the mailQ
envelope env;
env = (envelope) mailQ.spool.pop();
//assign it to the connection status object
if (env != null)
{
cs.attachEnvelope(env);
//System.out.println("Enve
}
return env;
}
void closeChannel(SelectionKey k)
{
try
{
SocketChannel sc = (SocketChannel) k.channel();
sc.close();
}
catch(IOException ie)
{
System.out.println("Error closing channel.");
}
}
String readFromChannel(SelectionK
{
String retVal = "";
try
{
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
SelectionKey key = k;
SocketChannel sc = (SocketChannel) key.channel();
while(sc.read(readBuffer) > 0)
{
readBuffer.flip();
retVal = retVal + new String(readBuffer.array())
}
readBuffer.clear();
}
catch(IOException ie)
{
System.out.println("error reading from channel");
}
return retVal.trim();
}
void writeToChannel(SelectionKe
{
try
{
SelectionKey key = k;
//add the crlf
output = output + (char) 13 + (char) 10;
//initialize the buffer
ByteBuffer buffer = ByteBuffer.allocate(output
buffer.clear();
//convert the String to a ByteBuffer
buffer.put(output.getBytes
SocketChannel sc = (SocketChannel) key.channel();
//System.out.println("To Server:" + output);
buffer.flip();
sc.write(buffer);
buffer.clear();
//System.out.println("Writ
}
catch (IOException ie)
{
System.out.println("error performing SMTP write.");
}
}
}
Thanks for the help.
-Rick
Glad you got it working
>>As for the logging, I have my own logging module that I will be using.
That's OK - be sure to implement it with log4j ;-)
I haven't done enough work for 500 points so feel free to get them back
>>As for the logging, I have my own logging module that I will be using.
That's OK - be sure to implement it with log4j ;-)
I haven't done enough work for 500 points so feel free to get them back
ASKER
CEHJ,
actually they are yours. Im so happy right now... I have this thing pushing with one thread and 2 connections a constant 163,000 emails per minute.
Thanks,
Rick
actually they are yours. Im so happy right now... I have this thing pushing with one thread and 2 connections a constant 163,000 emails per minute.
Thanks,
Rick
:-)
>>I have this thing pushing with one thread and 2 connections a constant 163,000 emails per minute.
Just so long as you don't point it in my direction .. ;-)
>>I have this thing pushing with one thread and 2 connections a constant 163,000 emails per minute.
Just so long as you don't point it in my direction .. ;-)
ASKER
actually I had to clarify with the customer what this was to be used for before I accepted the project. Its not for outgoing email, its for routing large amounts of company email to various independent locations. :-)
ASKER
Thanks,
Rick