Solved

durable topic subscriber

Posted on 2007-11-14
5
748 Views
Last Modified: 2008-02-14
hi,

I am trying to create durable subscribers so that if a message is published while the subscriber is not alive, the subscriber will receive it once it comes alive (connects).  Right now, the subscriber only gets the message if it is already alive when the message is published.

Attached is my code. Don't pay attention to the class name; I am trying to publish a temporary queue name, and the subscriber will then use that temp queue name to do PTP (point-to-point) messaging:

Does the publisher need to continuously publish the message, i.e. in a while(1) loop?



/*

 * MyTempQSender2.java

 *

 * Created on November 14, 2007, 6:15 PM

 *

 * To change this template, choose Tools | Template Manager

 * and open the template in the editor.

 */
 

/**

 *

 * @author mlam

 */

import javax.jms.*;

import javax.naming.*;

import java.util.*;

import javax.jms.Queue;

import javax.jms.JMSException;

import javax.jms.Message;

import java.net.*;

import java.io.*;
 

public class MyTempQSender2 extends Thread  

{    

    //Initial context factory

    private String CTX_FACT; // = "org.exolab.jms.jndi.InitialContextFactory";

    //Provider URL

    private String PROV_URL; // = "rmi://64.139.12.19:1099/";

    private String TOPIC_CONN_FACT;

    private String TOPIC_NAME;
 

    private int memberPort;

    private String host;

    private int port;

    

    String                  fileName = null;

    Context                 jndiContext = null;

    TopicConnectionFactory  topicConnectionFactory = null;

    TopicConnection         topicConnection = null;

    TopicSession            topicSession = null;

    Topic topic = null;

    TopicSubscriber subscriber = null;

    

    TextMessage             message = null;

      

    private Properties loadProperties(String filename) throws IOException

    {

        Properties p = null;

        try

        {

            p = new Properties();

            java.net.URL url = ClassLoader.getSystemResource(filename);

            p.load(url.openStream());

        }

        catch(IOException x)

        {

            x.printStackTrace();

        }

        catch(Exception x)

        {

            x.printStackTrace();

        }

        finally

        {

            return p;

        }

    }
 
 

    /** Creates a new instance of MyTempQSender2 */

    

    public MyTempQSender2() 

    {

        try

        {
 

            /*

             *

            TN.memberPort = 65003

            TN.host = localhost

            TN.port = 65002

            TN.InitContextFactory = org.exolab.jms.jndi.InitialContextFactory

            TN.ProviderURL = rmi://localhost:1099/

            TN.QueueConnFactory = JmsQueueConnectionFactory  

            TN.QueueName = TNQueue

            */

            

            CTX_FACT = "org.exolab.jms.jndi.InitialContextFactory"; 

            PROV_URL= "rmi://localhost:1099/";

            TOPIC_CONN_FACT = "JmsTopicConnectionFactory";

            TOPIC_NAME = "TNTopic";
 
 

            memberPort = 65003;

            host = "localhost";

            port = 65002;

        }

        catch(Exception x)

        {

        }
 

    }

 

    public MyTempQSender2(String fName) 

    {

        try

        {
 

            fileName = fName;
 

            Properties p = loadProperties(fName);
 

            CTX_FACT = p.getProperty("TN.InitContextFactory"); 

            PROV_URL= p.getProperty("TN.ProviderURL");

            TOPIC_CONN_FACT = p.getProperty("TN.QueueConnFactory");

            TOPIC_NAME = p.getProperty("TN.QueueName");
 

            memberPort = Integer.parseInt(p.getProperty("TN.memberPort"));

            host = p.getProperty("TN.host");

            port = Integer.parseInt(p.getProperty("TN.port"));

        }

        catch(Exception x)

        {

        }
 

    }

    

    public MyTempQSender2(String initContextFactory, String providerURL, 

            String queueConnFactory, String queueName) 

    {

        try

        {

            CTX_FACT = initContextFactory; 

            PROV_URL= providerURL;

            TOPIC_CONN_FACT = queueConnFactory;

            TOPIC_NAME = queueName;
 

        }

        catch(Exception x)

        {           

        }

    }

       

    public void run()

