• Status: Solved
  • Priority: Medium
  • Security: Public
  • Views: 773
  • Last Modified:

durable topic subscriber

hi,

I am trying to create durable subscribers so that if a message is published while the subscriber is not alive, the subscriber will receive it once it comes alive (connects).  Right now, the subscriber gets the message only if it is already alive when the message is published.

Attached is my code. Don't pay attention to the class name; I am trying to publish a temporary queue name, and the subscriber will then use the temp queue name to do PTP messaging:

Does the publisher need to continuously publish the message, i.e. in a while(1) loop?



/*
 * MyTempQSender2.java
 *
 * Created on November 14, 2007, 6:15 PM
 *
 * To change this template, choose Tools | Template Manager
 * and open the template in the editor.
 */
 
/**
 *
 * @author mlam
 */
import javax.jms.*;
import javax.naming.*;
import java.util.*;
import javax.jms.Queue;
import javax.jms.JMSException;
import javax.jms.Message;
import java.net.*;
import java.io.*;
 
public class MyTempQSender2 extends Thread  
{    
    //Initial context factory
    private String CTX_FACT; // = "org.exolab.jms.jndi.InitialContextFactory";
    //Provider URL
    private String PROV_URL; // = "rmi://64.139.12.19:1099/";
    private String TOPIC_CONN_FACT;
    private String TOPIC_NAME;
 
    private int memberPort;
    private String host;
    private int port;
    
    String                  fileName = null;
    Context                 jndiContext = null;
    TopicConnectionFactory  topicConnectionFactory = null;
    TopicConnection         topicConnection = null;
    TopicSession            topicSession = null;
    Topic topic = null;
    TopicSubscriber subscriber = null;
    
    TextMessage             message = null;
      
    private Properties loadProperties(String filename) throws IOException
    {
        Properties p = null;
        try
        {
            p = new Properties();
            java.net.URL url = ClassLoader.getSystemResource(filename);
            p.load(url.openStream());
        }
        catch(IOException x)
        {
            x.printStackTrace();
        }
        catch(Exception x)
        {
            x.printStackTrace();
        }
        finally
        {
            return p;
        }
    }
 
 
    /** Creates a new instance of MyTempQSender2 */
    
    public MyTempQSender2() 
    {
        try
        {
 
            /*
             *
            TN.memberPort = 65003
            TN.host = localhost
            TN.port = 65002
            TN.InitContextFactory = org.exolab.jms.jndi.InitialContextFactory
            TN.ProviderURL = rmi://localhost:1099/
            TN.QueueConnFactory = JmsQueueConnectionFactory  
            TN.QueueName = TNQueue
            */
            
            CTX_FACT = "org.exolab.jms.jndi.InitialContextFactory"; 
            PROV_URL= "rmi://localhost:1099/";
            TOPIC_CONN_FACT = "JmsTopicConnectionFactory";
            TOPIC_NAME = "TNTopic";
 
 
            memberPort = 65003;
            host = "localhost";
            port = 65002;
        }
        catch(Exception x)
        {
        }
 
    }
 
    public MyTempQSender2(String fName) 
    {
        try
        {
 
            fileName = fName;
 
            Properties p = loadProperties(fName);
 
            CTX_FACT = p.getProperty("TN.InitContextFactory"); 
            PROV_URL= p.getProperty("TN.ProviderURL");
            TOPIC_CONN_FACT = p.getProperty("TN.QueueConnFactory");
            TOPIC_NAME = p.getProperty("TN.QueueName");
 
            memberPort = Integer.parseInt(p.getProperty("TN.memberPort"));
            host = p.getProperty("TN.host");
            port = Integer.parseInt(p.getProperty("TN.port"));
        }
        catch(Exception x)
        {
        }
 
    }
    
    public MyTempQSender2(String initContextFactory, String providerURL, 
            String queueConnFactory, String queueName) 
    {
        try
        {
            CTX_FACT = initContextFactory; 
            PROV_URL= providerURL;
            TOPIC_CONN_FACT = queueConnFactory;
            TOPIC_NAME = queueName;
 
        }
        catch(Exception x)
        {           
        }
    }
       
