Would you be able to help. There are mainly two classes involved in concurrency. Broker and Agent. This is the code:
Agent Class
public class Agent extends JFrame implements Runnable, ActionListener, KeyListener, ListSelectionListener {
private JTextArea output;
private JTextArea input;
private JButton bAccept;
private JButton bGetList;
private JButton bClose;
private final static int INPUT_HEIGHT = 100;
private final static int AGENT_HEIGHT = 400;
private final static int AGENT_WIDTH = 300;
private String username;
private final static String SERVER_HOST = "127.0.0.1";
private final static int SERVER_PORT = 8989;
private final static int CLIENT_PORT = 8980;
private int portListening;
private ServerSocket server; // To accept other agents connection
private int currentStatus;
private JFrame pop = new JFrame("User List");
private JTable table;
private Thread thread;
private Reader reader;
private String usernameDest;
private ObjectInputStream in;
private ObjectOutputStream out;
private JScrollPane scrollUpper;
public static void main(String[] args) {
Agent agent = new Agent();
}
public Agent() {
super();
reader = new Reader();
username = JOptionPane.showInputDialo
g("Please enter your username: ");
this.setTitle(username);
output = new JTextArea();
output.setLineWrap(true);
output.setWrapStyleWord(tr
ue);
input = new JTextArea("");
input.setLineWrap(true);
input.setWrapStyleWord(tru
e);
bGetList = new JButton("Get Users");
bGetList.addActionListener
(this);
bClose = new JButton("Close Connection");
bClose.addActionListener(t
his);
bAccept = new JButton("Send");
bAccept.addActionListener(
this);
getContentPane().setLayout
(new BorderLayout());
JPanel lowerPanel = new JPanel(new BorderLayout());
lowerPanel.setBorder(new LineBorder(Color.black));
input.addKeyListener(this)
;
JScrollPane scrollLower = new JScrollPane(input);
lowerPanel.add(scrollLower
, BorderLayout.CENTER);
lowerPanel.add(bAccept, BorderLayout.EAST);
lowerPanel.add(bGetList, BorderLayout.SOUTH);
/* Upper panel */
JPanel upperPanel = new JPanel(new FlowLayout());
upperPanel.setBorder(new LineBorder(Color.black));
upperPanel.add(bGetList);
upperPanel.add(bClose);
/* Center panel */
JPanel centerPanel = new JPanel(new BorderLayout());
scrollUpper = new JScrollPane(output);
centerPanel.add(scrollUppe
r, BorderLayout.CENTER);
setSize(AGENT_WIDTH, AGENT_HEIGHT);
lowerPanel.setPreferredSiz
e(new Dimension(this.getWidth(),
INPUT_HEIGHT));
lowerPanel.setMaximumSize(
new Dimension(this.getWidth(),
INPUT_HEIGHT));
getContentPane().add(lower
Panel, BorderLayout.SOUTH);
getContentPane().add(cente
rPanel, BorderLayout.CENTER);
getContentPane().add(upper
Panel, BorderLayout.NORTH);
if (thread == null) {
thread = new Thread(this);
thread.start();
}
}
private boolean updateBroker() {
try {
Socket s = new Socket(SERVER_HOST, SERVER_PORT);
TreeMap data = new TreeMap();
data.put(Message.KEY_USERN
AME, username);
InetAddress thisIp = InetAddress.getLocalHost()
;
String strIP = thisIp.getHostAddress();
data.put(Message.KEY_HOST,
strIP);
data.put(Message.KEY_PORT,
new Integer(portListening));
data.put(Message.KEY_AGENT
_STATUS, new Integer(currentStatus));
Message loginMessage = new Message(Message.CMD_AGENT_
UPATE, data);
ObjectOutputStream out = new ObjectOutputStream(s.getOu
tputStream
());
System.out.println("Sendin
g registering with broker message");
out.writeObject(loginMessa
ge);
System.out.println("Receiv
ing registering response from broker");
ObjectInputStream in = new ObjectInputStream(s.getInp
utStream()
);
System.out.println("receiv
ed");
Message response = (Message) in.readObject();
Integer loginResult = (Integer) response.getData().get(Mes
sage.KEY_C
MD_RESULT)
;
System.out.println("Result
: " + loginResult.intValue());
if (loginResult.intValue() != Codes.RESULT_SUCCESSFULL.i
ntValue())
{
JOptionPane.showMessageDia
log(null, "Error updating client data");
return false;
}
return true;
} catch (IOException e) {
JOptionPane.showMessageDia
log(null, "Error registering with server");
} catch (ClassNotFoundException e) {
JOptionPane.showMessageDia
log(null, "Error registering with server");
}
return true;
}
private boolean processConnect(String ip, int port, String username) {
try {
Socket s = new Socket(ip, port);
TreeMap data = new TreeMap();
data.put(Message.KEY_USERN
AME, this.username);
Message connectMessage = new Message(Message.CMD_CONNEC
T, data);
out = new ObjectOutputStream(s.getOu
tputStream
());
System.out.println("Connec
ting with agent ip: " + ip + " port: " + port);
out.writeObject(connectMes
sage);
System.out.println("");
in = new ObjectInputStream(s.getInp
utStream()
);
System.out.println("Receiv
ed connection response");
Message response = (Message) in.readObject();
Integer cmdResult = (Integer) response.getData().get(Mes
sage.KEY_C
MD_RESULT)
;
if (cmdResult.intValue() != Codes.RESULT_SUCCESSFULL.i
ntValue())
{
JOptionPane.showMessageDia
log(null, "Error in connect");
return false;
}
usernameDest = username;
setTitle(this.username + " talking with " + usernameDest);
reader.start();
currentStatus = AgentData.STATUS_ON_SESSIO
N;
updateBroker();
return true;
} catch (IOException e) {
JOptionPane.showMessageDia
log(null, "Error registering with server");
} catch (ClassNotFoundException e) {
JOptionPane.showMessageDia
log(null, "Error registering with server");
}
return true;
}
private boolean getUserList() {
try {
Socket s = new Socket(SERVER_HOST, SERVER_PORT);
TreeMap data = new TreeMap();
Message loginMessage = new Message(Message.CMD_GET_US
ERLIST, data);
ObjectOutputStream out = new ObjectOutputStream(s.getOu
tputStream
());
System.out.println("sendin
g");
out.writeObject(loginMessa
ge);
System.out.println("receiv
ing");
ObjectInputStream in = new ObjectInputStream(s.getInp
utStream()
);
System.out.println("receiv
ed");
Message response = (Message) in.readObject();
Integer cmdResult = (Integer) response.getData().get(Mes
sage.KEY_C
MD_RESULT)
;
ArrayList userList = (ArrayList) response.getData().get(Mes
sage.KEY_U
SER_LIST);
System.out.println("Result
: " + cmdResult.intValue());
if (cmdResult.intValue() != Codes.RESULT_SUCCESSFULL.i
ntValue())
{
JOptionPane.showMessageDia
log(null, "Error in GET USER LIST");
return false;
}
String columnNames[] = {"Username", "Status", "IP Address", "Port"};
String sdata[][] = new String[userList.size()][4]
;
for (int i = 0; i < userList.size(); i++) {
AgentData d = (AgentData) userList.get(i);
sdata[i][0] = d.getUsername();
sdata[i][1] = Integer.toString(d.getStat
us());
sdata[i][2] = d.getHostname();
sdata[i][3] = Integer.toString(d.getPort
());
System.out.println("Userna
me : " + d.getUsername());
System.out.println("Status
: " + d.getStatus());
}
pop = new JFrame("Broker's User List");
DefaultTableModel model = new DefaultTableModel(sdata, columnNames);
table = new JTable(model);
JScrollPane scroll = new JScrollPane(table);
table.getSelectionModel().
addListSel
ectionList
ener(this)
;
pop.setSize(400, 400);
pop.getContentPane().add(s
croll);
pop.setVisible(true);
return true;
} catch (IOException e) {
JOptionPane.showMessageDia
log(null, "Error registering with server");
} catch (ClassNotFoundException e) {
JOptionPane.showMessageDia
log(null, "Error registering with server");
}
return true;
}
public void actionPerformed(ActionEven
t e) {
if (e.getSource() == bGetList) { // Get list button was pressed
getUserList();
}
if (e.getSource() == bAccept) { // Accept butotn was pressed
sendMessage();
}
if (e.getSource() == bClose) { // Close connection button was pressed
if (currentStatus == AgentData.STATUS_AVAILABLE
) {
JOptionPane.showMessageDia
log(null, "You are not on session");
return;
}
String confirm = JOptionPane.showInputDialo
g("Are you sure you want to close connection with " + usernameDest + " ? (Y/N): ");
if (confirm.equalsIgnoreCase(
"Y")) {
closeConnection();
}
}
}
private void sendMessage() {
TreeMap data = new TreeMap();
data.put(Message.KEY_TEXT,
input.getText());
Message txtMessage = new Message(Message.CMD_TEXT_M
ESSAGE, data);
try {
out.writeObject(txtMessage
);
input.setText("");
} catch (IOException e1) {
System.out.println("Error reading message");
}
}
private void closeConnection() {
TreeMap data = new TreeMap();
Message txtMessage = new Message(Message.CMD_CLOSE_
CONNECTION
, data);
try {
out.writeObject(txtMessage
);
reader = new Reader();
out.close();
in.close();
setTitle(username);
currentStatus = AgentData.STATUS_AVAILABLE
;
updateBroker();
} catch (IOException e1) {
System.out.println("Error reading message");
}
}
public void keyTyped(KeyEvent e) {
}
public void keyPressed(KeyEvent e) {
}
public void keyReleased(KeyEvent e) {
if (e.getSource() == input) { // Handle key events in the input text area
if (e.getKeyCode() == 10) { // Enter was pressed
sendMessage();
}
}
}
public void valueChanged(ListSelection
Event e) {
if ((ListSelectionModel) e.getSource() == table.getSelectionModel())
{ // Connection
if (e.getValueIsAdjusting()) return;
ListSelectionModel lsm = (ListSelectionModel) e.getSource();
if (lsm.isSelectionEmpty()) {
/* nothing selected */
} else {
int selectedRow = lsm.getMinSelectionIndex()
;
int status = Integer.parseInt((String) table.getModel().getValueA
t(selected
Row, 1));
if (status == AgentData.STATUS_AVAILABLE
) {
String username = (String) table.getModel().getValueA
t(selected
Row, 0);
if (!username.equalsIgnoreCas
e(this.use
rname)) {
String ip = (String) table.getModel().getValueA
t(selected
Row, 2);
int port = Integer.parseInt((String) table.getModel().getValueA
t(selected
Row, 3));
pop.setVisible(false);
processConnect(ip, port, username);
} else {
JOptionPane.showMessageDia
log(null, "Sorry you can connect to your self, perhaps for the next version");
}
} else {
JOptionPane.showMessageDia
log(null, "Client is not available");
}
}
}
}
public void run() {
portListening = CLIENT_PORT;
while (true) {
try {
System.out.println("Listen
ing on port " + portListening);
server = new ServerSocket(portListening
);
currentStatus = AgentData.STATUS_AVAILABLE
;
updateBroker();
show();
Socket s = server.accept();
System.out.println("Connec
tion request arrived from agent");
currentStatus = AgentData.STATUS_ON_SESSIO
N;
updateBroker();
out = new ObjectOutputStream(s.getOu
tputStream
());
in = new ObjectInputStream(s.getInp
utStream()
);
Message msg = (Message) in.readObject();
usernameDest = (String) msg.getData().get(Message.
KEY_USERNA
ME);
setTitle(this.username + " talking with " + usernameDest);
TreeMap data = new TreeMap();
data.put(Message.KEY_CMD_R
ESULT, Codes.RESULT_SUCCESSFULL);
Message connectMessage = new Message(Message.CMD_CONNEC
T_RESPONSE
, data);
System.out.println("sendin
g");
out.writeObject(connectMes
sage);
boolean end = false;
while (!end) {
try {
msg = (Message) in.readObject();
if (msg.getCmd() == Message.CMD_CLOSE_CONNECTI
ON) {
setTitle(username);
in.close();
out.close();
s.close();
end = true;
currentStatus = AgentData.STATUS_AVAILABLE
;
updateBroker();
} else {
addText((String) msg.getData().get(Message.
KEY_TEXT))
;
}
} catch (IOException e) {
System.out.println("Error reading 3");
end = true;
} catch (ClassNotFoundException e) {
System.out.println("Error reading 4");
end = true;
}
}
} catch (IOException e) {
portListening++; /* Port might be used, so let's try another */
System.out.println("Error accepting connection");
} catch (ClassNotFoundException e) {
System.out.println("Error accepting connection");
}
}
}
private void addText(String msg) {
System.out.println("Adding
" + msg);
msg = usernameDest + ": " + msg;
if (!msg.endsWith("\n")) { /* add line feed just if it does not already ends with \n */
msg = msg + "\n";
}
output.append(msg);
output.setCaretPosition(ou
tput.getDo
cument().g
etLength()
); /* scroll to the end */
}
private class Reader extends Thread {
public Reader() {
}
public void run() {
while (true) {
try {
Message msg = (Message) in.readObject();
addText((String) msg.getData().get(Message.
KEY_TEXT))
;
} catch (IOException e) {
System.out.println("Error reading 1");
break;
} catch (ClassNotFoundException e) {
System.out.println("Error reading 2");
break;
}
}
setTitle(username);
try {
in.close();
out.close();
currentStatus = AgentData.STATUS_AVAILABLE
;
updateBroker();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Broker Class
import javax.swing.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream
;
import java.util.ArrayList;
import java.util.TreeMap;
public class Broker extends Thread {
private static final int BROKER_PORT = 8989;
private ArrayList agentList;
private ServerSocket server;
public static void main(String[] args) {
Broker b = new Broker();// allocate a new thread object
b.start();// Causes this thread to begin execution
}
public Broker() {
try {
agentList = new ArrayList();
server = new ServerSocket(BROKER_PORT);
} catch (IOException e) {
JOptionPane.showMessageDia
log(null, "Error creating server broker");
}
}
public void run() {// the main loop of the server
while (true) {
try {
Socket s = null;
s = server.accept();
processRequest(s);
} catch (IOException e) {
}
}
}
private void processAgentUpdate(ObjectO
utputStrea
m out, Message msg) {
/* Validate login for the time being is just to check it's not already logged */
String username = (String) msg.getData().get(Message.
KEY_USERNA
ME);
String host = (String) msg.getData().get(Message.
KEY_HOST);
Integer port = ((Integer) msg.getData().get(Message.
KEY_PORT))
;
Integer status = ((Integer) msg.getData().get(Message.
KEY_AGENT_
STATUS));
System.out.println("Proces
sing update for: " + username + " setting status " + status);
boolean found = false;
for (int i = 0; i < agentList.size(); i++) {
System.out.println("Compar
ing with ->" + username + "<-");
System.out.println("Compar
ing with ->" + ((AgentData) agentList.get(i)).getUsern
ame() + "<-");
if (((AgentData) agentList.get(i)).getUsern
ame().equa
ls(usernam
e)) {
AgentData current = (AgentData) agentList.get(i);
/* Update agent data */
if (host != null) {
current.setHostname(host);
}
if (port != null) {
current.setPort(port.intVa
lue());
}
if (status != null) {
current.setStatus(status.i
ntValue())
;
}
agentList.set(i, current);
found = true;
break;
}
}
if (!found) {
System.out.println("Adding
to agent list username " + username + " host " + host);
agentList.add(new AgentData(username, host, port.intValue(), AgentData.STATUS_AVAILABLE
));
}
TreeMap data = new TreeMap();
data.put(Message.KEY_CMD_R
ESULT, Codes.RESULT_SUCCESSFULL);
msg = new Message(Message.CMD_AGENT_
UPATE_RESP
ONSE, data);
try {
out.writeObject(msg);
out.close();
} catch (IOException e) {
}
}
private void processConnect(ObjectOutpu
tStream out, Message msg) {
String username = (String) msg.getData().get(Message.
KEY_USERNA
ME);
String usernameDest = (String) msg.getData().get(Message.
KEY_USERNA
ME_DESTINA
TION);
for (int i = 0; i < agentList.size(); i++) {
if (((AgentData) agentList.get(i)).getUsern
ame().equa
ls(usernam
e)) {
System.out.println("Chanin
g sattus of " + username);
AgentData current = (AgentData) agentList.get(i);
current.setStatus(AgentDat
a.STATUS_O
N_SESSION)
;
agentList.set(i, current);
}
if (((AgentData) agentList.get(i)).getUsern
ame().equa
ls(usernam
eDest)) {
System.out.println("Chanin
g sattus of " + usernameDest);
AgentData current = (AgentData) agentList.get(i);
current.setStatus(AgentDat
a.STATUS_O
N_SESSION)
;
agentList.set(i, current);
}
}
TreeMap data = new TreeMap();
data.put(Message.KEY_CMD_R
ESULT, Codes.RESULT_SUCCESSFULL);
msg = new Message(Message.CMD_CONNEC
T_RESPONSE
, data);
try {
out.writeObject(msg);
out.close();
} catch (IOException e) {
}
}
private void processGetUser(ObjectOutpu
tStream out, Message msg) {
System.out.println("Proces
sing Get User List");
TreeMap data = new TreeMap();
data.put(Message.KEY_CMD_R
ESULT, Codes.RESULT_SUCCESSFULL);
data.put(Message.KEY_USER_
LIST, agentList);
for (int i = 0; i < agentList.size(); i++) {
System.out.println("list["
+ i + " ] = Username = " + ((AgentData) agentList.get(i)).getUsern
ame() + " status = " + ((AgentData) agentList.get(i)).getStatu
s());
}
msg = new Message(Message.CMD_GET_US
ERLIST_RES
ULT, data);
try {
out.writeObject(msg);
out.close();
} catch (IOException e) {
}
}
private void processRequest(Socket s) {
try {
ObjectInputStream in = new ObjectInputStream(s.getInp
utStream()
);
ObjectOutputStream out = new ObjectOutputStream(s.getOu
tputStream
());
Message msg = (Message) in.readObject();
if (msg.getCmd() == Message.CMD_GET_USERLIST) {
processGetUser(out, msg);
} else if (msg.getCmd() == Message.CMD_CONNECT) {
processConnect(out, msg);
} else if (msg.getCmd() == Message.CMD_AGENT_UPATE) {
processAgentUpdate(out, msg);
} else {
JOptionPane.showMessageDia
log(null, "Invalid command");
}
in.close();
out.close();
} catch (IOException e) {
JOptionPane.showMessageDia
log(null, "Error reading from input stream");
} catch (ClassNotFoundException e) {
JOptionPane.showMessageDia
log(null, "Error reading from input stream");
}
}
}