    {

        try

        {

 

            System.out.println("Topic name is " + TOPIC_NAME);

 

            /* 

            * Create a JNDI API InitialContext object if none exists

            * yet.

            */

            Properties prop = new Properties();

            //Add the initial context factory

            prop.put(Context.INITIAL_CONTEXT_FACTORY, CTX_FACT);

            //Add the provider URL

            prop.put(Context.PROVIDER_URL, PROV_URL);
 

            try 

            {

                jndiContext = new InitialContext(prop);

            } 

            catch (NamingException e) 

            {

                System.out.println("Could not create JNDI API " +

                "context: " + e.toString());
 

                System.exit(1);

            }
 

            /* 

            * Look up connection factory and queue.  If either does

            * not exist, exit.

            */

            try 

            {

                topicConnectionFactory = (TopicConnectionFactory)

                jndiContext.lookup(TOPIC_CONN_FACT);

                topic = (Topic) jndiContext.lookup(TOPIC_NAME);                //queue1 = (javax.jms.Queue)jndiContext.lookup("MyQueue1");

            } 

            catch (NamingException e) 

            {

                System.out.println("JNDI API lookup failed: " +

                e.toString());

 

                System.exit(1);

            }
 

            /*

            * Create connection.

            * Create session from connection; false means session is

            * not transacted.

            */

            try 

            {

                topicConnection = topicConnectionFactory.createTopicConnection();

                topicSession = topicConnection.createTopicSession(false, 

                        Session.AUTO_ACKNOWLEDGE);

                

                //subscriber = topicSession.createSubscriber(topic); 

                

                Random generator = new Random();

                int r = generator.nextInt();

                

                String sTopicName = "topic" + Integer.toString(r);   

                

                System.out.println("subscription name: " + sTopicName);

                subscriber = topicSession.createDurableSubscriber(topic, sTopicName);
 

                topicConnection.start();
 

                while (true) 

                {

                    Message m = subscriber.receive(1);

                    if (m != null)

                    {

                        if (m instanceof TextMessage)

                        {

                            String s = new String();

                            String s1 = new String();

                            TextMessage text = (TextMessage) m;

                            s1 = text.getText();

                            s = m.getStringProperty("transID").toString();

                            System.out.println(s);

                            

                        }

                    }

                }
 

            } 

            catch (JMSException e) 

            {

                System.out.println("Exception occurred: " + 

                e.toString());

            } 

            finally 

            {

                if(subscriber!=null)

                {

                    try 

                    {

                        subscriber.close();

                    } 

                    catch (JMSException x) 

                    {

                    }

                }

                if(topicSession != null)

                {

                    try 

                    {

                        topicSession.unsubscribe("");

                        topicSession.close();

                    } 

                    catch (JMSException x) 

                    {

                    }

                }

                if (topicConnection != null) 

                {

                    try 

                    {

                        topicConnection.close();

                    } 

                    catch (JMSException x) 

                    {

                    }

                }

            }
 

        }

        catch(Exception x)

        {

        }

    }

    

    public static void main(String args[])

    {

        MyTempQSender2 receiver = new MyTempQSender2();

        receiver.start();

    }
 

}

Open in new window

0
Comment
Question by:mmingfeilam
  • 3
  • 2
5 Comments
 
LVL 9

Expert Comment

by:ysnky
ID: 20287235
>>Does the publisher need to continuously publish the message, i.e. in a while(1) loop?
If you mean so that clients which were not connected can still receive the message: no, it does not need to.

In a durable queue, a message is stored until the client gets it, and then it is deleted.
In a durable topic, a message is stored until all clients get it, and then it is deleted.
0
 

Author Comment

by:mmingfeilam
ID: 20287448
here is my publisher:

for some reason my subscriber is not getting the message.
/*

 * MyTempQSender2.java

 *

 * Created on November 14, 2007, 6:15 PM

 *

 * To change this template, choose Tools | Template Manager

 * and open the template in the editor.

 */
 

/**

 *

 * @author mlam

 */

import javax.jms.*;

import javax.naming.*;

import java.util.*;

import javax.jms.Queue;

import javax.jms.JMSException;

import javax.jms.Message;

import java.net.*;

import java.io.*;
 

public class MyTempQSender2 extends Thread  

{    

    //Initial context factory

    private String CTX_FACT; // = "org.exolab.jms.jndi.InitialContextFactory";

    //Provider URL

