Solved

durable topic subscriber

Posted on 2007-11-14
5
744 Views
Last Modified: 2008-02-14
hi,

I am trying to create durable subscribers so that a message published while the subscriber is not alive is delivered once the subscriber comes alive (connects). Right now, unless the subscriber is already alive when the message is published, it does not get the message.

Attached is my code; don't pay attention to the class name. I am trying to publish a temporary queue name, which the subscriber will then use to do PTP messaging:

Does the publisher need to continuously publish the message, i.e. in a while(1) loop?



/*

 * MyTempQSender2.java

 *

 * Created on November 14, 2007, 6:15 PM

 *

 * To change this template, choose Tools | Template Manager

 * and open the template in the editor.

 */
 

/**

 *

 * @author mlam

 */

import javax.jms.*;

import javax.naming.*;

import java.util.*;

import javax.jms.Queue;

import javax.jms.JMSException;

import javax.jms.Message;

import java.net.*;

import java.io.*;
 

public class MyTempQSender2 extends Thread  

{    

    //Initial context factory

    private String CTX_FACT; // = "org.exolab.jms.jndi.InitialContextFactory";

    //Provider URL

    private String PROV_URL; // = "rmi://64.139.12.19:1099/";

    private String TOPIC_CONN_FACT;

    private String TOPIC_NAME;
 

    private int memberPort;

    private String host;

    private int port;

    

    String                  fileName = null;

    Context                 jndiContext = null;

    TopicConnectionFactory  topicConnectionFactory = null;

    TopicConnection         topicConnection = null;

    TopicSession            topicSession = null;

    Topic topic = null;

    TopicSubscriber subscriber = null;

    

    TextMessage             message = null;

      

    private Properties loadProperties(String filename) throws IOException

    {

        Properties p = null;

        try

        {

            p = new Properties();

            java.net.URL url = ClassLoader.getSystemResource(filename);

            p.load(url.openStream());

        }

        catch(IOException x)

        {

            x.printStackTrace();

        }

        catch(Exception x)

        {

            x.printStackTrace();

        }

        finally

        {

            return p;

        }

    }
 
 

    /** Creates a new instance of MyTempQSender2 */

    

    public MyTempQSender2() 

    {

        try

        {
 

            /*

             *

            TN.memberPort = 65003

            TN.host = localhost

            TN.port = 65002

            TN.InitContextFactory = org.exolab.jms.jndi.InitialContextFactory

            TN.ProviderURL = rmi://localhost:1099/

            TN.QueueConnFactory = JmsQueueConnectionFactory  

            TN.QueueName = TNQueue

            */

            

            CTX_FACT = "org.exolab.jms.jndi.InitialContextFactory"; 

            PROV_URL= "rmi://localhost:1099/";

            TOPIC_CONN_FACT = "JmsTopicConnectionFactory";

            TOPIC_NAME = "TNTopic";
 
 

            memberPort = 65003;

            host = "localhost";

            port = 65002;

        }

        catch(Exception x)

        {

        }
 

    }

 

    public MyTempQSender2(String fName) 

    {

        try

        {
 

            fileName = fName;
 

            Properties p = loadProperties(fName);
 

            CTX_FACT = p.getProperty("TN.InitContextFactory"); 

            PROV_URL= p.getProperty("TN.ProviderURL");

            TOPIC_CONN_FACT = p.getProperty("TN.QueueConnFactory");

            TOPIC_NAME = p.getProperty("TN.QueueName");
 

            memberPort = Integer.parseInt(p.getProperty("TN.memberPort"));

            host = p.getProperty("TN.host");

            port = Integer.parseInt(p.getProperty("TN.port"));

        }

        catch(Exception x)

        {

        }
 

    }

    

    public MyTempQSender2(String initContextFactory, String providerURL, 

            String queueConnFactory, String queueName) 

    {

        try

        {

            CTX_FACT = initContextFactory; 

            PROV_URL= providerURL;

            TOPIC_CONN_FACT = queueConnFactory;

            TOPIC_NAME = queueName;
 

        }

        catch(Exception x)

        {           

        }

    }

       

    public void run()

