richardsimnett
asked on
Java.nio setting the socket channel output buffer size
Hello,
Is there a way to set the output buffer size of a SocketChannel from within java or is this something controlled on the OS side of things?
Cheers,
Rick
Is there a way to set the output buffer size of a SocketChannel from within java or is this something controlled on the OS side of things?
Cheers,
Rick
No afaik
Why do you want to do this? There has to be a work-around, like sending not more than a certain fixed length of data everytime. The size should ideally not be fixed because the socket can be used for any type of communication, e.g., making an HTTP request and reading the contents of a web-page, in which case the number of bytes that you read could be anything.
ASKER
mayankeagle,
I want to do this because I have written a mail feeder that utilizes the SMTP protocol. To get the rate of speed required I need to use the java.nio package... and for the most part this works extremely efficiently. But when the size of the email body gets to be 9k or greater, nio cannot write the whole chunk of data at once to the socket channel. Because of this I have a while loop which executes until all of the data was written. When I do this however, the performance degrades horribly for larger email sizes. My thinking is that if I can perform a calculation to figure out the optimal size of the output buffer, and then set the size on a per campaign basis I can minimize the degradation and get a more constant line of performance.
Cheers,
Rick
I want to do this because I have written a mail feeder that utilizes the SMTP protocol. To get the rate of speed required I need to use the java.nio package... and for the most part this works extremely efficiently. But when the size of the email body gets to be 9k or greater, nio cannot write the whole chunk of data at once to the socket channel. Because of this I have a while loop which executes until all of the data was written. When I do this however, the performance degrades horribly for larger email sizes. My thinking is that if I can perform a calculation to figure out the optimal size of the output buffer, and then set the size on a per campaign basis I can minimize the degradation and get a more constant line of performance.
Cheers,
Rick
ASKER
CEHJ,
what does 'afaik' mean?
Cheers,
Rick
what does 'afaik' mean?
Cheers,
Rick
As Far As I Know
The effect you mention may not be influencable by altering the buffer size, even if you could. It could be caused by network effects
ASKER
CEHJ,
I'm pretty sure that the network is not the issue... the test equipment is a dual xeon box, with a dedicated 100mb lan, all top of the line hardware. The problem is definitely occurring on the box as opposed to network traffic.
Cheers,
Rick
I'm pretty sure that the network is not the issue... the test equipment is a dual xeon box, with a dedicated 100mb lan, all top of the line hardware. The problem is definitely occurring on the box as opposed to network traffic.
Cheers,
Rick
ASKER CERTIFIED SOLUTION
membership
This solution is only available to members.
To access this solution, you must be a member of Experts Exchange.
That won't help
ASKER
CEHJ,
What wont help? The selector?
Cheers,
Rick
What wont help? The selector?
Cheers,
Rick
>>What wont help? The selector?
Yes. If you have a blockage *writing* to a Socket it's not likely to help if it's that data not being pushed over the wire that's your concern (which i seem to remember is the case)
Yes. If you have a blockage *writing* to a Socket it's not likely to help if it's that data not being pushed over the wire that's your concern (which i seem to remember is the case)
ASKER
You are right about this. The problem isnt with the wire however, its writing the entire 9k or greater at once to the socket channel. if it overflows the buffer I now have a while loop just waiting until its ready to go. I have a few thoughts about a different way of approaching the problem that I think may be work well assuming certain limitations can be overcome.
The idea is to check the available size of the output buffer for the socket channel prior to attempting the write. If the space is available then write all the data, if not wait until the next cycle of the thread, if the space is available then write the data, etc, and this will happen indefinitely until the write occurs.
Now there is one major problem with this approach that I see. First, if the amount of data is always going to be greater than the output buffer then the thread will not accomplish anything.
That is why I need to be able to set the socket channel output size. I seem to recall reading something about using properties classes within java to set things like that, but I have not yet figured out how the properties classes work exactly so that is an option that may or may not come to fruition.
What are your thoughts?
Cheers,
Rick
The idea is to check the available size of the output buffer for the socket channel prior to attempting the write. If the space is available then write all the data, if not wait until the next cycle of the thread, if the space is available then write the data, etc, and this will happen indefinitely until the write occurs.
Now there is one major problem with this approach that I see. First, if the amount of data is always going to be greater than the output buffer then the thread will not accomplish anything.
That is why I need to be able to set the socket channel output size. I seem to recall reading something about using properties classes within java to set things like that, but I have not yet figured out how the properties classes work exactly so that is an option that may or may not come to fruition.
What are your thoughts?
Cheers,
Rick
ASKER
CEHJ,
Just another thought.... Its hard to believe that its not possible to set the sendbuffersize, as you can do so with a normal java socket method setSendBufferSize(int size) . There's got to be a way to accomplish this. Its just a matter of tracking it down at this point.
Cheers,
Rick Simnett
Just another thought.... Its hard to believe that its not possible to set the sendbuffersize, as you can do so with a normal java socket method setSendBufferSize(int size) . There's got to be a way to accomplish this. Its just a matter of tracking it down at this point.
Cheers,
Rick Simnett
you don't need to set the buffer size.
ASKER
CEHJ,
Do you think it is somehow possbile to grab an instance of the socket from the socketchannel (using the socket() method in socketchannel), and directly change the settings?
Cheers,
Rick
Do you think it is somehow possbile to grab an instance of the socket from the socketchannel (using the socket() method in socketchannel), and directly change the settings?
Cheers,
Rick
ASKER
objects,
why do u say I do not need to set the buffersize? I think that is the best solution at this time, short of a complete rewrite of the code. Which is not something I definitely dont want to do....
Cheers,
Rick
why do u say I do not need to set the buffersize? I think that is the best solution at this time, short of a complete rewrite of the code. Which is not something I definitely dont want to do....
Cheers,
Rick
ASKER
errr...
definitely not something I want to do.....
haha... its after 7pm again.. im fried.
Cheers,
Rick
definitely not something I want to do.....
haha... its after 7pm again.. im fried.
Cheers,
Rick
if u want use nio then you need to understand about selectors.
unless cehj has any better suggestions which does not appear to be the case.
unless cehj has any better suggestions which does not appear to be the case.
ASKER
Objects / CEHJ,
Here is the complete code of the mail feeder app I have written. As it is I do not use any selectors, everything works great with exception of large email sizes. Its extremely fast, and works 100% except for this problem. The big performance issue is found in the sockConn class, in the writeln method. As you can see I am looping until the write is complete. I think if i can just get the buffer bigger the speed will be normal for larger emails. Basically, a 1.5k email I can deliver at
rates of 37,000 emails a minute. But when you jump up to 9k, the number of deliveries per minute is less than 1000, and the bottleneck is that while loop. If I can somehow make the buffer larger the looping will be minimized. I know this is a very long post, and I apologize for it, but I want you guys to reallly see what I am working with here.... In a way I have almost emulated selectors with the getstatus function in sockConn.
Let me know what you think I should do.
Cheers,
Rick
// MPFEEDER Class main class
//MailPro Delivery Feeder
//Scope: The MailPro Delivery Feeder is a stand-alone application that generates and transmits
//email at very high rates. The requiring is the algorithm that must be followed, and the pre-planned
//solutions.
//1.Initialize
//2.Query SQL Database for appropriate record.
//3.Set the records status.
//4.Loop
//5.Generate The Email
//6.Push the rendered email to storage.
//7.Push the email to the MTA.
//8.End Loop
/*
Required Classes:
Storage Classes:
Campaign Class - the campaign class is the most basic class for any outgoing campaign.
It stores the complete campaign record from the database. It is responsible for the
rendering of all outgoing email.
Email Holder Class - the email holder class basically holds each of the "rendered" email
messages for the delivery thread to pick up and deliver. It can hold X amount of messages,
each message stored can be retrieved, or deleted by its reference number.
Config Class - the config class holds the basic running values of the MailPro Feeder. These
values are loaded from the SQL database. The data includes the number of delivery threads,
MTA IP's and port configurations. In addition to this it will read from a base db.dat config
file on startup, this file contains the database configuration and access information.
Interface Classes:
Database Class - communicates with the database server. Must open connection on start up
and query the configuration and campaign information. IN addition to this it must have
functions to run queries, and return datasets.
Connection Class - manages connections to the MTA. It provides port status, as well as
read / write abilities, it opens the ports and closes the ports. It also detects the type
of MTA it is connected to (SMTP / PMM).
SMTP Class - works with Connection class and delivery thread to provide an smtp interface
for delivery.
PMM Class - works with connection class and delivery thread to provide an PMTA mail merge
interface.
Recipients Class - an interface to provide row by row data to the render thread. In addition,
to its primary role it provides status of the data... ie done status to the rest of the program.
Worker Threads:
Render Thread - based on the status of the email holder class (does the holder class have
any open containers) the render class pulls the email body, and header information from the
campaign class, the next recipients database row and renders the final outgoing mailpiece
and dumps it to the email holder class.
Delivery Thread - the delivery thread works with the connection class and the email holder. It
rapidly checks the status of each connection in a round robin type of manner. If the connection
reports that it is ready it will work with the email holder class and the set protocol interface
for the port to send each email through to the mta. Once the given message is complete it deletes
it from the email holder.
*/
import java.lang.*;
import java.io.*;
import java.util.*;
public class mpfeeder
{
public static void main(String[] args) throws IOException
{
//THIS IS THE ACTUAL CODE
/* INITIALIZATION */
//First initialize the config class
config sCfg = new config();
//Now initialize the campaign class
campaign cmpStorage = new campaign(sCfg);
//Now initialize email holder class
emailHolder eHolder = new emailHolder(sCfg);
//initialize the recipients class
recipients rcpt = new recipients(sCfg,cmpStorage );
//finally initialize the connection class
connections cInt = new connections(sCfg,cmpStorag e);
//set the campaign status
cmpStorage.setCampaignStat us(0);
/* END OF INITIALIATION */
/* START THREADS */
render rThread = new render(sCfg, cmpStorage, eHolder, rcpt);
rThread.setPriority(2);
rThread.start();
delivery dThread = new delivery(sCfg, eHolder, cInt,cmpStorage);
dThread.setPriority(2);
dThread.start();
statusThread sThread = new statusThread(sCfg,eHolder, cInt, cmpStorage);
sThread.setPriority(2);
sThread.start();
/* END OF THREADS */
}
}
//Campaign class
import java.io.*;
import java.util.*;
import java.text.*;
public class campaign
{
//campaign storage values
Hashtable campaigns;
public int campaign = -1;
config mCfg;
String[] subjectLines;
String[] fromLines;
int sbjCount = 0;
int frmCount = 0;
String tablename = "";
String htmlFooter = "";
String textFooter = "";
public campaign(config sCfg)
{
//query the database for an active campaign
mCfg = sCfg;
database application = new database(sCfg.dbURL(1),sCf g.dbUser(1 ),sCfg.dbP assword(1) );
//build the maildate
Date today;
String mdate;
SimpleDateFormat formatter;
formatter = new SimpleDateFormat("Mddyyyy" );
today = new Date();
mdate = formatter.format(today);
if (application.executeQuery( "select top 1 * from campaigns where maildate <= " + mdate + " and completed = -1 order by id asc") == 1)
{
campaigns = application.getNextRow();
}
else
{
System.out.println("Initia lization Error : There are no campaigns to generate.");
}
application.closeDBConnect ion1();
application.closeDBConnect ion2();
String t = (String) campaigns.get("subjectline s");
subjectLines = t.split("\n");
t = (String) campaigns.get("fromlines") ;
fromLines = t.split("\n");
application = new database(mCfg.dbURL(1),mCf g.dbUser(1 ),mCfg.dbP assword(1) );
if (application.executeQuery( "select datatablename from lists where id = " + (String) campaigns.get("listid")) > 0)
{
tablename = (String) (application.getNextRow()) .get("data tablename" );
}
else
{
System.out.println("Initia lization Error: Email List does not exist in lists table.");
}
application.closeDBConnect ion1();
application.closeDBConnect ion2();
application = new database(mCfg.dbURL(1),mCf g.dbUser(1 ),mCfg.dbP assword(1) );
if (application.executeQuery( "select * from footers") > 0)
{
Hashtable temp = application.getNextRow();
htmlFooter = (String) temp.get("htmlfooter");
textFooter = (String) temp.get("textfooter");
}
else
{
System.out.println("Initia liation Error: No Footers Defined. CAN-SPAM non-compliance issue.");
}
application.closeDBConnect ion1();
application.closeDBConnect ion2();
}
String getNextSubject()
{
String temp = subjectLines[sbjCount];
++sbjCount;
if (sbjCount == subjectLines.length)
{
sbjCount = 0;
}
return temp;
}
String getNextFrom()
{
String temp = fromLines[frmCount];
++frmCount;
if (frmCount == fromLines.length)
{
frmCount = 0;
}
return temp;
}
String getValue(String key)
{
//return the value for the given key
return (String) campaigns.get(key);
}
void setCampaignStatus(int status)
{
//set the campaign status
database application = new database(mCfg.dbURL(1),mCf g.dbUser(1 ),mCfg.dbP assword(1) );
if (status == 0)
{
application.executeUpdate( "update campaigns set status = 'Transport In Progress.', intstatus = 1 where id = " + getValue("id"));
}
else if (status == 1)
{
application.executeUpdate( "update campaigns set completed = 1, status = 'Done.', intstatus = 0 where id = " + getValue("id"));
}
application.closeDBConnect ion1();
application.closeDBConnect ion2();
}
}
//CONFIG Class
import java.lang.*;
import java.util.Hashtable;
import java.io.*;
import java.util.StringTokenizer;
import java.util.Enumeration;
public class config {
Hashtable mCfg;
int ErrorLevel = 0;
String[] domains;
int domainCount = 0;
String[] mailservers;
String[] ports;
int[] maxconnections;
public config() {
//mxcfg constructor
//creates new config hash
mCfg = new Hashtable();
if (loadConfig() == false)
{
ErrorLevel = 1;
}
//First get the list of useable domains
database application = new database(dbURL(1),dbUser(1 ),dbPasswo rd(1));
int rcount = application.executeQuery(" select domain from domains");
if (rcount > 0)
{
domains = new String[rcount];
for (int i = 1; i <= rcount; i++)
{
Hashtable temp = application.getNextRow();
domains[i-1] = (String) temp.get("domain");
}
application.closeDBConnect ion1();
application.closeDBConnect ion2();
}
else
{
System.out.println("Initia lization Error: No Domains Defined.");
}
//Next get the mtaservers
application = new database(dbURL(1),dbUser(1 ),dbPasswo rd(1));
rcount = application.executeQuery(" select mtaserver,port,maxconnecti ons from mtaservers");
if (rcount > 0)
{
mailservers = new String[rcount];
ports = new String[rcount];
maxconnections = new int[rcount];
for (int i = 1; i <=rcount; i++)
{
Hashtable temp = application.getNextRow();
mailservers[i-1] = (String) temp.get("mtaserver");
ports[i-1] = (String) temp.get("port");
maxconnections[i-1] = new Integer((String) temp.get("maxconnections") ).intValue ();
}
application.closeDBConnect ion1();
application.closeDBConnect ion2();
}
else
{
System.out.println("Initia lization Error: No Transports Defined.");
}
}
String getNextDomain()
{
String temp = domains[domainCount];
++domainCount;
if (domainCount == domains.length)
{
domainCount = 0;
}
return temp;
}
public void addOption(String Key, Object value)
{
//Adds a new configuration key & value combination to the
//mCfg hash
mCfg.put(Key,value);
}
String dbURL(int i)
{
// return the database string
String URL = "";
URL = "jdbc:microsoft:sqlserver: //" + (String) mCfg.get("database" + i) + ":" + (String) mCfg.get("dbport" + i) + ";DatabaseName=" + (String) mCfg.get("datasource"+i);
return URL;
}
String dbUser(int i)
{
//return the username for the given connection
return (String) mCfg.get("dbuser" + i);
}
String dbPassword(int i)
{
//return the username for the given connection
return (String) mCfg.get("dbpassword" + i);
}
public void removeOption(String Key)
{
//Removes an option bucket from the hashtable
mCfg.remove(Key);
}
public boolean loadConfig()
{
//Loads the configuration file from the config.ini file in the same directory
int Error = 0; //Error Level Indicator
//Catch error opening file for reading
try
{
//First Open mta.ini
BufferedReader in = new BufferedReader(new FileReader("db.dat"));
//1-Next read line by line config options
String cfgLine = "";
while ((cfgLine = in.readLine()) != null)
{
//2-for each individual line split the string in two utilizing StringTokenizer class on = delimiter
StringTokenizer st = new StringTokenizer(cfgLine,"= ");
String token1 = st.nextToken();
String token2 = st.nextToken();
addOption(token1,token2);
}
//4-repeat steps 1 - 4 until EOF
in.close();
}
catch (IOException e)
{
//if error level status Error = 2;
}
if (Error == 0)
{
return true;
}
else
{
return false;
}
}
public boolean saveConfig()
{
//Dumps the current hash table to a file in [KEY]=VALUE Pairs
//Catch error opening file for writing
int Error = 0;
//Open file mta.ini
try
{
BufferedWriter out = new BufferedWriter(new FileWriter("mta.ini"));
//1-Parse hashtable for next key,value combination
for (Enumeration keys = mCfg.keys() ; keys.hasMoreElements() ;)
{
String aKey = keys.nextElement().toStrin g();
out.write(aKey+"="+mCfg.ge t(aKey));
out.newLine();
}
out.close();
}
catch(IOException e)
{
ErrorLevel=2;
Error = 1;
}
if (Error == 0)
{
return true;
}
else
{
return false;
}
}
public Object getValue(String KEY)
{
//return the value associated with given key
return mCfg.get(KEY);
}
}
//Connections Class
import java.util.Hashtable;
import java.io.*;
import java.util.StringTokenizer;
import java.util.Enumeration;
public class connections
{
config dCfg;
sockConn[] sConnections;
int socketCount = 0;
public connections(config sCfg, campaign cmp)
{
dCfg = sCfg;
//System.out.println(dCfg. maxconnect ions[0] + " " + dCfg.ports[0]);
sConnections = new sockConn[dCfg.maxconnectio ns[0]];
for (int i = 0; i<=dCfg.maxconnections[0]- 1; i++)
{
sConnections[i] = new sockConn(cmp);
sConnections[i].connect(dC fg.mailser vers[0],dC fg.ports[0 ]);
//System.out.println("Conn ect Call #" + i);
}
}
int getSocketStatus(int cNum)
{
//code = -3 : Not Connected
//code = -2 : Not Connected but a connection is pending
//code = -1 : Not Connected but Domains in the queue (reconnect)
//code = 0 : Connected but no data
//code = 1 : Connected with Data Present
return sConnections[cNum].getStat us();
}
String readSocket(int cNum)
{
return sConnections[cNum].readln( );
}
void writeSocket(int cNum, String outs)
{
sConnections[cNum].writeln (outs);
}
}
//DATABASE CLASS
import java.lang.*;
import java.io.*;
import java.sql.*;
import java.util.*;
public class database
{
private Connection con;
private Statement stmt;
private ResultSet rs;
public int Status = 0;
private ResultSetMetaData rsmd;
private int numberOfColumns;
public database(String URL, String Username, String Password)
{
//now open the database connection to SQL Server
String url = URL;
try {
Class.forName("com.microso ft.jdbc.sq lserver.SQ LServerDri ver");
} catch(java.lang.ClassNotFo undExcepti on e) {
Status = -1;
System.err.print("ClassNot FoundExcep tion: ");
System.err.println(e.getMe ssage());
}
try {
con = DriverManager.getConnectio n(url, Username, Password);
}
catch(SQLException ex)
{
Status = -1;
System.err.print("SQLExcep tion - SQLServer - openConn(): ");
System.err.println(ex.getM essage());
}
//we are now ready to query the database
}
int executeQuery(String query)
{
int rowCount = 0;
try
{
stmt = con.createStatement(Result Set.TYPE_S CROLL_INSE NSITIVE, ResultSet.CONCUR_READ_ONLY );
rs = stmt.executeQuery(query);
rsmd = rs.getMetaData();
numberOfColumns = rsmd.getColumnCount();
rs.last();
rowCount = rs.getRow();
rs.first();
}
catch(SQLException ex)
{
Status = -1;
System.err.print("SQLExcep tion - SQLServer - executeQuery(): ");
System.err.println(ex.getM essage());
return -1;
}
return rowCount;
}
int executeUpdate(String query)
{
try
{
stmt= con.createStatement();
stmt.executeUpdate(query);
}
catch(SQLException ex)
{
Status = -1;
System.err.print("SQLExcep tion - SQLServer - executeQuery(): ");
System.err.println(ex.getM essage());
return -1;
}
return 1;
}
int closeDBConnection1()
{
//close the database connection
try{
stmt.close();
}
catch(SQLException ex)
{
Status = -1;
System.err.print("SQLExcep tion - SQLServer - closeDBConnection1(): ");
System.err.println(ex.getM essage());
return 0;
}
return 1;
}
int closeDBConnection2()
{
//close the database connection
try{
con.close();
}
catch(SQLException ex)
{
Status = -1;
System.err.print("SQLExcep tion - SQLServer - closeDBConnection2(): ");
System.err.println(ex.getM essage());
return 0;
}
return 1;
}
Hashtable getNextRow()
{
Hashtable temp = new Hashtable(50);
//parse the rs.next() and build the hashtable resultset
//if there aren't anymore rows then set the status to -1
//build the hashtable
try
{
for (int i = 1; i <= numberOfColumns; i++)
{
String column = rsmd.getColumnName(i);
String data = rs.getString(i);
if (data == null)
data = "";
temp.put(column,data);
}
rs.next();
if (rs.isAfterLast())
{
closeDBConnection1();
closeDBConnection2();
Status = -1;
}
}
catch(SQLException ex)
{
Status = -1;
System.err.print("SQLExcep tion - SQL Server - getNextRow(): ");
System.err.println(ex.getM essage());
}
return temp;
}
}
//DELIVERY THREAD
import java.io.*;
import java.net.*;
import java.lang.reflect.*;
import java.util.*;
public class delivery extends Thread{
config mCfg;
emailHolder eHolder;
connections cInt;
campaign D;
boolean working = true;
public delivery(config sCfg, emailHolder eh, connections dInt, campaign C)
{
mCfg = sCfg;
eHolder = eh;
cInt = dInt;
D = C;
}
public void run()
{
int msgCount = 1;
int failed = 0;
while (working)
{
//get the next message if -1 and eh.status = -1 then end
int mid = eHolder.nextMessage();
//System.out.print("Failed : " + failed);
if ((mid > -1))
{
//ok we have a message to work with so
//first check the status of sockConn[mid]
int sockStatus = cInt.getSocketStatus(mid);
//System.out.print("#" + sockStatus + "#");
if (sockStatus == -3)
{
cInt.sConnections[mid].con nect(mCfg. mailserver s[0],mCfg. ports[0]);
}
else if (sockStatus == -2)
{
//try to connect
try
{
cInt.sConnections[mid].sc. finishConn ect();
}
catch(IOException ie)
{
}
}
else if (sockStatus == 0)
{
int step = cInt.sConnections[mid].whe re;
if (step == 1)
{
email temp = eHolder.getMessage(mid);
if (temp != null)
{
//System.out.println(temp. from2);
cInt.writeSocket(mid,"MAIL FROM: " + temp.from2);
cInt.sConnections[mid].whe re = 2;
}
}
}
else if (sockStatus == 1)
{
//System.out.println(mid + " Sock Status - Delivery: Connected. Data Present");
String inStr = cInt.readSocket(mid);
//System.out.print(inStr);
//get the message
//System.out.println(inStr );
int step = cInt.sConnections[mid].whe re;
//System.out.println("STEP : " + step);
if (step == 0)
{
//The connection is new so lets detect if it is PMTA or not
int mType = inStr.indexOf("PowerMTA");
if (mType > 0)
cInt.sConnections[mid].pro tocolType = 1;
//based on protocol type respond
cInt.writeSocket(mid,"HELO mpfeeder.mailpro");
cInt.sConnections[mid].whe re = 1;
}
else if (step == 1)
{
//the machine obviously responded so lets send it a mail from:
if (inStr.indexOf("250")> -1)
{
//Look for the OK status... it was found so lets send the mail from line:
email temp = eHolder.getMessage(mid);
if (temp != null)
{
cInt.writeSocket(mid,"MAIL FROM: " + temp.from2);
cInt.sConnections[mid].whe re = 2;
}
}
else
{
if (inStr.charAt(1) == '5')
{
//5XX series Error
eHolder.eHolder[mid] = null;
failed++;cInt.writeSocket( mid,"RSET" );
cInt.sConnections[mid].whe re=1;
}
else if (inStr.charAt(0) == '4')
{
//4XX series Error
}
}
}
else if (step == 2)
{
//the machine responded so lets check for OK and sned it a RCPT TO:
if (inStr.indexOf("250")> -1)
{
//look for the OK status.... it was found so send the rcpt to:
email temp = eHolder.getMessage(mid);
cInt.writeSocket(mid,"RCPT TO: <" + temp.to + ">");
cInt.sConnections[mid].whe re = 3;
}
else
{
if (inStr.charAt(1) == '5')
{
//5XX series Error
eHolder.eHolder[mid] = null;
failed++;
cInt.writeSocket(mid,"RSET ");
cInt.sConnections[mid].whe re=1;
}
else if (inStr.charAt(0) == '4')
{
//4XX series Error
}
}
}
else if (step == 3)
{
//send the data command
if (inStr.indexOf("250")>-1)
{
//look for the OK status.... send the Data command
cInt.writeSocket(mid,"DATA ");
cInt.sConnections[mid].whe re = 4;
}
else
{
if (inStr.charAt(1) == '5')
{
//5XX series Error
eHolder.eHolder[mid] = null;
failed++;
cInt.writeSocket(mid,"RSET ");
cInt.sConnections[mid].whe re=1;
}
else if (inStr.charAt(0) == '4')
{
//4XX series Error
}
}
}
else if (step == 4)
{
//send the email body
if (inStr.indexOf("354") > -1)
{
email temp = eHolder.getMessage(mid);
String emailBody = "X-MPX: " + temp.messageid + "\n";
if(D.getValue("emailtype") == "1")
{
emailBody = emailBody + "MIME-Version: 1.0\nContent-Type: text/html;\n Content-Transfer-Encoding: 8bit\n";
}
else if (D.getValue("emailtype") == "0")
{
//Insert text & html MIME header
emailBody = emailBody + "MIME-Version: 1.0\nContent-Type: multipart/alternative;\n Content-Transfer-Encoding: 8bit\n";
}
emailBody = emailBody + "To: " + temp.to + "\n";
emailBody = emailBody + "From: " + temp.from + "\n";
emailBody = emailBody + "Subject: " + temp.subject + "\n\n";
if(D.getValue("emailtype") == "2")
{
//insert the text body
emailBody = emailBody + temp.textBody + "\n";
}
else if (D.getValue("emailtype") == "1")
{
//insert the html body
String tt = temp.htmlBody.replaceAll(" \n","\n\t" );
emailBody = emailBody + tt + "\n";
}
else if (D.getValue("emailtype") == "0")
{
//render text and html bodies
}
//System.out.println(email Body);
cInt.writeSocket(mid,email Body);
cInt.writeSocket(mid,".");
cInt.sConnections[mid].whe re = 5;
}
else
{
if (inStr.charAt(1) == '5')
{
//5XX series Error
eHolder.eHolder[mid] = null;
failed++;
cInt.writeSocket(mid,"RSET ");
cInt.sConnections[mid].whe re=1;
}
else if (inStr.charAt(0) == '4')
{
//4XX series Error
}
}
}
else if (step == 5)
{
if (inStr.indexOf("250") > -1)
{
//System.out.println(msgCo unt + " Message(s) Sent!");
eHolder.numSent++;
//delete the message from eHolder
//set where = 1 to recycle the connection
eHolder.eHolder[mid] = null;
cInt.sConnections[mid].whe re = 1;
}
else
{
if (inStr.charAt(1) == '5')
{
//5XX series Error
eHolder.eHolder[mid] = null;
failed++;
cInt.writeSocket(mid,"RSET ");
cInt.sConnections[mid].whe re=1;
}
else if (inStr.charAt(0) == '4')
{
//4XX series Error
}
}
}
}
//process
//code = -3 : Not Connected
//code = -2 : Not Connected but a connection is pending
//code = 0 : Connected but no data
//code = 1 : Connected with Data Present
}
else
{
//System.out.println("MID = -1");
if (eHolder.Status == -1)
{
working = false;
//System.out.println("Deli very Thread Complete.");
D.setCampaignStatus(1);
}
}
}
}
}
//EMAIL Class
public class email
{
//campaign storage values
public String from = "";
public String from2 = "";
public String to = "";
public String subject = "";
public String htmlBody = "";
public String textBody = "";
public String messageid = "";
public int status = -1;
public int where = 0;
public email(String t,String frm, String frm2, String sbj, String hBody, String tBody, String mid)
{
try
{
from = frm;
from2 = frm2;
to = t;
subject = sbj;
htmlBody = hBody;
textBody = tBody;
messageid = mid;
status = 0;
}
catch (Exception e)
{
System.out.println("EXCEPT ION SETTING EMAIL HOLDER!");
}
}
}
//EMAIL HOLDER
import java.util.Hashtable;
import java.io.*;
import java.util.StringTokenizer;
import java.util.Enumeration;
public class emailHolder
{
//campaign storage values
email[] eHolder;
int Status = 0;
int maxMessages = 0;
int messageCount = 0;
int numSent = 0;
public emailHolder(config sCfg)
{
//calculate mail queue
int temp = 0;
for (int i=0; i < sCfg.maxconnections.length ; i++)
{
temp = temp + sCfg.maxconnections[i];
}
maxMessages = temp;
eHolder = new email[maxMessages];
}
email getMessage(int mNum)
{
//return the value for the given key
return eHolder[mNum];
}
int findEmptySlot()
{
//searches the email queue to find an empty slot
//and returns the index if found or -1 if not found
int temp = -1;
for (int i = 0; i < eHolder.length; i++)
{
if(eHolder[i] == null)
{
temp = i;
}
}
return temp;
}
int putMessage(int id,String to, String from,String from2, String Subject, String HTMLBody, String TextBody, String mid)
{
//puts a message
eHolder[id] = new email(to,from,from2, Subject,HTMLBody, TextBody,mid);
return 0;
}
int nextMessage()
{
// Get and return the next message id. This occurs in a round robin succession.
//If the email holder is null, return the next value... if all email holders
//are null return -1.
int temp = -1;
if (checkStatus() == 0)
{
temp = messageCount;
++messageCount;
if (messageCount == eHolder.length)
{
messageCount = 0;
}
}
return temp;
}
int checkStatus()
{
//Check to see if the render thread is done....
//if it is then check to see if all the messages in the holder are null
int done = 1;
if (Status == -1)
{
for (int i = 0; i < eHolder.length; i++)
{
if (eHolder[i] != null)
done = 0;
}
}
else
{
done = 0;
}
return done;
}
}
//RECIPIENTS Class
import java.util.Hashtable;
import java.io.*;
import java.util.StringTokenizer;
import java.util.Enumeration;
public class recipients
{
//campaign storage values
config dCfg;
public int Status = 0;
database listdata;
public recipients(config sCfg, campaign cmp)
{
dCfg = sCfg;
//execute database queries
//get the datatable name
database application = new database(sCfg.dbURL(1),sCf g.dbUser(1 ),sCfg.dbP assword(1) );
String datatablename = "";
if (application.executeQuery( "select datatablename from lists where id = " + cmp.getValue("listid"))>0)
{
Hashtable temp = application.getNextRow();
datatablename = (String) temp.get("datatablename");
}
else
{
System.out.println("Initia lization Error. Campaign Data Not Found!");
}
//store recipient list in result set
application.closeDBConnect ion1();
application.closeDBConnect ion2();
//now connect to the recipient data
listdata = new database(sCfg.dbURL(2),sCf g.dbUser(2 ),sCfg.dbP assword(2) );
int t = listdata.executeQuery("sel ect top " + cmp.getValue("numsend") + " * from " + datatablename + " where id > " + cmp.getValue("startid"));
if (t==0)
{
Status = -1;
System.out.println("Initia lization Error. Query Returned No Records!");
}
}
Hashtable nextRecipient()
{
//return the value for the given key
if (listdata.Status == 0)
return listdata.getNextRow();
else
{
setStatus(-1);
return null;
}
}
int setStatus(int status)
{
//set the recipient status
Status = status;
return 0;
}
}
//REMINDER Class
import java.util.Timer;
import java.util.TimerTask;
/**
* Simple demo that uses java.util.Timer to schedule a task to execute
* once 5 seconds have passed.
*/
public class reminder {
Timer timer;
public reminder(int seconds) {
timer = new Timer();
timer.schedule(new RemindTask(),0, seconds*1000);
}
class RemindTask extends TimerTask {
public void run() {
System.out.println("Time's up!");
//timer.cancel(); //Terminate the timer thread
}
}
public static void main(String args[]) {
System.out.println("About to schedule task.");
new reminder(5);
System.out.println("Task scheduled.");
}
}
//render thread
import java.io.*;
import java.net.*;
import java.lang.reflect.*;
import java.util.*;
public class render extends Thread{
config mCfg;
campaign cmpData;
emailHolder eHolder;
recipients rcpt;
boolean working = true;
public render(config sCfg, campaign cData, emailHolder eh, recipients rcp)
{
mCfg = sCfg;
cmpData = cData;
eHolder = eh;
rcpt = rcp;
}
public void run()
{
//render the emails
boolean personalized = (new Integer(cmpData.getValue(" personaliz ed")).intV alue() == 1);
int emailtype = new Integer(cmpData.getValue(" emailtype" )).intValu e();
int counter = 0;
while (working)
{
int freeslot = eHolder.findEmptySlot();
if ((rcpt.Status == 0) && (freeslot > -1))
{
//there is a free slot so render an email
counter++;
//System.out.println(count er + " - New Render");
Hashtable row = rcpt.nextRecipient();
if (row != null)
{
try
{
//next get / set the subjectline
String subject = cmpData.getNextSubject();
//generate the messageid
String messageid = cmpData.getValue("id") + "-" + (String) row.get("id") + "-1-"+ cmpData.getValue("listid") +"-223mpx" ;
String curDomain = mCfg.getNextDomain();
//System.out.println(curDo main);
//next get / set the fromline
String from = '"' + cmpData.getNextFrom() + '"' + " <" + messageid + "@" + curDomain+">";
String from2 = "<" + messageid + "@" + curDomain + ">";
String to = (String) row.get("email");
String htmlBody = cmpData.getValue("htmlbody ");
String textBody = cmpData.getValue("textBody ");
String htmlFooter = cmpData.htmlFooter;
String textFooter = cmpData.textFooter;
//render the htmlbody if so required
if ((emailtype == 1) || (emailtype == 3))
{
//render the HTML body
htmlBody = htmlBody.replaceAll("@open @","http://" + curDomain + "/o.cfm?"+messageid);
htmlBody = htmlBody.replaceAll("@href @","http://" + curDomain + "/c.cfm?"+messageid);
htmlBody = htmlBody.replaceAll("@unsu bscribe@", "http://" + curDomain + "/u.cfm?" + messageid);
htmlBody = htmlBody.replaceAll("@firs tname@",(S tring) row.get("first"));
htmlBody = htmlBody.replaceAll("@last name@",(St ring) row.get("last"));
htmlBody = htmlBody.replaceAll("@addr ess@",(Str ing) row.get("address"));
htmlBody = htmlBody.replaceAll("@city @",(String ) row.get("city"));
htmlBody = htmlBody.replaceAll("@stat e@",(Strin g) row.get("state"));
htmlBody = htmlBody.replaceAll("@zip@ ",(String) row.get("zip"));
htmlBody = htmlBody.replaceAll("@emai l@",(Strin g) row.get("email"));
htmlBody = htmlBody.replaceAll("@date stamp@",(S tring) row.get("datestamp"));
htmlBody = htmlBody.replaceAll("@ip@" ,(String) row.get("ip"));
htmlBody = htmlBody.replaceAll("@site @",(String ) row.get("site"));
htmlBody = htmlBody.replaceAll("@id@" ,(String) row.get("id"));
htmlBody = htmlBody.replaceAll("@mess ageid@",me ssageid);
}
if ((emailtype == 2) || (emailtype ==3))
{
//render the TEXT body
textBody = textBody.replaceAll("@href @","http://" + curDomain + ".c.cfm?"+messageid);
textBody = textBody.replaceAll("@unsu bscribe@", "http://"+curDoma in+"/u.cfm ?"+message id);
textBody = textBody.replaceAll("@firs tname@",(S tring) row.get("first"));
textBody = textBody.replaceAll("@last name@",(St ring) row.get("last"));
textBody = textBody.replaceAll("@addr ess@",(Str ing) row.get("address"));
textBody = textBody.replaceAll("@city @",(String ) row.get("city"));
textBody = textBody.replaceAll("@stat e@",(Strin g) row.get("state"));
textBody = textBody.replaceAll("@zip@ ",(String) row.get("zip"));
textBody = textBody.replaceAll("@emai l@",(Strin g) row.get("email"));
textBody = textBody.replaceAll("@date stamp@",(S tring) row.get("datestamp"));
textBody = textBody.replaceAll("@ip@" ,(String) row.get("ip"));
textBody = textBody.replaceAll("@site @",(String ) row.get("site"));
textBody = textBody.replaceAll("@id@" ,(String) row.get("id"));
textBody = textBody.replaceAll("@mess ageid@",me ssageid);
}
//now render the footers
if ((emailtype == 1) || (emailtype == 3))
{
//render the HTML body
htmlFooter = htmlFooter.replaceAll("@un subscribe@ ","http://" + curDomain + "/u.cfm?" + messageid);
htmlFooter = htmlFooter.replaceAll("@fi rstname@", (String) row.get("first"));
htmlFooter = htmlFooter.replaceAll("@la stname@",( String) row.get("last"));
htmlFooter = htmlFooter.replaceAll("@ad dress@",(S tring) row.get("address"));
htmlFooter = htmlFooter.replaceAll("@ci ty@",(Stri ng) row.get("city"));
htmlFooter = htmlFooter.replaceAll("@st ate@",(Str ing) row.get("state"));
htmlFooter = htmlFooter.replaceAll("@zi p@",(Strin g) row.get("zip"));
htmlFooter = htmlFooter.replaceAll("@em ail@",(Str ing) row.get("email"));
htmlFooter = htmlFooter.replaceAll("@da testamp@", (String) row.get("datestamp"));
htmlFooter = htmlFooter.replaceAll("@ip @",(String ) row.get("ip"));
htmlFooter = htmlFooter.replaceAll("@si te@",(Stri ng) row.get("site"));
htmlFooter = htmlFooter.replaceAll("@id @",(String ) row.get("id"));
htmlFooter = htmlFooter.replaceAll("@me ssageid@", messageid) ;
}
if ((emailtype == 2) || (emailtype ==3))
{
//render the TEXT body
textFooter = textFooter.replaceAll("@un subscribe@ ","http://"+curDo main+"/u.c fm?"+messa geid);
textFooter = textFooter.replaceAll("@fi rstname@", (String) row.get("first"));
textFooter = textFooter.replaceAll("@la stname@",( String) row.get("last"));
textFooter = textFooter.replaceAll("@ad dress@",(S tring) row.get("address"));
textFooter = textFooter.replaceAll("@ci ty@",(Stri ng) row.get("city"));
textFooter = textFooter.replaceAll("@st ate@",(Str ing) row.get("state"));
textFooter = textFooter.replaceAll("@zi p@",(Strin g) row.get("zip"));
textFooter = textFooter.replaceAll("@em ail@",(Str ing) row.get("email"));
textFooter = textFooter.replaceAll("@da testamp@", (String) row.get("datestamp"));
textFooter = textFooter.replaceAll("@ip @",(String ) row.get("ip"));
textFooter = textFooter.replaceAll("@si te@",(Stri ng) row.get("site"));
textFooter = textFooter.replaceAll("@id @",(String ) row.get("id"));
textFooter = textFooter.replaceAll("@me ssageid@", messageid) ;
}
//Render the subject line
subject = subject.replaceAll("@first name@",(St ring) row.get("first"));
subject = subject.replaceAll("@lastn ame@",(Str ing) row.get("last"));
subject = subject.replaceAll("@addre ss@",(Stri ng) row.get("address"));
subject = subject.replaceAll("@city@ ",(String) row.get("city"));
subject = subject.replaceAll("@state @",(String ) row.get("state"));
subject = subject.replaceAll("@zip@" ,(String) row.get("zip"));
subject = subject.replaceAll("@email @",(String ) row.get("email"));
subject = subject.replaceAll("@dates tamp@",(St ring) row.get("datestamp"));
subject = subject.replaceAll("@ip@", (String) row.get("ip"));
subject = subject.replaceAll("@site@ ",(String) row.get("site"));
subject = subject.replaceAll("@id@", (String) row.get("id"));
subject = subject.replaceAll("@messa geid@",mes sageid);
//now render CAN-SPAM Compliance
htmlBody = htmlBody.replaceAll("</bod y>",htmlFo oter + "</body>");
textBody = textBody + "\n\n" + textFooter;
eHolder.putMessage(freeslo t,to,from, from2,subj ect,htmlBo dy,textBod y,messagei d);
//System.out.println("Rend ered: " + counter);
}
catch (Exception e)
{
}
}
else
{
working = false;
//System.out.println("Rend er Thread is Done!");
eHolder.Status = -1;
cmpData.setCampaignStatus( 1);
}
}
}
}
}
//Sock Conn Class
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
public class sockConn {
//In this version I am incorporating non-blocking connection requests, as well as I/O functions.
//The previous version was limited while the delivery thread was waiting to establish a connection on a slow or non-existant domain.
//NIO Related Variables
SocketChannel sc;
ByteBuffer in = null;
ByteBuffer out = null;
int where = 0;
int protocolType = 0; //0 = SMTP 1= PMM
public sockConn(campaign cmp)
{
//oSock constructor
//Socket Channel NIO Setup
//Calculate Output Buffer Size
String temp = cmp.getValue("htmlbody");
int outbuff = temp.length();
temp = cmp.getValue("textbody");
outbuff = outbuff + temp.length();
int adjBuff = outbuff/5;
in = ByteBuffer.allocate(1024);
out = ByteBuffer.allocate((outbu ff + adjBuff)*2);
try
{
sc = SocketChannel.open();
sc.configureBlocking(false ); //Set the channel to non-blocking mode
}
catch(IOException ie)
{
System.out.println("CONNEC TION ERROR: Unable to open socket channel.");
}
}
boolean isConnected()
{
//determines if the socket channel is connected
return sc.isConnected();
}
int getStatus()
{
//gets the current status of the socketchannel
int retVal = -3; // By default the socket is NOT connected
//first see if we are connected
if (sc.isConnected())
{
// ok we are connected lets see if this message is the one currently on top of this sockets queue
//connection is there so is there data to read?
if (dataIn())
{
//we have checked to see if there is data
//and there is so return data present
retVal = 1;
}
else
{
//there isnt data present we are just connected
retVal = 0;
}
}
else
{
//no connection present but is there emails in the queue?
if (sc.isConnectionPending()) // or is a connection pending?
{
retVal = -2;
}
}
//code = -3 : Not Connected
//code = -2 : Not Connected but a connection is pending
//code = -1 : Not Connected but Domains in the queue (reconnect)
//code = 0 : Connected but no data
//code = 1 : Connected with Data Present
//code = 2 : Connected but this message isnt next in the queue
return retVal;
}
boolean dataIn()
{
//refreshes the input buffer and returns true if there is new data to be utilized
//first refresh the buffer
boolean retVal = false;
try
{
if (sc.read(in) > 0)
{
retVal = true;
in.flip();
}
}
catch (IOException ie)
{
System.out.println("CONNEC TION ERROR: Unable to read from socket.");
System.out.println(ie.getM essage());
}
//flip the buffer
return retVal;
}
String readln()
{
//function to read the next "line" of data from the connection
//should only be called after a call to the dataIn function
//get the size of the buffer
int length = in.remaining();
//set up a holding array for the data
byte[] workStr = new byte[1024];
//get the data from the buffer
in.get(workStr,0,length);
String retVal = new String(workStr,0,length);
in.clear();
return retVal;
}
void writeln(String outStr)
{
outStr = outStr + "\r\n";
out.put(outStr.getBytes()) ;
out.flip();
int offset = 0;
try
{
while (offset < outStr.length())
{
offset += sc.write(out);
}
}
catch (IOException ie)
{
System.out.println("CONNEC TION ERROR: Unable to write to socket");
}
out.clear();
}
boolean connect(String connTo, String port)
{
boolean retVal = false;
try
{
retVal = sc.connect(new InetSocketAddress(connTo,n ew Integer(port).intValue())) ;
}
catch(IOException ie)
{
System.out.println("CONNEC TION ERROR: Unable to connect to address.");
}
catch(UnresolvedAddressExc eption uae)
{
System.out.println("CONNEC TION ERROR: Unable to resolve address.");
}
where = 0;
return retVal;
}
boolean finishConnect()
{
boolean retVal = false;
try
{
retVal = sc.finishConnect();
}
catch(IOException ie)
{
System.out.println("CONNEC TION ERROR: Unable to complete connection.");
}
return retVal;
}
void close()
{
try
{
sc.close();
}
catch(IOException ie)
{
System.out.println("CONNEC TION ERROR: Unable to close connection");
}
}
void recycle()
{
}
}
//statusThread class
import java.io.*;
import java.net.*;
import java.lang.reflect.*;
import java.util.*;
public class statusThread extends Thread{
config mCfg;
emailHolder eHolder;
connections cInt;
campaign D;
boolean working = true;
public statusThread(config sCfg, emailHolder eh, connections dInt, campaign C)
{
mCfg = sCfg;
eHolder = eh;
cInt = dInt;
D = C;
}
public void run()
{
int minutes = 0;
int lastMessages = 0;
while (working)
{
//wait 60 seconds
try
{
sleep(60000);
}
catch(InterruptedException ie)
{
}
//collect the data
minutes++;
int msgLastMinute = eHolder.numSent - lastMessages;
lastMessages = eHolder.numSent;
//calculate percent complete
float completed = (eHolder.numSent / new Integer((String) D.getValue("numsend")).int Value())*1 00;
//submit the data to the database
database application = new database(mCfg.dbURL(1),mCf g.dbUser(1 ),mCfg.dbP assword(1) );
application.executeUpdate( "update campaigns set minutes = " + minutes + ", lastminute = " + msgLastMinute + ", numsent = " + eHolder.numSent + " where id = " + D.getValue("id"));
application.closeDBConnect ion1();
application.closeDBConnect ion2();
//check to see if this should continue
System.out.println(minutes + "mins Last Minute: " + msgLastMinute + " Num Sent: " + eHolder.numSent + " " + completed + "%");
if (eHolder.Status == -1)
{
working = false;
}
}
}
}
Here is the complete code of the mail feeder app I have written. As it is I do not use any selectors, everything works great with exception of large email sizes. Its extremely fast, and works 100% except for this problem. The big performance issue is found in the sockConn class, in the writeln method. As you can see I am looping until the write is complete. I think if i can just get the buffer bigger the speed will be normal for larger emails. Basically, a 1.5k email I can deliver at
rates of 37,000 emails a minute. But when you jump up to 9k, the number of deliveries per minute is less than 1000, and the bottleneck is that while loop. If I can somehow make the buffer larger the looping will be minimized. I know this is a very long post, and I apologize for it, but I want you guys to reallly see what I am working with here.... In a way I have almost emulated selectors with the getstatus function in sockConn.
Let me know what you think I should do.
Cheers,
Rick
// MPFEEDER Class main class
//MailPro Delivery Feeder
//Scope: The MailPro Delivery Feeder is a stand-alone application that generates and transmits
//email at very high rates. The requiring is the algorithm that must be followed, and the pre-planned
//solutions.
//1.Initialize
//2.Query SQL Database for appropriate record.
//3.Set the records status.
//4.Loop
//5.Generate The Email
//6.Push the rendered email to storage.
//7.Push the email to the MTA.
//8.End Loop
/*
Required Classes:
Storage Classes:
Campaign Class - the campaign class is the most basic class for any outgoing campaign.
It stores the complete campaign record from the database. It is responsible for the
rendering of all outgoing email.
Email Holder Class - the email holder class basically holds each of the "rendered" email
messages for the delivery thread to pick up and deliver. It can hold X amount of messages,
each message stored can be retrieved, or deleted by its reference number.
Config Class - the config class holds the basic running values of the MailPro Feeder. These
values are loaded from the SQL database. The data includes the number of delivery threads,
MTA IP's and port configurations. In addition to this it will read from a base db.dat config
file on startup, this file contains the database configuration and access information.
Interface Classes:
Database Class - communicates with the database server. Must open connection on start up
and query the configuration and campaign information. IN addition to this it must have
functions to run queries, and return datasets.
Connection Class - manages connections to the MTA. It provides port status, as well as
read / write abilities, it opens the ports and closes the ports. It also detects the type
of MTA it is connected to (SMTP / PMM).
SMTP Class - works with Connection class and delivery thread to provide an smtp interface
for delivery.
PMM Class - works with connection class and delivery thread to provide an PMTA mail merge
interface.
Recipients Class - an interface to provide row by row data to the render thread. In addition,
to its primary role it provides status of the data... ie done status to the rest of the program.
Worker Threads:
Render Thread - based on the status of the email holder class (does the holder class have
any open containers) the render class pulls the email body, and header information from the
campaign class, the next recipients database row and renders the final outgoing mailpiece
and dumps it to the email holder class.
Delivery Thread - the delivery thread works with the connection class and the email holder. It
rapidly checks the status of each connection in a round robin type of manner. If the connection
reports that it is ready it will work with the email holder class and the set protocol interface
for the port to send each email through to the mta. Once the given message is complete it deletes
it from the email holder.
*/
import java.lang.*;
import java.io.*;
import java.util.*;
public class mpfeeder
{
public static void main(String[] args) throws IOException
{
//THIS IS THE ACTUAL CODE
/* INITIALIZATION */
//First initialize the config class
config sCfg = new config();
//Now initialize the campaign class
campaign cmpStorage = new campaign(sCfg);
//Now initialize email holder class
emailHolder eHolder = new emailHolder(sCfg);
//initialize the recipients class
recipients rcpt = new recipients(sCfg,cmpStorage
//finally initialize the connection class
connections cInt = new connections(sCfg,cmpStorag
//set the campaign status
cmpStorage.setCampaignStat
/* END OF INITIALIATION */
/* START THREADS */
render rThread = new render(sCfg, cmpStorage, eHolder, rcpt);
rThread.setPriority(2);
rThread.start();
delivery dThread = new delivery(sCfg, eHolder, cInt,cmpStorage);
dThread.setPriority(2);
dThread.start();
statusThread sThread = new statusThread(sCfg,eHolder,
sThread.setPriority(2);
sThread.start();
/* END OF THREADS */
}
}
//Campaign class
import java.io.*;
import java.util.*;
import java.text.*;
public class campaign
{
//campaign storage values
Hashtable campaigns;
public int campaign = -1;
config mCfg;
String[] subjectLines;
String[] fromLines;
int sbjCount = 0;
int frmCount = 0;
String tablename = "";
String htmlFooter = "";
String textFooter = "";
public campaign(config sCfg)
{
//query the database for an active campaign
mCfg = sCfg;
database application = new database(sCfg.dbURL(1),sCf
//build the maildate
Date today;
String mdate;
SimpleDateFormat formatter;
formatter = new SimpleDateFormat("Mddyyyy"
today = new Date();
mdate = formatter.format(today);
if (application.executeQuery(
{
campaigns = application.getNextRow();
}
else
{
System.out.println("Initia
}
application.closeDBConnect
application.closeDBConnect
String t = (String) campaigns.get("subjectline
subjectLines = t.split("\n");
t = (String) campaigns.get("fromlines")
fromLines = t.split("\n");
application = new database(mCfg.dbURL(1),mCf
if (application.executeQuery(
{
tablename = (String) (application.getNextRow())
}
else
{
System.out.println("Initia
}
application.closeDBConnect
application.closeDBConnect
application = new database(mCfg.dbURL(1),mCf
if (application.executeQuery(
{
Hashtable temp = application.getNextRow();
htmlFooter = (String) temp.get("htmlfooter");
textFooter = (String) temp.get("textfooter");
}
else
{
System.out.println("Initia
}
application.closeDBConnect
application.closeDBConnect
}
String getNextSubject()
{
String temp = subjectLines[sbjCount];
++sbjCount;
if (sbjCount == subjectLines.length)
{
sbjCount = 0;
}
return temp;
}
String getNextFrom()
{
String temp = fromLines[frmCount];
++frmCount;
if (frmCount == fromLines.length)
{
frmCount = 0;
}
return temp;
}
String getValue(String key)
{
//return the value for the given key
return (String) campaigns.get(key);
}
void setCampaignStatus(int status)
{
//set the campaign status
database application = new database(mCfg.dbURL(1),mCf
if (status == 0)
{
application.executeUpdate(
}
else if (status == 1)
{
application.executeUpdate(
}
application.closeDBConnect
application.closeDBConnect
}
}
//CONFIG Class
import java.lang.*;
import java.util.Hashtable;
import java.io.*;
import java.util.StringTokenizer;
import java.util.Enumeration;
public class config {
Hashtable mCfg;
int ErrorLevel = 0;
String[] domains;
int domainCount = 0;
String[] mailservers;
String[] ports;
int[] maxconnections;
public config() {
//mxcfg constructor
//creates new config hash
mCfg = new Hashtable();
if (loadConfig() == false)
{
ErrorLevel = 1;
}
//First get the list of useable domains
database application = new database(dbURL(1),dbUser(1
int rcount = application.executeQuery("
if (rcount > 0)
{
domains = new String[rcount];
for (int i = 1; i <= rcount; i++)
{
Hashtable temp = application.getNextRow();
domains[i-1] = (String) temp.get("domain");
}
application.closeDBConnect
application.closeDBConnect
}
else
{
System.out.println("Initia
}
//Next get the mtaservers
application = new database(dbURL(1),dbUser(1
rcount = application.executeQuery("
if (rcount > 0)
{
mailservers = new String[rcount];
ports = new String[rcount];
maxconnections = new int[rcount];
for (int i = 1; i <=rcount; i++)
{
Hashtable temp = application.getNextRow();
mailservers[i-1] = (String) temp.get("mtaserver");
ports[i-1] = (String) temp.get("port");
maxconnections[i-1] = new Integer((String) temp.get("maxconnections")
}
application.closeDBConnect
application.closeDBConnect
}
else
{
System.out.println("Initia
}
}
String getNextDomain()
{
String temp = domains[domainCount];
++domainCount;
if (domainCount == domains.length)
{
domainCount = 0;
}
return temp;
}
public void addOption(String Key, Object value)
{
//Adds a new configuration key & value combination to the
//mCfg hash
mCfg.put(Key,value);
}
String dbURL(int i)
{
// return the database string
String URL = "";
URL = "jdbc:microsoft:sqlserver:
return URL;
}
String dbUser(int i)
{
//return the username for the given connection
return (String) mCfg.get("dbuser" + i);
}
String dbPassword(int i)
{
//return the username for the given connection
return (String) mCfg.get("dbpassword" + i);
}
public void removeOption(String Key)
{
//Removes an option bucket from the hashtable
mCfg.remove(Key);
}
public boolean loadConfig()
{
//Loads the configuration file from the config.ini file in the same directory
int Error = 0; //Error Level Indicator
//Catch error opening file for reading
try
{
//First Open mta.ini
BufferedReader in = new BufferedReader(new FileReader("db.dat"));
//1-Next read line by line config options
String cfgLine = "";
while ((cfgLine = in.readLine()) != null)
{
//2-for each individual line split the string in two utilizing StringTokenizer class on = delimiter
StringTokenizer st = new StringTokenizer(cfgLine,"=
String token1 = st.nextToken();
String token2 = st.nextToken();
addOption(token1,token2);
}
//4-repeat steps 1 - 4 until EOF
in.close();
}
catch (IOException e)
{
//if error level status Error = 2;
}
if (Error == 0)
{
return true;
}
else
{
return false;
}
}
public boolean saveConfig()
{
//Dumps the current hash table to a file in [KEY]=VALUE Pairs
//Catch error opening file for writing
int Error = 0;
//Open file mta.ini
try
{
BufferedWriter out = new BufferedWriter(new FileWriter("mta.ini"));
//1-Parse hashtable for next key,value combination
for (Enumeration keys = mCfg.keys() ; keys.hasMoreElements() ;)
{
String aKey = keys.nextElement().toStrin
out.write(aKey+"="+mCfg.ge
out.newLine();
}
out.close();
}
catch(IOException e)
{
ErrorLevel=2;
Error = 1;
}
if (Error == 0)
{
return true;
}
else
{
return false;
}
}
public Object getValue(String KEY)
{
//return the value associated with given key
return mCfg.get(KEY);
}
}
//Connections Class
import java.util.Hashtable;
import java.io.*;
import java.util.StringTokenizer;
import java.util.Enumeration;
public class connections
{
config dCfg;
sockConn[] sConnections;
int socketCount = 0;
public connections(config sCfg, campaign cmp)
{
dCfg = sCfg;
//System.out.println(dCfg.
sConnections = new sockConn[dCfg.maxconnectio
for (int i = 0; i<=dCfg.maxconnections[0]-
{
sConnections[i] = new sockConn(cmp);
sConnections[i].connect(dC
//System.out.println("Conn
}
}
int getSocketStatus(int cNum)
{
//code = -3 : Not Connected
//code = -2 : Not Connected but a connection is pending
//code = -1 : Not Connected but Domains in the queue (reconnect)
//code = 0 : Connected but no data
//code = 1 : Connected with Data Present
return sConnections[cNum].getStat
}
String readSocket(int cNum)
{
return sConnections[cNum].readln(
}
void writeSocket(int cNum, String outs)
{
sConnections[cNum].writeln
}
}
//DATABASE CLASS
import java.lang.*;
import java.io.*;
import java.sql.*;
import java.util.*;
public class database
{
private Connection con;
private Statement stmt;
private ResultSet rs;
public int Status = 0;
private ResultSetMetaData rsmd;
private int numberOfColumns;
public database(String URL, String Username, String Password)
{
//now open the database connection to SQL Server
String url = URL;
try {
Class.forName("com.microso
} catch(java.lang.ClassNotFo
Status = -1;
System.err.print("ClassNot
System.err.println(e.getMe
}
try {
con = DriverManager.getConnectio
}
catch(SQLException ex)
{
Status = -1;
System.err.print("SQLExcep
System.err.println(ex.getM
}
//we are now ready to query the database
}
int executeQuery(String query)
{
int rowCount = 0;
try
{
stmt = con.createStatement(Result
rs = stmt.executeQuery(query);
rsmd = rs.getMetaData();
numberOfColumns = rsmd.getColumnCount();
rs.last();
rowCount = rs.getRow();
rs.first();
}
catch(SQLException ex)
{
Status = -1;
System.err.print("SQLExcep
System.err.println(ex.getM
return -1;
}
return rowCount;
}
int executeUpdate(String query)
{
try
{
stmt= con.createStatement();
stmt.executeUpdate(query);
}
catch(SQLException ex)
{
Status = -1;
System.err.print("SQLExcep
System.err.println(ex.getM
return -1;
}
return 1;
}
int closeDBConnection1()
{
//close the database connection
try{
stmt.close();
}
catch(SQLException ex)
{
Status = -1;
System.err.print("SQLExcep
System.err.println(ex.getM
return 0;
}
return 1;
}
int closeDBConnection2()
{
//close the database connection
try{
con.close();
}
catch(SQLException ex)
{
Status = -1;
System.err.print("SQLExcep
System.err.println(ex.getM
return 0;
}
return 1;
}
Hashtable getNextRow()
{
Hashtable temp = new Hashtable(50);
//parse the rs.next() and build the hashtable resultset
//if there aren't anymore rows then set the status to -1
//build the hashtable
try
{
for (int i = 1; i <= numberOfColumns; i++)
{
String column = rsmd.getColumnName(i);
String data = rs.getString(i);
if (data == null)
data = "";
temp.put(column,data);
}
rs.next();
if (rs.isAfterLast())
{
closeDBConnection1();
closeDBConnection2();
Status = -1;
}
}
catch(SQLException ex)
{
Status = -1;
System.err.print("SQLExcep
System.err.println(ex.getM
}
return temp;
}
}
//DELIVERY THREAD
import java.io.*;
import java.net.*;
import java.lang.reflect.*;
import java.util.*;
public class delivery extends Thread{
config mCfg;
emailHolder eHolder;
connections cInt;
campaign D;
boolean working = true;
public delivery(config sCfg, emailHolder eh, connections dInt, campaign C)
{
mCfg = sCfg;
eHolder = eh;
cInt = dInt;
D = C;
}
public void run()
{
int msgCount = 1;
int failed = 0;
while (working)
{
//get the next message if -1 and eh.status = -1 then end
int mid = eHolder.nextMessage();
//System.out.print("Failed
if ((mid > -1))
{
//ok we have a message to work with so
//first check the status of sockConn[mid]
int sockStatus = cInt.getSocketStatus(mid);
//System.out.print("#" + sockStatus + "#");
if (sockStatus == -3)
{
cInt.sConnections[mid].con
}
else if (sockStatus == -2)
{
//try to connect
try
{
cInt.sConnections[mid].sc.
}
catch(IOException ie)
{
}
}
else if (sockStatus == 0)
{
int step = cInt.sConnections[mid].whe
if (step == 1)
{
email temp = eHolder.getMessage(mid);
if (temp != null)
{
//System.out.println(temp.
cInt.writeSocket(mid,"MAIL
cInt.sConnections[mid].whe
}
}
}
else if (sockStatus == 1)
{
//System.out.println(mid + " Sock Status - Delivery: Connected. Data Present");
String inStr = cInt.readSocket(mid);
//System.out.print(inStr);
//get the message
//System.out.println(inStr
int step = cInt.sConnections[mid].whe
//System.out.println("STEP
if (step == 0)
{
//The connection is new so lets detect if it is PMTA or not
int mType = inStr.indexOf("PowerMTA");
if (mType > 0)
cInt.sConnections[mid].pro
//based on protocol type respond
cInt.writeSocket(mid,"HELO
cInt.sConnections[mid].whe
}
else if (step == 1)
{
//the machine obviously responded so lets send it a mail from:
if (inStr.indexOf("250")> -1)
{
//Look for the OK status... it was found so lets send the mail from line:
email temp = eHolder.getMessage(mid);
if (temp != null)
{
cInt.writeSocket(mid,"MAIL
cInt.sConnections[mid].whe
}
}
else
{
if (inStr.charAt(1) == '5')
{
//5XX series Error
eHolder.eHolder[mid] = null;
failed++;cInt.writeSocket(
cInt.sConnections[mid].whe
}
else if (inStr.charAt(0) == '4')
{
//4XX series Error
}
}
}
else if (step == 2)
{
//the machine responded so lets check for OK and sned it a RCPT TO:
if (inStr.indexOf("250")> -1)
{
//look for the OK status.... it was found so send the rcpt to:
email temp = eHolder.getMessage(mid);
cInt.writeSocket(mid,"RCPT
cInt.sConnections[mid].whe
}
else
{
if (inStr.charAt(1) == '5')
{
//5XX series Error
eHolder.eHolder[mid] = null;
failed++;
cInt.writeSocket(mid,"RSET
cInt.sConnections[mid].whe
}
else if (inStr.charAt(0) == '4')
{
//4XX series Error
}
}
}
else if (step == 3)
{
//send the data command
if (inStr.indexOf("250")>-1)
{
//look for the OK status.... send the Data command
cInt.writeSocket(mid,"DATA
cInt.sConnections[mid].whe
}
else
{
if (inStr.charAt(1) == '5')
{
//5XX series Error
eHolder.eHolder[mid] = null;
failed++;
cInt.writeSocket(mid,"RSET
cInt.sConnections[mid].whe
}
else if (inStr.charAt(0) == '4')
{
//4XX series Error
}
}
}
else if (step == 4)
{
//send the email body
if (inStr.indexOf("354") > -1)
{
email temp = eHolder.getMessage(mid);
String emailBody = "X-MPX: " + temp.messageid + "\n";
if(D.getValue("emailtype")
{
emailBody = emailBody + "MIME-Version: 1.0\nContent-Type: text/html;\n Content-Transfer-Encoding:
}
else if (D.getValue("emailtype") == "0")
{
//Insert text & html MIME header
emailBody = emailBody + "MIME-Version: 1.0\nContent-Type: multipart/alternative;\n Content-Transfer-Encoding:
}
emailBody = emailBody + "To: " + temp.to + "\n";
emailBody = emailBody + "From: " + temp.from + "\n";
emailBody = emailBody + "Subject: " + temp.subject + "\n\n";
if(D.getValue("emailtype")
{
//insert the text body
emailBody = emailBody + temp.textBody + "\n";
}
else if (D.getValue("emailtype") == "1")
{
//insert the html body
String tt = temp.htmlBody.replaceAll("
emailBody = emailBody + tt + "\n";
}
else if (D.getValue("emailtype") == "0")
{
//render text and html bodies
}
//System.out.println(email
cInt.writeSocket(mid,email
cInt.writeSocket(mid,".");
cInt.sConnections[mid].whe
}
else
{
if (inStr.charAt(1) == '5')
{
//5XX series Error
eHolder.eHolder[mid] = null;
failed++;
cInt.writeSocket(mid,"RSET
cInt.sConnections[mid].whe
}
else if (inStr.charAt(0) == '4')
{
//4XX series Error
}
}
}
else if (step == 5)
{
if (inStr.indexOf("250") > -1)
{
//System.out.println(msgCo
eHolder.numSent++;
//delete the message from eHolder
//set where = 1 to recycle the connection
eHolder.eHolder[mid] = null;
cInt.sConnections[mid].whe
}
else
{
if (inStr.charAt(1) == '5')
{
//5XX series Error
eHolder.eHolder[mid] = null;
failed++;
cInt.writeSocket(mid,"RSET
cInt.sConnections[mid].whe
}
else if (inStr.charAt(0) == '4')
{
//4XX series Error
}
}
}
}
//process
//code = -3 : Not Connected
//code = -2 : Not Connected but a connection is pending
//code = 0 : Connected but no data
//code = 1 : Connected with Data Present
}
else
{
//System.out.println("MID = -1");
if (eHolder.Status == -1)
{
working = false;
//System.out.println("Deli
D.setCampaignStatus(1);
}
}
}
}
}
//EMAIL Class
public class email
{
//campaign storage values
public String from = "";
public String from2 = "";
public String to = "";
public String subject = "";
public String htmlBody = "";
public String textBody = "";
public String messageid = "";
public int status = -1;
public int where = 0;
public email(String t,String frm, String frm2, String sbj, String hBody, String tBody, String mid)
{
try
{
from = frm;
from2 = frm2;
to = t;
subject = sbj;
htmlBody = hBody;
textBody = tBody;
messageid = mid;
status = 0;
}
catch (Exception e)
{
System.out.println("EXCEPT
}
}
}
//EMAIL HOLDER
import java.util.Hashtable;
import java.io.*;
import java.util.StringTokenizer;
import java.util.Enumeration;
public class emailHolder
{
//campaign storage values
email[] eHolder;
int Status = 0;
int maxMessages = 0;
int messageCount = 0;
int numSent = 0;
public emailHolder(config sCfg)
{
//calculate mail queue
int temp = 0;
for (int i=0; i < sCfg.maxconnections.length
{
temp = temp + sCfg.maxconnections[i];
}
maxMessages = temp;
eHolder = new email[maxMessages];
}
email getMessage(int mNum)
{
//return the value for the given key
return eHolder[mNum];
}
int findEmptySlot()
{
//searches the email queue to find an empty slot
//and returns the index if found or -1 if not found
int temp = -1;
for (int i = 0; i < eHolder.length; i++)
{
if(eHolder[i] == null)
{
temp = i;
}
}
return temp;
}
int putMessage(int id,String to, String from,String from2, String Subject, String HTMLBody, String TextBody, String mid)
{
//puts a message
eHolder[id] = new email(to,from,from2, Subject,HTMLBody, TextBody,mid);
return 0;
}
int nextMessage()
{
// Get and return the next message id. This occurs in a round robin succession.
//If the email holder is null, return the next value... if all email holders
//are null return -1.
int temp = -1;
if (checkStatus() == 0)
{
temp = messageCount;
++messageCount;
if (messageCount == eHolder.length)
{
messageCount = 0;
}
}
return temp;
}
int checkStatus()
{
//Check to see if the render thread is done....
//if it is then check to see if all the messages in the holder are null
int done = 1;
if (Status == -1)
{
for (int i = 0; i < eHolder.length; i++)
{
if (eHolder[i] != null)
done = 0;
}
}
else
{
done = 0;
}
return done;
}
}
//RECIPIENTS Class
import java.util.Hashtable;
import java.io.*;
import java.util.StringTokenizer;
import java.util.Enumeration;
public class recipients
{
//campaign storage values
config dCfg;
public int Status = 0;
database listdata;
public recipients(config sCfg, campaign cmp)
{
dCfg = sCfg;
//execute database queries
//get the datatable name
database application = new database(sCfg.dbURL(1),sCf
String datatablename = "";
if (application.executeQuery(
{
Hashtable temp = application.getNextRow();
datatablename = (String) temp.get("datatablename");
}
else
{
System.out.println("Initia
}
//store recipient list in result set
application.closeDBConnect
application.closeDBConnect
//now connect to the recipient data
listdata = new database(sCfg.dbURL(2),sCf
int t = listdata.executeQuery("sel
if (t==0)
{
Status = -1;
System.out.println("Initia
}
}
Hashtable nextRecipient()
{
//return the value for the given key
if (listdata.Status == 0)
return listdata.getNextRow();
else
{
setStatus(-1);
return null;
}
}
int setStatus(int status)
{
//set the recipient status
Status = status;
return 0;
}
}
//REMINDER Class
import java.util.Timer;
import java.util.TimerTask;
/**
* Simple demo that uses java.util.Timer to schedule a task to execute
* once 5 seconds have passed.
*/
public class reminder {
Timer timer;
public reminder(int seconds) {
timer = new Timer();
timer.schedule(new RemindTask(),0, seconds*1000);
}
class RemindTask extends TimerTask {
public void run() {
System.out.println("Time's
//timer.cancel(); //Terminate the timer thread
}
}
public static void main(String args[]) {
System.out.println("About to schedule task.");
new reminder(5);
System.out.println("Task scheduled.");
}
}
//render thread
import java.io.*;
import java.net.*;
import java.lang.reflect.*;
import java.util.*;
public class render extends Thread{
config mCfg;
campaign cmpData;
emailHolder eHolder;
recipients rcpt;
boolean working = true;
public render(config sCfg, campaign cData, emailHolder eh, recipients rcp)
{
mCfg = sCfg;
cmpData = cData;
eHolder = eh;
rcpt = rcp;
}
public void run()
{
//render the emails
boolean personalized = (new Integer(cmpData.getValue("
int emailtype = new Integer(cmpData.getValue("
int counter = 0;
while (working)
{
int freeslot = eHolder.findEmptySlot();
if ((rcpt.Status == 0) && (freeslot > -1))
{
//there is a free slot so render an email
counter++;
//System.out.println(count
Hashtable row = rcpt.nextRecipient();
if (row != null)
{
try
{
//next get / set the subjectline
String subject = cmpData.getNextSubject();
//generate the messageid
String messageid = cmpData.getValue("id") + "-" + (String) row.get("id") + "-1-"+ cmpData.getValue("listid")
String curDomain = mCfg.getNextDomain();
//System.out.println(curDo
//next get / set the fromline
String from = '"' + cmpData.getNextFrom() + '"' + " <" + messageid + "@" + curDomain+">";
String from2 = "<" + messageid + "@" + curDomain + ">";
String to = (String) row.get("email");
String htmlBody = cmpData.getValue("htmlbody
String textBody = cmpData.getValue("textBody
String htmlFooter = cmpData.htmlFooter;
String textFooter = cmpData.textFooter;
//render the htmlbody if so required
if ((emailtype == 1) || (emailtype == 3))
{
//render the HTML body
htmlBody = htmlBody.replaceAll("@open
htmlBody = htmlBody.replaceAll("@href
htmlBody = htmlBody.replaceAll("@unsu
htmlBody = htmlBody.replaceAll("@firs
htmlBody = htmlBody.replaceAll("@last
htmlBody = htmlBody.replaceAll("@addr
htmlBody = htmlBody.replaceAll("@city
htmlBody = htmlBody.replaceAll("@stat
htmlBody = htmlBody.replaceAll("@zip@
htmlBody = htmlBody.replaceAll("@emai
htmlBody = htmlBody.replaceAll("@date
htmlBody = htmlBody.replaceAll("@ip@"
htmlBody = htmlBody.replaceAll("@site
htmlBody = htmlBody.replaceAll("@id@"
htmlBody = htmlBody.replaceAll("@mess
}
if ((emailtype == 2) || (emailtype ==3))
{
//render the TEXT body
textBody = textBody.replaceAll("@href
textBody = textBody.replaceAll("@unsu
textBody = textBody.replaceAll("@firs
textBody = textBody.replaceAll("@last
textBody = textBody.replaceAll("@addr
textBody = textBody.replaceAll("@city
textBody = textBody.replaceAll("@stat
textBody = textBody.replaceAll("@zip@
textBody = textBody.replaceAll("@emai
textBody = textBody.replaceAll("@date
textBody = textBody.replaceAll("@ip@"
textBody = textBody.replaceAll("@site
textBody = textBody.replaceAll("@id@"
textBody = textBody.replaceAll("@mess
}
//now render the footers
if ((emailtype == 1) || (emailtype == 3))
{
//render the HTML body
htmlFooter = htmlFooter.replaceAll("@un
htmlFooter = htmlFooter.replaceAll("@fi
htmlFooter = htmlFooter.replaceAll("@la
htmlFooter = htmlFooter.replaceAll("@ad
htmlFooter = htmlFooter.replaceAll("@ci
htmlFooter = htmlFooter.replaceAll("@st
htmlFooter = htmlFooter.replaceAll("@zi
htmlFooter = htmlFooter.replaceAll("@em
htmlFooter = htmlFooter.replaceAll("@da
htmlFooter = htmlFooter.replaceAll("@ip
htmlFooter = htmlFooter.replaceAll("@si
htmlFooter = htmlFooter.replaceAll("@id
htmlFooter = htmlFooter.replaceAll("@me
}
if ((emailtype == 2) || (emailtype ==3))
{
//render the TEXT body
textFooter = textFooter.replaceAll("@un
textFooter = textFooter.replaceAll("@fi
textFooter = textFooter.replaceAll("@la
textFooter = textFooter.replaceAll("@ad
textFooter = textFooter.replaceAll("@ci
textFooter = textFooter.replaceAll("@st
textFooter = textFooter.replaceAll("@zi
textFooter = textFooter.replaceAll("@em
textFooter = textFooter.replaceAll("@da
textFooter = textFooter.replaceAll("@ip
textFooter = textFooter.replaceAll("@si
textFooter = textFooter.replaceAll("@id
textFooter = textFooter.replaceAll("@me
}
//Render the subject line
subject = subject.replaceAll("@first
subject = subject.replaceAll("@lastn
subject = subject.replaceAll("@addre
subject = subject.replaceAll("@city@
subject = subject.replaceAll("@state
subject = subject.replaceAll("@zip@"
subject = subject.replaceAll("@email
subject = subject.replaceAll("@dates
subject = subject.replaceAll("@ip@",
subject = subject.replaceAll("@site@
subject = subject.replaceAll("@id@",
subject = subject.replaceAll("@messa
//now render CAN-SPAM Compliance
htmlBody = htmlBody.replaceAll("</bod
textBody = textBody + "\n\n" + textFooter;
eHolder.putMessage(freeslo
//System.out.println("Rend
}
catch (Exception e)
{
}
}
else
{
working = false;
//System.out.println("Rend
eHolder.Status = -1;
cmpData.setCampaignStatus(
}
}
}
}
}
//Sock Conn Class
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
public class sockConn {
//In this version I am incorporating non-blocking connection requests, as well as I/O functions.
//The previous version was limited while the delivery thread was waiting to establish a connection on a slow or non-existant domain.
//NIO Related Variables
SocketChannel sc;
ByteBuffer in = null;
ByteBuffer out = null;
int where = 0;
int protocolType = 0; //0 = SMTP 1= PMM
public sockConn(campaign cmp)
{
//oSock constructor
//Socket Channel NIO Setup
//Calculate Output Buffer Size
String temp = cmp.getValue("htmlbody");
int outbuff = temp.length();
temp = cmp.getValue("textbody");
outbuff = outbuff + temp.length();
int adjBuff = outbuff/5;
in = ByteBuffer.allocate(1024);
out = ByteBuffer.allocate((outbu
try
{
sc = SocketChannel.open();
sc.configureBlocking(false
}
catch(IOException ie)
{
System.out.println("CONNEC
}
}
boolean isConnected()
{
//determines if the socket channel is connected
return sc.isConnected();
}
int getStatus()
{
//gets the current status of the socketchannel
int retVal = -3; // By default the socket is NOT connected
//first see if we are connected
if (sc.isConnected())
{
// ok we are connected lets see if this message is the one currently on top of this sockets queue
//connection is there so is there data to read?
if (dataIn())
{
//we have checked to see if there is data
//and there is so return data present
retVal = 1;
}
else
{
//there isnt data present we are just connected
retVal = 0;
}
}
else
{
//no connection present but is there emails in the queue?
if (sc.isConnectionPending())
{
retVal = -2;
}
}
//code = -3 : Not Connected
//code = -2 : Not Connected but a connection is pending
//code = -1 : Not Connected but Domains in the queue (reconnect)
//code = 0 : Connected but no data
//code = 1 : Connected with Data Present
//code = 2 : Connected but this message isnt next in the queue
return retVal;
}
boolean dataIn()
{
//refreshes the input buffer and returns true if there is new data to be utilized
//first refresh the buffer
boolean retVal = false;
try
{
if (sc.read(in) > 0)
{
retVal = true;
in.flip();
}
}
catch (IOException ie)
{
System.out.println("CONNEC
System.out.println(ie.getM
}
//flip the buffer
return retVal;
}
String readln()
{
//function to read the next "line" of data from the connection
//should only be called after a call to the dataIn function
//get the size of the buffer
int length = in.remaining();
//set up a holding array for the data
byte[] workStr = new byte[1024];
//get the data from the buffer
in.get(workStr,0,length);
String retVal = new String(workStr,0,length);
in.clear();
return retVal;
}
void writeln(String outStr)
{
outStr = outStr + "\r\n";
out.put(outStr.getBytes())
out.flip();
int offset = 0;
try
{
while (offset < outStr.length())
{
offset += sc.write(out);
}
}
catch (IOException ie)
{
System.out.println("CONNEC
}
out.clear();
}
boolean connect(String connTo, String port)
{
boolean retVal = false;
try
{
retVal = sc.connect(new InetSocketAddress(connTo,n
}
catch(IOException ie)
{
System.out.println("CONNEC
}
catch(UnresolvedAddressExc
{
System.out.println("CONNEC
}
where = 0;
return retVal;
}
boolean finishConnect()
{
boolean retVal = false;
try
{
retVal = sc.finishConnect();
}
catch(IOException ie)
{
System.out.println("CONNEC
}
return retVal;
}
void close()
{
try
{
sc.close();
}
catch(IOException ie)
{
System.out.println("CONNEC
}
}
void recycle()
{
}
}
//statusThread class
import java.io.*;
import java.net.*;
import java.lang.reflect.*;
import java.util.*;
public class statusThread extends Thread{
config mCfg;
emailHolder eHolder;
connections cInt;
campaign D;
boolean working = true;
public statusThread(config sCfg, emailHolder eh, connections dInt, campaign C)
{
mCfg = sCfg;
eHolder = eh;
cInt = dInt;
D = C;
}
public void run()
{
int minutes = 0;
int lastMessages = 0;
while (working)
{
//wait 60 seconds
try
{
sleep(60000);
}
catch(InterruptedException
{
}
//collect the data
minutes++;
int msgLastMinute = eHolder.numSent - lastMessages;
lastMessages = eHolder.numSent;
//calculate percent complete
float completed = (eHolder.numSent / new Integer((String) D.getValue("numsend")).int
//submit the data to the database
database application = new database(mCfg.dbURL(1),mCf
application.executeUpdate(
application.closeDBConnect
application.closeDBConnect
//check to see if this should continue
System.out.println(minutes
if (eHolder.Status == -1)
{
working = false;
}
}
}
}
> As you can see I am looping until the write is complete.
So effectively you are defeating the purpose of nio, and are blocking until write is complete.
So effectively you are defeating the purpose of nio, and are blocking until write is complete.
ASKER
for larger sized emails yes... smaller size emails have no problem whatsoever... thats why i am so adamant about the buffersize.
Cheers,
Rick
Cheers,
Rick
According to that logic anyone using nio would need increase their buffer size to a value greater than the maximum amount of data that could be sent to it.
ASKER
objects,
I'm not trying to defeat nio, i am simply trying to work around an inherent flaw in the program design. I'm aware of this problem, I just do not want to commit to a major rewrite of the sending side of the software.... I'm paid as a freelancer, and have already run over time alottment on this process as I initially started with coldfusion as the solution. Now I have merged the two, so that coldfusion populates the tables that this app uses.
Cheers,
Rick
I'm not trying to defeat nio, i am simply trying to work around an inherent flaw in the program design. I'm aware of this problem, I just do not want to commit to a major rewrite of the sending side of the software.... I'm paid as a freelancer, and have already run over time alottment on this process as I initially started with coldfusion as the solution. Now I have merged the two, so that coldfusion populates the tables that this app uses.
Cheers,
Rick
The output buffer size may be a factor related to the problem you're experiencing without being the reason for it. Personally i would perform an experiment: write a *single-threaded* test harness that writes a 'bad-sized' amount of data to the output stream, reproducing the current code as closely as possible and let us know what happens.
btw, are you able to post links to code?
btw, are you able to post links to code?
ASKER
CEHJ,
Ill do that and see what happens. Yes I can post links to code. I will do that from now on, as opposed to pasting code directly into the forum.
Cheers,
Rick
Ill do that and see what happens. Yes I can post links to code. I will do that from now on, as opposed to pasting code directly into the forum.
Cheers,
Rick
>>I will do that from now on, as opposed to pasting code directly into the forum.
Good idea. If you can post a link to the above, i'll get it deleted from this thread
Good idea. If you can post a link to the above, i'll get it deleted from this thread
ASKER
Ah - preferably as a normal link to make it easier
ASKER
CEHJ,
Please define as a normal link for me... the link above looks pretty normal to me. ;-)
Cheers,
Rick
Please define as a normal link for me... the link above looks pretty normal to me. ;-)
Cheers,
Rick
One that's readable when you click it ;-)