Solved

durable topic subscriber

Posted on 2007-11-14
5
760 Views
Last Modified: 2008-02-14
hi,

i am trying to create durable subscribers so that when a message is published but the subscriber is not alive, when the subscriber comes alive (connected), it will get the message.  right now, unless the subscriber is already alive, it's not getting the message.

attached is my code, don't pay attention to the class name, i am trying to publish a temporary queue name, the subscribe will then use the temp queue name to do ptp messaging:

does the publisher need to continuously publish the messge, ie in a while(1) loop?



/*
 * MyTempQSender2.java
 *
 * Created on November 14, 2007, 6:15 PM
 *
 * To change this template, choose Tools | Template Manager
 * and open the template in the editor.
 */
 
/**
 *
 * @author mlam
 */
import javax.jms.*;
import javax.naming.*;
import java.util.*;
import javax.jms.Queue;
import javax.jms.JMSException;
import javax.jms.Message;
import java.net.*;
import java.io.*;
 
public class MyTempQSender2 extends Thread  
{    
    //Initial context factory
    private String CTX_FACT; // = "org.exolab.jms.jndi.InitialContextFactory";
    //Provider URL
    private String PROV_URL; // = "rmi://64.139.12.19:1099/";
    private String TOPIC_CONN_FACT;
    private String TOPIC_NAME;
 
    private int memberPort;
    private String host;
    private int port;
    
    String                  fileName = null;
    Context                 jndiContext = null;
    TopicConnectionFactory  topicConnectionFactory = null;
    TopicConnection         topicConnection = null;
    TopicSession            topicSession = null;
    Topic topic = null;
    TopicSubscriber subscriber = null;
    
    TextMessage             message = null;
      
    private Properties loadProperties(String filename) throws IOException
    {
        Properties p = null;
        try
        {
            p = new Properties();
            java.net.URL url = ClassLoader.getSystemResource(filename);
            p.load(url.openStream());
        }
        catch(IOException x)
        {
            x.printStackTrace();
        }
        catch(Exception x)
        {
            x.printStackTrace();
        }
        finally
        {
            return p;
        }
    }
 
 
    /** Creates a new instance of MyTempQSender2 */
    
    public MyTempQSender2() 
    {
        try
        {
 
            /*
             *
            TN.memberPort = 65003
            TN.host = localhost
            TN.port = 65002
            TN.InitContextFactory = org.exolab.jms.jndi.InitialContextFactory
            TN.ProviderURL = rmi://localhost:1099/
            TN.QueueConnFactory = JmsQueueConnectionFactory  
            TN.QueueName = TNQueue
            */
            
            CTX_FACT = "org.exolab.jms.jndi.InitialContextFactory"; 
            PROV_URL= "rmi://localhost:1099/";
            TOPIC_CONN_FACT = "JmsTopicConnectionFactory";
            TOPIC_NAME = "TNTopic";
 
 
            memberPort = 65003;
            host = "localhost";
            port = 65002;
        }
        catch(Exception x)
        {
        }
 
    }
 
    public MyTempQSender2(String fName) 
    {
        try
        {
 
            fileName = fName;
 
            Properties p = loadProperties(fName);
 
            CTX_FACT = p.getProperty("TN.InitContextFactory"); 
            PROV_URL= p.getProperty("TN.ProviderURL");
            TOPIC_CONN_FACT = p.getProperty("TN.QueueConnFactory");
            TOPIC_NAME = p.getProperty("TN.QueueName");
 
            memberPort = Integer.parseInt(p.getProperty("TN.memberPort"));
            host = p.getProperty("TN.host");
            port = Integer.parseInt(p.getProperty("TN.port"));
        }
        catch(Exception x)
        {
        }
 
    }
    
    public MyTempQSender2(String initContextFactory, String providerURL, 
            String queueConnFactory, String queueName) 
    {
        try
        {
            CTX_FACT = initContextFactory; 
            PROV_URL= providerURL;
            TOPIC_CONN_FACT = queueConnFactory;
            TOPIC_NAME = queueName;
 
        }
        catch(Exception x)
        {           
        }
    }
       