    private String PROV_URL; // = "rmi://64.139.12.19:1099/";

    private String TOPIC_CONN_FACT;

    private String TOPIC_NAME;
 

    private int memberPort;

    private String host;

    private int port;

    

    String                  fileName = null;

    Context                 jndiContext = null;

    TopicConnectionFactory  topicConnectionFactory = null;

    TopicConnection         topicConnection = null;

    TopicSession            topicSession = null;

    Topic topic = null;

    TopicSubscriber subscriber = null;

    

    TextMessage             message = null;

      

    private Properties loadProperties(String filename) throws IOException

    {

        Properties p = null;

        try

        {

            p = new Properties();

            java.net.URL url = ClassLoader.getSystemResource(filename);

            p.load(url.openStream());

        }

        catch(IOException x)

        {

            x.printStackTrace();

        }

        catch(Exception x)

        {

            x.printStackTrace();

        }

        finally

        {

            return p;

        }

    }
 
 

    /** Creates a new instance of MyTempQSender2 */

    

    public MyTempQSender2() 

    {

        try

        {
 

            /*

             *

            TN.memberPort = 65003

            TN.host = localhost

            TN.port = 65002

            TN.InitContextFactory = org.exolab.jms.jndi.InitialContextFactory

            TN.ProviderURL = rmi://localhost:1099/

            TN.QueueConnFactory = JmsQueueConnectionFactory  

            TN.QueueName = TNQueue

            */

            

            CTX_FACT = "org.exolab.jms.jndi.InitialContextFactory"; 

            PROV_URL= "rmi://localhost:1099/";

            TOPIC_CONN_FACT = "JmsTopicConnectionFactory";

            TOPIC_NAME = "TNTopic";
 
 

            memberPort = 65003;

            host = "localhost";

            port = 65002;

        }

        catch(Exception x)

        {

        }
 

    }

 

    public MyTempQSender2(String fName) 

    {

        try

        {
 

            fileName = fName;
 

            Properties p = loadProperties(fName);
 

            CTX_FACT = p.getProperty("TN.InitContextFactory"); 

            PROV_URL= p.getProperty("TN.ProviderURL");

            TOPIC_CONN_FACT = p.getProperty("TN.QueueConnFactory");

            TOPIC_NAME = p.getProperty("TN.QueueName");
 

            memberPort = Integer.parseInt(p.getProperty("TN.memberPort"));

            host = p.getProperty("TN.host");

            port = Integer.parseInt(p.getProperty("TN.port"));

        }

        catch(Exception x)

        {

        }
 

    }

    

    public MyTempQSender2(String initContextFactory, String providerURL, 

            String queueConnFactory, String queueName) 

    {

        try

        {

            CTX_FACT = initContextFactory; 

            PROV_URL= providerURL;

            TOPIC_CONN_FACT = queueConnFactory;

            TOPIC_NAME = queueName;
 

        }

        catch(Exception x)

        {           

        }

    }

       

    public void run()