    {

        try

        {

 

            System.out.println("Topic name is " + TOPIC_NAME);

 

            /* 

            * Create a JNDI API InitialContext object if none exists

            * yet.

            */

            Properties prop = new Properties();

            //Add the initial context factory

            prop.put(Context.INITIAL_CONTEXT_FACTORY, CTX_FACT);

            //Add the provider URL

            prop.put(Context.PROVIDER_URL, PROV_URL);
 

            try 

            {

                jndiContext = new InitialContext(prop);

            } 

            catch (NamingException e) 

            {

                System.out.println("Could not create JNDI API " +

                "context: " + e.toString());
 

                System.exit(1);

            }
 

            /* 

            * Look up connection factory and queue.  If either does

            * not exist, exit.

            */

            try 

            {

                topicConnectionFactory = (TopicConnectionFactory)

                jndiContext.lookup(TOPIC_CONN_FACT);

                topic = (Topic) jndiContext.lookup(TOPIC_NAME);                //queue1 = (javax.jms.Queue)jndiContext.lookup("MyQueue1");

            } 

            catch (NamingException e) 

            {

                System.out.println("JNDI API lookup failed: " +

                e.toString());

 

                System.exit(1);

            }
 

            /*

            * Create connection.

            * Create session from connection; false means session is

            * not transacted.

            */

            try 

            {

                topicConnection = topicConnectionFactory.createTopicConnection();

                topicSession = topicConnection.createTopicSession(false, 

                        Session.AUTO_ACKNOWLEDGE);

                

                //subscriber = topicSession.createSubscriber(topic); 

                

                Random generator = new Random();

                int r = generator.nextInt();

                

                String sTopicName = "topic" + Integer.toString(r);   

                

                System.out.println("subscription name: " + sTopicName);

                subscriber = topicSession.createDurableSubscriber(topic, sTopicName);
 

                topicConnection.start();
 

                while (true) 

                {

                    Message m = subscriber.receive(1);

                    if (m != null)

                    {

                        if (m instanceof TextMessage)

                        {

                            String s = new String();

                            String s1 = new String();

                            TextMessage text = (TextMessage) m;

                            s1 = text.getText();

                            s = m.getStringProperty("transID").toString();

                            System.out.println(s);

                            

                        }

                    }

                }
 

            } 

            catch (JMSException e) 

            {

                System.out.println("Exception occurred: " + 

                e.toString());

            } 

            finally 

            {

                if(subscriber!=null)

                {

                    try 

                    {

                        subscriber.close();

                    } 

                    catch (JMSException x) 

                    {

                    }

                }

                if(topicSession != null)

                {

                    try 

                    {

                        topicSession.unsubscribe("");

                        topicSession.close();

                    } 

                    catch (JMSException x) 

                    {

                    }

                }

                if (topicConnection != null) 

                {

                    try 

                    {

                        topicConnection.close();

                    } 

                    catch (JMSException x) 

                    {

                    }

                }

            }
 

        }

        catch(Exception x)

        {

        }

    }

    

    public static void main(String args[])

    {

        MyTempQSender2 receiver = new MyTempQSender2();

        receiver.start();

    }
 

}

Open in new window

0
Comment
Question by:mmingfeilam
  • 3
  • 2
5 Comments
 
LVL 9

Expert Comment

by:ysnky
ID: 20287235
>>does the publisher need to continuously publish the messge, ie in a while(1) loop?
If you mean so that the message can be received by clients that are not connected: no, you do not need to.

In a durable queue, the message is stored until the client gets it, and then it is deleted.
In a durable topic, the message is stored until all clients get it, and then it is deleted.
0
 

Author Comment

by:mmingfeilam
ID: 20287448
here is my publisher:

for some reason my subscriber is not getting the message.
/*

 * MyTempQSender2.java

 *

 * Created on November 14, 2007, 6:15 PM

 *

 * To change this template, choose Tools | Template Manager

 * and open the template in the editor.

 */
 

/**

 *

 * @author mlam

 */

import javax.jms.*;

import javax.naming.*;

import java.util.*;

import javax.jms.Queue;

import javax.jms.JMSException;

import javax.jms.Message;

import java.net.*;

import java.io.*;
 

public class MyTempQSender2 extends Thread  

{    

    //Initial context factory