    public void run()
    {
        try
        {
 
            System.out.println("Topic name is " + TOPIC_NAME);
 
            /* 
            * Create a JNDI API InitialContext object if none exists
            * yet.
            */
            Properties prop = new Properties();
            //Add the initial context factory
            prop.put(Context.INITIAL_CONTEXT_FACTORY, CTX_FACT);
            //Add the provider URL
            prop.put(Context.PROVIDER_URL, PROV_URL);
 
            try 
            {
                jndiContext = new InitialContext(prop);
            } 
            catch (NamingException e) 
            {
                System.out.println("Could not create JNDI API " +
                "context: " + e.toString());
 
                System.exit(1);
            }
 
            /* 
            * Look up connection factory and queue.  If either does
            * not exist, exit.
            */
            try 
            {
                topicConnectionFactory = (TopicConnectionFactory)
                jndiContext.lookup(TOPIC_CONN_FACT);
                topic = (Topic) jndiContext.lookup(TOPIC_NAME);                //queue1 = (javax.jms.Queue)jndiContext.lookup("MyQueue1");
            } 
            catch (NamingException e) 
            {
                System.out.println("JNDI API lookup failed: " +
                e.toString());
 
                System.exit(1);
            }
 
            /*
            * Create connection.
            * Create session from connection; false means session is
            * not transacted.
            */
            try 
            {
                topicConnection = topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false, 
                        Session.AUTO_ACKNOWLEDGE);
                
                //subscriber = topicSession.createSubscriber(topic); 
                
                Random generator = new Random();
                int r = generator.nextInt();
                
                String sTopicName = "topic" + Integer.toString(r);   
                
                System.out.println("subscription name: " + sTopicName);
                subscriber = topicSession.createDurableSubscriber(topic, sTopicName);
 
                topicConnection.start();
 
                while (true) 
                {
                    Message m = subscriber.receive(1);
                    if (m != null)
                    {
                        if (m instanceof TextMessage)
                        {
                            String s = new String();
                            String s1 = new String();
                            TextMessage text = (TextMessage) m;
                            s1 = text.getText();
                            s = m.getStringProperty("transID").toString();
                            System.out.println(s);
                            
                        }
                    }
                }
 
            } 
            catch (JMSException e) 
            {
                System.out.println("Exception occurred: " + 
                e.toString());
            } 
            finally 
            {
                if(subscriber!=null)
                {
                    try 
                    {
                        subscriber.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
                if(topicSession != null)
                {
                    try 
                    {
                        topicSession.unsubscribe("");
                        topicSession.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
                if (topicConnection != null) 
                {
                    try 
                    {
                        topicConnection.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
            }
 
        }
        catch(Exception x)
        {
        }
    }
    
    public static void main(String args[])
    {
        MyTempQSender2 receiver = new MyTempQSender2();
        receiver.start();
    }
 
}

Open in new window

0
Comment
Question by:mmingfeilam
[X]
Welcome to Experts Exchange

Add your voice to the tech community where 5M+ people just like you are talking about what matters.

  • Help others & share knowledge
  • Earn cash & points
  • Learn & ask questions
  • 3
  • 2
5 Comments
 
LVL 9

Expert Comment

by:ysnky
ID: 20287235
>>does the publisher need to continuously publish the messge, ie in a while(1) loop?
if you mean to be taken message by unconnected clients, no you must not.

in a durable queue message will be stored until the client get it and then it will be deleted.
in a durable topic message will ve stored until all clients get it and then it will be deleted.
0
 

Author Comment

by:mmingfeilam
ID: 20287448
here is my publisher:

for some reason my subscriber is not getting the message.
/*
 * MyTempQSender2.java
 *
 * Created on November 14, 2007, 6:15 PM
 *
 * To change this template, choose Tools | Template Manager
 * and open the template in the editor.
 */
 
/**
 *
 * @author mlam
 */
import javax.jms.*;
import javax.naming.*;
import java.util.*;
import javax.jms.Queue;
import javax.jms.JMSException;
import javax.jms.Message;
import java.net.*;
import java.io.*;
 
public class MyTempQSender2 extends Thread  
{    
    //Initial context factory
    private String CTX_FACT; // = "org.exolab.jms.jndi.InitialContextFactory";
    //Provider URL
    private String PROV_URL; // = "rmi://64.139.12.19:1099/";
    private String TOPIC_CONN_FACT;
    private String TOPIC_NAME;
 
    private int memberPort;
    private String host;
    private int port;
    
    String                  fileName = null;
    Context                 jndiContext = null;
    TopicConnectionFactory  topicConnectionFactory = null;
    TopicConnection         topicConnection = null;
    TopicSession            topicSession = null;
    Topic topic = null;
    TopicSubscriber subscriber = null;
    
    TextMessage             message = null;
      
    private Properties loadProperties(String filename) throws IOException
    {
        Properties p = null;
        try
        {
            p = new Properties();
            java.net.URL url = ClassLoader.getSystemResource(filename);
            p.load(url.openStream());
        }
        catch(IOException x)
        {
            x.printStackTrace();
        }
        catch(Exception x)
        {
            x.printStackTrace();
        }
        finally
        {
            return p;
        }
    }
 
 
    /** Creates a new instance of MyTempQSender2 */
    
    public MyTempQSender2() 
    {
        try
        {
 
            /*
             *
            TN.memberPort = 65003
            TN.host = localhost
            TN.port = 65002
            TN.InitContextFactory = org.exolab.jms.jndi.InitialContextFactory
            TN.ProviderURL = rmi://localhost:1099/
            TN.QueueConnFactory = JmsQueueConnectionFactory  
            TN.QueueName = TNQueue
            */
            
            CTX_FACT = "org.exolab.jms.jndi.InitialContextFactory"; 
            PROV_URL= "rmi://localhost:1099/";
            TOPIC_CONN_FACT = "JmsTopicConnectionFactory";
            TOPIC_NAME = "TNTopic";
 
 
            memberPort = 65003;
            host = "localhost";
            port = 65002;
        }
        catch(Exception x)
        {
        }
 
    }
 
    public MyTempQSender2(String fName) 
    {
        try
        {
 
            fileName = fName;
 
            Properties p = loadProperties(fName);
 
            CTX_FACT = p.getProperty("TN.InitContextFactory"); 
            PROV_URL= p.getProperty("TN.ProviderURL");
            TOPIC_CONN_FACT = p.getProperty("TN.QueueConnFactory");
            TOPIC_NAME = p.getProperty("TN.QueueName");
 
            memberPort = Integer.parseInt(p.getProperty("TN.memberPort"));
            host = p.getProperty("TN.host");
            port = Integer.parseInt(p.getProperty("TN.port"));
        }
        catch(Exception x)
        {
        }
 
    }
    
    public MyTempQSender2(String initContextFactory, String providerURL, 
            String queueConnFactory, String queueName) 
    {
        try
        {
            CTX_FACT = initContextFactory; 
            PROV_URL= providerURL;
            TOPIC_CONN_FACT = queueConnFactory;
            TOPIC_NAME = queueName;
 
        }
        catch(Exception x)
        {           
        }
    }
       
    public void run()
    {
        try
        {
 
            System.out.println("Topic name is " + TOPIC_NAME);
 
            /* 
            * Create a JNDI API InitialContext object if none exists
            * yet.
            */
            Properties prop = new Properties();
            //Add the initial context factory
            prop.put(Context.INITIAL_CONTEXT_FACTORY, CTX_FACT);
            //Add the provider URL
            prop.put(Context.PROVIDER_URL, PROV_URL);
 
            try 
            {
                jndiContext = new InitialContext(prop);
            } 
            catch (NamingException e) 
            {
                System.out.println("Could not create JNDI API " +
                "context: " + e.toString());
 
                System.exit(1);
            }
 
            /* 
            * Look up connection factory and queue.  If either does
            * not exist, exit.
            */
            try 
            {
                topicConnectionFactory = (TopicConnectionFactory)
                jndiContext.lookup(TOPIC_CONN_FACT);
                topic = (Topic) jndiContext.lookup(TOPIC_NAME);                //queue1 = (javax.jms.Queue)jndiContext.lookup("MyQueue1");
            } 
            catch (NamingException e) 
            {
                System.out.println("JNDI API lookup failed: " +
                e.toString());
 
                System.exit(1);
            }
 
            /*
            * Create connection.
            * Create session from connection; false means session is
            * not transacted.
            */
            try 
            {
                topicConnection = topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false, 
                        Session.AUTO_ACKNOWLEDGE);
                
                //subscriber = topicSession.createSubscriber(topic); 
                
                Random generator = new Random();
                int r = generator.nextInt();
                
                String sTopicName = "topic" + Integer.toString(r);   
                
                System.out.println("subscription name: " + sTopicName);
                subscriber = topicSession.createDurableSubscriber(topic, sTopicName);
 
                topicConnection.start();
 
                while (true) 
                {
                    Message m = subscriber.receive(1);
                    if (m != null)
                    {
                        if (m instanceof TextMessage)
                        {
                            String s = new String();
                            String s1 = new String();
                            TextMessage text = (TextMessage) m;
                            s1 = text.getText();
                            s = m.getStringProperty("transID").toString();
                            System.out.println(s);
                            
                        }
                    }
                }
 
            } 
            catch (JMSException e) 
            {
                System.out.println("Exception occurred: " + 
                e.toString());
            } 
            finally 
            {
                if(subscriber!=null)
                {
                    try 
                    {
                        subscriber.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
                if(topicSession != null)
                {
                    try 
                    {
                        topicSession.unsubscribe("");
                        topicSession.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
                if (topicConnection != null) 
                {
                    try 
                    {
                        topicConnection.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
            }
 
        }
        catch(Exception x)
        {
        }
    }
    
    public static void main(String args[])
    {
        MyTempQSender2 receiver = new MyTempQSender2();
        receiver.start();
    }
 
}

Open in new window

0
 

Author Comment

by:mmingfeilam
ID: 20287556
vsnky,

you mentioned that:

in a durable topic message will ve stored until all clients get it and then it will be deleted.

sounds like the publisher has to be aware ahead of time who the clients are, which in turn means the clients has to be alive/connected.  any client that subscribes to a topic after the publisher has already published a message will not get that message.
0
 
LVL 9

Accepted Solution

by:
ysnky earned 250 total points
ID: 20287785
publisher publish any message and checks all subscribers which subscribed before, get it successfully. if so it delete message, if not wait until it is gotton by all.
0
 

Author Comment

by:mmingfeilam
ID: 20294416
but how does the publisher know who the subscribers are:

i think only after this:

               topicConnection = topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false,
                        Session.AUTO_ACKNOWLEDGE);
               
                subscriber = topicSession.createDurableSubscriber(topic, sTopicName);

                topicConnection.start();

this mean that the subscriber is alive, as the connection has already been started.
0

Featured Post

PeopleSoft Has Never Been Easier

PeopleSoft Adoption Made Smooth & Simple!

On-The-Job Training Is made Intuitive & Easy With WalkMe's On-Screen Guidance Tool.  Claim Your Free WalkMe Account Now

Question has a verified solution.

If you are experiencing a similar issue, please ask a related question

Introduction Java can be integrated with native programs using an interface called JNI(Java Native Interface). Native programs are programs which can directly run on the processor. JNI is simply a naming and calling convention so that the JVM (Java…
Go is an acronym of golang, is a programming language developed Google in 2007. Go is a new language that is mostly in the C family, with significant input from Pascal/Modula/Oberon family. Hence Go arisen as low-level language with fast compilation…
Viewers learn about the “for” loop and how it works in Java. By comparing it to the while loop learned before, viewers can make the transition easily. You will learn about the formatting of the for loop as we write a program that prints even numbers…
Video by: Michael
Viewers learn about how to reduce the potential repetitiveness of coding in main by developing methods to perform specific tasks for their program. Additionally, objects are introduced for the purpose of learning how to call methods in Java. Define …

688 members asked questions and received personalized solutions in the past 7 days.

Join the community of 500,000 technology professionals and ask your questions.

Join & Ask a Question