    {

        try

        {

 

            System.out.println("Topic name is " + TOPIC_NAME);

 

            /* 

            * Create a JNDI API InitialContext object if none exists

            * yet.

            */

            Properties prop = new Properties();

            //Add the initial context factory

            prop.put(Context.INITIAL_CONTEXT_FACTORY, CTX_FACT);

            //Add the provider URL

            prop.put(Context.PROVIDER_URL, PROV_URL);
 

            try 

            {

                jndiContext = new InitialContext(prop);

            } 

            catch (NamingException e) 

            {

                System.out.println("Could not create JNDI API " +

                "context: " + e.toString());
 

                System.exit(1);

            }
 

            /* 

            * Look up connection factory and queue.  If either does

            * not exist, exit.

            */

            try 

            {

                topicConnectionFactory = (TopicConnectionFactory)

                jndiContext.lookup(TOPIC_CONN_FACT);

                topic = (Topic) jndiContext.lookup(TOPIC_NAME);                //queue1 = (javax.jms.Queue)jndiContext.lookup("MyQueue1");

            } 

            catch (NamingException e) 

            {

                System.out.println("JNDI API lookup failed: " +

                e.toString());

 

                System.exit(1);

            }
 

            /*

            * Create connection.

            * Create session from connection; false means session is

            * not transacted.

            */

            try 

            {

                topicConnection = topicConnectionFactory.createTopicConnection();

                topicSession = topicConnection.createTopicSession(false, 

                        Session.AUTO_ACKNOWLEDGE);

                

                //subscriber = topicSession.createSubscriber(topic); 

                

                Random generator = new Random();

                int r = generator.nextInt();

                

                String sTopicName = "topic" + Integer.toString(r);   

                

                System.out.println("subscription name: " + sTopicName);

                subscriber = topicSession.createDurableSubscriber(topic, sTopicName);
 

                topicConnection.start();
 

                while (true) 

                {

                    Message m = subscriber.receive(1);

                    if (m != null)

                    {

                        if (m instanceof TextMessage)

                        {

                            String s = new String();

                            String s1 = new String();

                            TextMessage text = (TextMessage) m;

                            s1 = text.getText();

                            s = m.getStringProperty("transID").toString();

                            System.out.println(s);

                            

                        }

                    }

                }
 

            } 

            catch (JMSException e) 

            {

                System.out.println("Exception occurred: " + 

                e.toString());

            } 

            finally 

            {

                if(subscriber!=null)

                {

                    try 

                    {

                        subscriber.close();

                    } 

                    catch (JMSException x) 

                    {

                    }

                }

                if(topicSession != null)

                {

                    try 

                    {

                        topicSession.unsubscribe("");

                        topicSession.close();

                    } 

                    catch (JMSException x) 

                    {

                    }

                }

                if (topicConnection != null) 

                {

                    try 

                    {

                        topicConnection.close();

                    } 

                    catch (JMSException x) 

                    {

                    }

                }

            }
 

        }

        catch(Exception x)

        {

        }

    }

    

    public static void main(String args[])

    {

        MyTempQSender2 receiver = new MyTempQSender2();

        receiver.start();

    }
 

}

Open in new window

0
 

Author Comment

by:mmingfeilam
ID: 20287556
ysnky,

you mentioned that:

"In a durable topic, a message will be stored until all clients get it, and then it will be deleted."

It sounds like the publisher has to know ahead of time who the clients are, which in turn means the clients have to be alive/connected.  Any client that subscribes to a topic after the publisher has already published a message will not get that message.
0
 
LVL 9

Accepted Solution

by:
ysnky earned 250 total points
ID: 20287785
The publisher publishes a message and checks whether all subscribers that had subscribed beforehand received it successfully. If so, it deletes the message; if not, it waits until the message has been received by all of them.
0
 

Author Comment

by:mmingfeilam
ID: 20294416
but how does the publisher know who the subscribers are:

i think only after this:

               topicConnection = topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false,
                        Session.AUTO_ACKNOWLEDGE);
               
                subscriber = topicSession.createDurableSubscriber(topic, sTopicName);

                topicConnection.start();

This means that the subscriber is alive, as the connection has already been started.
0

Featured Post

Is Your Active Directory as Secure as You Think?

More than 75% of all records are compromised because of the loss or theft of a privileged credential. Experts have been exploring Active Directory infrastructure to identify key threats and establish best practices for keeping data safe. Attend this month’s webinar to learn more.

Question has a verified solution.

If you are experiencing a similar issue, please ask a related question

Suggested Solutions

Title # Comments Views Activity
firstswap challenge 20 65
eclipse package explorer vs project explorer view 2 79
Cipher Configuration on Apache HTTPD 4 50
jar file executable 12 38
For customizing the look of your lightweight component and making it look lucid like it was made of glass. Or: how to make your component more Apple-ish ;) This tip assumes your component to be of rectangular shape and completely opaque. (COD…
INTRODUCTION Working with files is a moderately common task in Java.  For most projects hard coding the file names, using parameters in configuration files, or using command-line arguments is sufficient.   However, when your application has vi…
Viewers learn how to read error messages and identify possible mistakes that could cause hours of frustration. Coding is as much about debugging your code as it is about writing it. Define Error Message: Line Numbers: Type of Error: Break Down…
This theoretical tutorial explains exceptions, reasons for exceptions, different categories of exception and exception hierarchy.

911 members asked questions and received personalized solutions in the past 7 days.

Join the community of 500,000 technology professionals and ask your questions.

Join & Ask a Question

Need Help in Real-Time?

Connect with top rated Experts

21 Experts available now in Live!

Get 1:1 Help Now