    private String CTX_FACT; // = "org.exolab.jms.jndi.InitialContextFactory";

    //Provider URL

    private String PROV_URL; // = "rmi://64.139.12.19:1099/";

    private String TOPIC_CONN_FACT;

    private String TOPIC_NAME;
 

    private int memberPort;

    private String host;

    private int port;

    

    String                  fileName = null;

    Context                 jndiContext = null;

    TopicConnectionFactory  topicConnectionFactory = null;

    TopicConnection         topicConnection = null;

    TopicSession            topicSession = null;

    Topic topic = null;

    TopicSubscriber subscriber = null;

    

    TextMessage             message = null;

      

    private Properties loadProperties(String filename) throws IOException

    {

        Properties p = null;

        try

        {

            p = new Properties();

            java.net.URL url = ClassLoader.getSystemResource(filename);

            p.load(url.openStream());

        }

        catch(IOException x)

        {

            x.printStackTrace();

        }

        catch(Exception x)

        {

            x.printStackTrace();

        }

        finally

        {

            return p;

        }

    }
 
 

    /** Creates a new instance of MyTempQSender2 */

    

    public MyTempQSender2() 

    {

        try

        {
 

            /*

             *

            TN.memberPort = 65003

            TN.host = localhost

            TN.port = 65002

            TN.InitContextFactory = org.exolab.jms.jndi.InitialContextFactory

            TN.ProviderURL = rmi://localhost:1099/

            TN.QueueConnFactory = JmsQueueConnectionFactory  

            TN.QueueName = TNQueue

            */

            

            CTX_FACT = "org.exolab.jms.jndi.InitialContextFactory"; 

            PROV_URL= "rmi://localhost:1099/";

            TOPIC_CONN_FACT = "JmsTopicConnectionFactory";

            TOPIC_NAME = "TNTopic";
 
 

            memberPort = 65003;

            host = "localhost";

            port = 65002;

        }

        catch(Exception x)

        {

        }
 

    }

 

    public MyTempQSender2(String fName) 

    {

        try

        {
 

            fileName = fName;
 

            Properties p = loadProperties(fName);
 

            CTX_FACT = p.getProperty("TN.InitContextFactory"); 

            PROV_URL= p.getProperty("TN.ProviderURL");

            TOPIC_CONN_FACT = p.getProperty("TN.QueueConnFactory");

            TOPIC_NAME = p.getProperty("TN.QueueName");
 

            memberPort = Integer.parseInt(p.getProperty("TN.memberPort"));

            host = p.getProperty("TN.host");

            port = Integer.parseInt(p.getProperty("TN.port"));

        }

        catch(Exception x)

        {

        }
 

    }

    

    public MyTempQSender2(String initContextFactory, String providerURL, 

            String queueConnFactory, String queueName) 

    {

        try

        {

            CTX_FACT = initContextFactory; 

            PROV_URL= providerURL;

            TOPIC_CONN_FACT = queueConnFactory;

            TOPIC_NAME = queueName;
 

        }

        catch(Exception x)

        {           

        }

    }

       

    public void run()