    public void run()
    {
        try
        {
 
            System.out.println("Topic name is " + TOPIC_NAME);
 
            /* 
            * Create a JNDI API InitialContext object if none exists
            * yet.
            */
            Properties prop = new Properties();
            //Add the initial context factory
            prop.put(Context.INITIAL_CONTEXT_FACTORY, CTX_FACT);
            //Add the provider URL
            prop.put(Context.PROVIDER_URL, PROV_URL);
 
            try 
            {
                jndiContext = new InitialContext(prop);
            } 
            catch (NamingException e) 
            {
                System.out.println("Could not create JNDI API " +
                "context: " + e.toString());
 
                System.exit(1);
            }
 
            /* 
            * Look up connection factory and queue.  If either does
            * not exist, exit.
            */
            try 
            {
                topicConnectionFactory = (TopicConnectionFactory)
                jndiContext.lookup(TOPIC_CONN_FACT);
                topic = (Topic) jndiContext.lookup(TOPIC_NAME);                //queue1 = (javax.jms.Queue)jndiContext.lookup("MyQueue1");
            } 
            catch (NamingException e) 
            {
                System.out.println("JNDI API lookup failed: " +
                e.toString());
 
                System.exit(1);
            }
 
            /*
            * Create connection.
            * Create session from connection; false means session is
            * not transacted.
            */
            try 
            {
                topicConnection = topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false, 
                        Session.AUTO_ACKNOWLEDGE);
                
                //subscriber = topicSession.createSubscriber(topic); 
                
                Random generator = new Random();
                int r = generator.nextInt();
                
                String sTopicName = "topic" + Integer.toString(r);   
                
                System.out.println("subscription name: " + sTopicName);
                subscriber = topicSession.createDurableSubscriber(topic, sTopicName);
 
                topicConnection.start();
 
                while (true) 
                {
                    Message m = subscriber.receive(1);
                    if (m != null)
                    {
                        if (m instanceof TextMessage)
                        {
                            String s = new String();
                            String s1 = new String();
                            TextMessage text = (TextMessage) m;
                            s1 = text.getText();
                            s = m.getStringProperty("transID").toString();
                            System.out.println(s);
                            
                        }
                    }
                }
 
            } 
            catch (JMSException e) 
            {
                System.out.println("Exception occurred: " + 
                e.toString());
            } 
            finally 
            {
                if(subscriber!=null)
                {
                    try 
                    {
                        subscriber.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
                if(topicSession != null)
                {
                    try 
                    {
                        topicSession.unsubscribe("");
                        topicSession.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
                if (topicConnection != null) 
                {
                    try 
                    {
                        topicConnection.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
            }
 
        }
        catch(Exception x)
        {
        }
    }
    
    public static void main(String args[])
    {
        MyTempQSender2 receiver = new MyTempQSender2();
        receiver.start();
    }
 
}

Open in new window

0
mmingfeilam
Asked:
mmingfeilam
  • 3
  • 2
1 Solution
 
ysnkyCommented:
>>does the publisher need to continuously publish the messge, ie in a while(1) loop?
If you mean so that clients that are not connected can receive the message: no, you don't need to.

In a durable queue, a message is stored until the client gets it, and then it is deleted.
In a durable topic, a message is stored until all clients get it, and then it is deleted.
0
 
mmingfeilamAuthor Commented:
here is my publisher:

for some reason my subscriber is not getting the message.
/*
 * MyTempQSender2.java
 *
 * Created on November 14, 2007, 6:15 PM
 *
 * To change this template, choose Tools | Template Manager
 * and open the template in the editor.
 */
 
/**
 *
 * @author mlam
 */
import javax.jms.*;
import javax.naming.*;
import java.util.*;
import javax.jms.Queue;
import javax.jms.JMSException;
import javax.jms.Message;
import java.net.*;
import java.io.*;
 
public class MyTempQSender2 extends Thread  
{    
    //Initial context factory
    private String CTX_FACT; // = "org.exolab.jms.jndi.InitialContextFactory";
    //Provider URL
    private String PROV_URL; // = "rmi://64.139.12.19:1099/";
    private String TOPIC_CONN_FACT;
    private String TOPIC_NAME;
 
    private int memberPort;
    private String host;
    private int port;
    
    String                  fileName = null;
    Context                 jndiContext = null;
    TopicConnectionFactory  topicConnectionFactory = null;
    TopicConnection         topicConnection = null;
    TopicSession            topicSession = null;
    Topic topic = null;
    TopicSubscriber subscriber = null;
    
    TextMessage             message = null;
      
    private Properties loadProperties(String filename) throws IOException
    {
        Properties p = null;
        try
        {
            p = new Properties();
            java.net.URL url = ClassLoader.getSystemResource(filename);
            p.load(url.openStream());
        }
        catch(IOException x)
        {
            x.printStackTrace();
        }
        catch(Exception x)
        {
            x.printStackTrace();
        }
        finally
        {
            return p;
        }
    }
 
 
    /** Creates a new instance of MyTempQSender2 */
    
    public MyTempQSender2() 
    {
        try
        {
 
            /*
             *
            TN.memberPort = 65003
            TN.host = localhost
            TN.port = 65002
            TN.InitContextFactory = org.exolab.jms.jndi.InitialContextFactory
            TN.ProviderURL = rmi://localhost:1099/
            TN.QueueConnFactory = JmsQueueConnectionFactory  
            TN.QueueName = TNQueue
            */
            
            CTX_FACT = "org.exolab.jms.jndi.InitialContextFactory"; 
            PROV_URL= "rmi://localhost:1099/";
            TOPIC_CONN_FACT = "JmsTopicConnectionFactory";
            TOPIC_NAME = "TNTopic";
 
 
            memberPort = 65003;
            host = "localhost";
            port = 65002;
        }
        catch(Exception x)
        {
        }
 
    }
 
    public MyTempQSender2(String fName) 
    {
        try
        {
 
            fileName = fName;
 
            Properties p = loadProperties(fName);
 
            CTX_FACT = p.getProperty("TN.InitContextFactory"); 
            PROV_URL= p.getProperty("TN.ProviderURL");
            TOPIC_CONN_FACT = p.getProperty("TN.QueueConnFactory");
            TOPIC_NAME = p.getProperty("TN.QueueName");
 
            memberPort = Integer.parseInt(p.getProperty("TN.memberPort"));
            host = p.getProperty("TN.host");
            port = Integer.parseInt(p.getProperty("TN.port"));
        }
        catch(Exception x)
        {
        }
 
    }
    
    public MyTempQSender2(String initContextFactory, String providerURL, 
            String queueConnFactory, String queueName) 
    {
        try
        {
            CTX_FACT = initContextFactory; 
            PROV_URL= providerURL;
            TOPIC_CONN_FACT = queueConnFactory;
            TOPIC_NAME = queueName;
 
        }
        catch(Exception x)
        {           
        }
    }
       
    public void run()
    {
        try
        {
 
            System.out.println("Topic name is " + TOPIC_NAME);
 
            /* 
            * Create a JNDI API InitialContext object if none exists
            * yet.
            */
            Properties prop = new Properties();
            //Add the initial context factory
            prop.put(Context.INITIAL_CONTEXT_FACTORY, CTX_FACT);
            //Add the provider URL
            prop.put(Context.PROVIDER_URL, PROV_URL);
 
            try 
            {
                jndiContext = new InitialContext(prop);
            } 
            catch (NamingException e) 
            {
                System.out.println("Could not create JNDI API " +
                "context: " + e.toString());
 
                System.exit(1);
            }
 
            /* 
            * Look up connection factory and queue.  If either does
            * not exist, exit.
            */
            try 
            {
                topicConnectionFactory = (TopicConnectionFactory)
                jndiContext.lookup(TOPIC_CONN_FACT);
                topic = (Topic) jndiContext.lookup(TOPIC_NAME);                //queue1 = (javax.jms.Queue)jndiContext.lookup("MyQueue1");
            } 
            catch (NamingException e) 
            {
                System.out.println("JNDI API lookup failed: " +
                e.toString());
 
                System.exit(1);
            }
 
            /*
            * Create connection.
            * Create session from connection; false means session is
            * not transacted.
            */
            try 
            {
                topicConnection = topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false, 
                        Session.AUTO_ACKNOWLEDGE);
                
                //subscriber = topicSession.createSubscriber(topic); 
                
                Random generator = new Random();
                int r = generator.nextInt();
                
                String sTopicName = "topic" + Integer.toString(r);   
                
                System.out.println("subscription name: " + sTopicName);
                subscriber = topicSession.createDurableSubscriber(topic, sTopicName);
 
                topicConnection.start();
 
                while (true) 
                {
                    Message m = subscriber.receive(1);
                    if (m != null)
                    {
                        if (m instanceof TextMessage)
                        {
                            String s = new String();
                            String s1 = new String();
                            TextMessage text = (TextMessage) m;
                            s1 = text.getText();
                            s = m.getStringProperty("transID").toString();
                            System.out.println(s);
                            
                        }
                    }
                }
 
            } 
            catch (JMSException e) 
            {
                System.out.println("Exception occurred: " + 
                e.toString());
            } 
            finally 
            {
                if(subscriber!=null)
                {
                    try 
                    {
                        subscriber.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
                if(topicSession != null)
                {
                    try 
                    {
                        topicSession.unsubscribe("");
                        topicSession.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
                if (topicConnection != null) 
                {
                    try 
                    {
                        topicConnection.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
            }
 
        }
        catch(Exception x)
        {
        }
    }
    
    public static void main(String args[])
    {
        MyTempQSender2 receiver = new MyTempQSender2();
        receiver.start();
    }
 
}

Open in new window

0
 
mmingfeilamAuthor Commented:
ysnky,

you mentioned that:

in a durable topic message will ve stored until all clients get it and then it will be deleted.

It sounds like the publisher has to be aware ahead of time who the clients are, which in turn means the clients have to be alive/connected.  Any client that subscribes to a topic after the publisher has already published a message will not get that message.
0
 
ysnkyCommented:
The publisher publishes a message, and a check is made that all subscribers that subscribed beforehand have received it successfully. If so, the message is deleted; if not, it is kept until it has been received by all of them.
0
 
mmingfeilamAuthor Commented:
but how does the publisher know who the subscribers are:

i think only after this:

               topicConnection = topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false,
                        Session.AUTO_ACKNOWLEDGE);
               
                subscriber = topicSession.createDurableSubscriber(topic, sTopicName);

                topicConnection.start();

This means that the subscriber is alive, as the connection has already been started.
0

Featured Post

Never miss a deadline with monday.com

The revolutionary project management tool is here!   Plan visually with a single glance and make sure your projects get done.

  • 3
  • 2
Tackle projects and never again get stuck behind a technical roadblock.
Join Now