    {

        try

        {

 

            System.out.println("Topic name is " + TOPIC_NAME);

 

            /* 

            * Create a JNDI API InitialContext object if none exists

            * yet.

            */

            Properties prop = new Properties();

            //Add the initial context factory

            prop.put(Context.INITIAL_CONTEXT_FACTORY, CTX_FACT);

            //Add the provider URL

            prop.put(Context.PROVIDER_URL, PROV_URL);
 

            try 

            {

                jndiContext = new InitialContext(prop);

            } 

            catch (NamingException e) 

            {

                System.out.println("Could not create JNDI API " +

                "context: " + e.toString());
 

                System.exit(1);

            }
 

            /* 

            * Look up connection factory and queue.  If either does

            * not exist, exit.

            */

            try 

            {

                topicConnectionFactory = (TopicConnectionFactory)

                jndiContext.lookup(TOPIC_CONN_FACT);

                topic = (Topic) jndiContext.lookup(TOPIC_NAME);                //queue1 = (javax.jms.Queue)jndiContext.lookup("MyQueue1");

            } 

            catch (NamingException e) 

            {

                System.out.println("JNDI API lookup failed: " +

                e.toString());

 

                System.exit(1);

            }
 

            /*

            * Create connection.

            * Create session from connection; false means session is

            * not transacted.

            */

            try 

            {

                topicConnection = topicConnectionFactory.createTopicConnection();

                topicSession = topicConnection.createTopicSession(false, 

                        Session.AUTO_ACKNOWLEDGE);

                

                //subscriber = topicSession.createSubscriber(topic); 

                

                Random generator = new Random();

                int r = generator.nextInt();

                

                String sTopicName = "topic" + Integer.toString(r);   

                

                System.out.println("subscription name: " + sTopicName);

                subscriber = topicSession.createDurableSubscriber(topic, sTopicName);
 

                topicConnection.start();
 

                while (true) 

                {

                    Message m = subscriber.receive(1);

                    if (m != null)

                    {

                        if (m instanceof TextMessage)

                        {

                            String s = new String();

                            String s1 = new String();

                            TextMessage text = (TextMessage) m;

                            s1 = text.getText();

                            s = m.getStringProperty("transID").toString();

                            System.out.println(s);

                            

                        }

                    }

                }
 

            } 

            catch (JMSException e) 

            {

                System.out.println("Exception occurred: " + 

                e.toString());

            } 

            finally 

            {

                if(subscriber!=null)

                {

                    try 

                    {

                        subscriber.close();

                    } 

                    catch (JMSException x) 

                    {

                    }

                }

                if(topicSession != null)

                {

                    try 

                    {

                        topicSession.unsubscribe("");

                        topicSession.close();

                    } 

                    catch (JMSException x) 

                    {

                    }

                }

                if (topicConnection != null) 

                {

                    try 

                    {

                        topicConnection.close();

                    } 

                    catch (JMSException x) 

                    {

                    }

                }

            }
 

        }

        catch(Exception x)

        {

        }

    }

    

    public static void main(String args[])

    {

        MyTempQSender2 receiver = new MyTempQSender2();

        receiver.start();

    }
 

}

Open in new window

0
 

Author Comment

by:mmingfeilam
ID: 20287556
ysnky,

you mentioned that:

In a durable topic, the message is stored until all clients get it, and then it is deleted.

That sounds like the publisher has to be aware ahead of time of who the clients are, which in turn means the clients have to be alive/connected. Any client that subscribes to a topic after the publisher has already published a message will not get that message.
0
 
LVL 9

Accepted Solution

by:
ysnky earned 250 total points
ID: 20287785
The publisher publishes a message and checks whether all subscribers that subscribed beforehand have received it successfully. If so, it deletes the message; if not, it waits until the message has been received by all of them.
0
 

Author Comment

by:mmingfeilam
ID: 20294416
But how does the publisher know who the subscribers are?

i think only after this:

               topicConnection = topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false,
                        Session.AUTO_ACKNOWLEDGE);
               
                subscriber = topicSession.createDurableSubscriber(topic, sTopicName);

                topicConnection.start();

This means that the subscriber is alive, as the connection has already been started.
0

Featured Post

6 Surprising Benefits of Threat Intelligence

All sorts of threat intelligence is available on the web. Intelligence you can learn from, and use to anticipate and prepare for future attacks.

Join & Write a Comment

Suggested Solutions

For beginner Java programmers or at least those new to the Eclipse IDE, the following tutorial will show some (four) ways in which you can import your Java projects to your Eclipse workbench. Introduction While learning Java can be done with…
By the end of 1980s, object oriented programming using languages like C++, Simula69 and ObjectPascal gained momentum. It looked like programmers finally found the perfect language. C++ successfully combined the object oriented principles of Simula w…
Viewers will learn about basic arrays, how to declare them, and how to use them. Introduction and definition: Declare an array and cover the syntax of declaring them: Initialize every index in the created array: Example/Features of a basic arr…
This tutorial explains how to use the VisualVM tool for the Java platform application. This video goes into detail on the Threads, Sampler, and Profiler tabs.

705 members asked questions and received personalized solutions in the past 7 days.

Join the community of 500,000 technology professionals and ask your questions.

Join & Ask a Question

Need Help in Real-Time?

Connect with top rated Experts

21 Experts available now in Live!

Get 1:1 Help Now