Solved

durable topic subscriber

Posted on 2007-11-14
5
755 Views
Last Modified: 2008-02-14
hi,

i am trying to create durable subscribers so that when a message is published but the subscriber is not alive, when the subscriber comes alive (connected), it will get the message.  right now, unless the subscriber is already alive, it's not getting the message.

attached is my code, don't pay attention to the class name, i am trying to publish a temporary queue name, the subscribe will then use the temp queue name to do ptp messaging:

does the publisher need to continuously publish the messge, ie in a while(1) loop?



/*
 * MyTempQSender2.java
 *
 * Created on November 14, 2007, 6:15 PM
 *
 * To change this template, choose Tools | Template Manager
 * and open the template in the editor.
 */
 
/**
 *
 * @author mlam
 */
import javax.jms.*;
import javax.naming.*;
import java.util.*;
import javax.jms.Queue;
import javax.jms.JMSException;
import javax.jms.Message;
import java.net.*;
import java.io.*;
 
public class MyTempQSender2 extends Thread  
{    
    //Initial context factory
    private String CTX_FACT; // = "org.exolab.jms.jndi.InitialContextFactory";
    //Provider URL
    private String PROV_URL; // = "rmi://64.139.12.19:1099/";
    private String TOPIC_CONN_FACT;
    private String TOPIC_NAME;
 
    private int memberPort;
    private String host;
    private int port;
    
    String                  fileName = null;
    Context                 jndiContext = null;
    TopicConnectionFactory  topicConnectionFactory = null;
    TopicConnection         topicConnection = null;
    TopicSession            topicSession = null;
    Topic topic = null;
    TopicSubscriber subscriber = null;
    
    TextMessage             message = null;
      
    private Properties loadProperties(String filename) throws IOException
    {
        Properties p = null;
        try
        {
            p = new Properties();
            java.net.URL url = ClassLoader.getSystemResource(filename);
            p.load(url.openStream());
        }
        catch(IOException x)
        {
            x.printStackTrace();
        }
        catch(Exception x)
        {
            x.printStackTrace();
        }
        finally
        {
            return p;
        }
    }
 
 
    /** Creates a new instance of MyTempQSender2 */
    
    public MyTempQSender2() 
    {
        try
        {
 
            /*
             *
            TN.memberPort = 65003
            TN.host = localhost
            TN.port = 65002
            TN.InitContextFactory = org.exolab.jms.jndi.InitialContextFactory
            TN.ProviderURL = rmi://localhost:1099/
            TN.QueueConnFactory = JmsQueueConnectionFactory  
            TN.QueueName = TNQueue
            */
            
            CTX_FACT = "org.exolab.jms.jndi.InitialContextFactory"; 
            PROV_URL= "rmi://localhost:1099/";
            TOPIC_CONN_FACT = "JmsTopicConnectionFactory";
            TOPIC_NAME = "TNTopic";
 
 
            memberPort = 65003;
            host = "localhost";
            port = 65002;
        }
        catch(Exception x)
        {
        }
 
    }
 
    public MyTempQSender2(String fName) 
    {
        try
        {
 
            fileName = fName;
 
            Properties p = loadProperties(fName);
 
            CTX_FACT = p.getProperty("TN.InitContextFactory"); 
            PROV_URL= p.getProperty("TN.ProviderURL");
            TOPIC_CONN_FACT = p.getProperty("TN.QueueConnFactory");
            TOPIC_NAME = p.getProperty("TN.QueueName");
 
            memberPort = Integer.parseInt(p.getProperty("TN.memberPort"));
            host = p.getProperty("TN.host");
            port = Integer.parseInt(p.getProperty("TN.port"));
        }
        catch(Exception x)
        {
        }
 
    }
    
    public MyTempQSender2(String initContextFactory, String providerURL, 
            String queueConnFactory, String queueName) 
    {
        try
        {
            CTX_FACT = initContextFactory; 
            PROV_URL= providerURL;
            TOPIC_CONN_FACT = queueConnFactory;
            TOPIC_NAME = queueName;
 
        }
        catch(Exception x)
        {           
        }
    }
       
    public void run()
    {
        try
        {
 
            System.out.println("Topic name is " + TOPIC_NAME);
 
            /* 
            * Create a JNDI API InitialContext object if none exists
            * yet.
            */
            Properties prop = new Properties();
            //Add the initial context factory
            prop.put(Context.INITIAL_CONTEXT_FACTORY, CTX_FACT);
            //Add the provider URL
            prop.put(Context.PROVIDER_URL, PROV_URL);
 
            try 
            {
                jndiContext = new InitialContext(prop);
            } 
            catch (NamingException e) 
            {
                System.out.println("Could not create JNDI API " +
                "context: " + e.toString());
 
                System.exit(1);
            }
 
            /* 
            * Look up connection factory and queue.  If either does
            * not exist, exit.
            */
            try 
            {
                topicConnectionFactory = (TopicConnectionFactory)
                jndiContext.lookup(TOPIC_CONN_FACT);
                topic = (Topic) jndiContext.lookup(TOPIC_NAME);                //queue1 = (javax.jms.Queue)jndiContext.lookup("MyQueue1");
            } 
            catch (NamingException e) 
            {
                System.out.println("JNDI API lookup failed: " +
                e.toString());
 
                System.exit(1);
            }
 
            /*
            * Create connection.
            * Create session from connection; false means session is
            * not transacted.
            */
            try 
            {
                topicConnection = topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false, 
                        Session.AUTO_ACKNOWLEDGE);
                
                //subscriber = topicSession.createSubscriber(topic); 
                
                Random generator = new Random();
                int r = generator.nextInt();
                
                String sTopicName = "topic" + Integer.toString(r);   
                
                System.out.println("subscription name: " + sTopicName);
                subscriber = topicSession.createDurableSubscriber(topic, sTopicName);
 
                topicConnection.start();
 
                while (true) 
                {
                    Message m = subscriber.receive(1);
                    if (m != null)
                    {
                        if (m instanceof TextMessage)
                        {
                            String s = new String();
                            String s1 = new String();
                            TextMessage text = (TextMessage) m;
                            s1 = text.getText();
                            s = m.getStringProperty("transID").toString();
                            System.out.println(s);
                            
                        }
                    }
                }
 
            } 
            catch (JMSException e) 
            {
                System.out.println("Exception occurred: " + 
                e.toString());
            } 
            finally 
            {
                if(subscriber!=null)
                {
                    try 
                    {
                        subscriber.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
                if(topicSession != null)
                {
                    try 
                    {
                        topicSession.unsubscribe("");
                        topicSession.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
                if (topicConnection != null) 
                {
                    try 
                    {
                        topicConnection.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
            }
 
        }
        catch(Exception x)
        {
        }
    }
    
    public static void main(String args[])
    {
        MyTempQSender2 receiver = new MyTempQSender2();
        receiver.start();
    }
 
}

Open in new window

0
Comment
Question by:mmingfeilam
[X]
Welcome to Experts Exchange

Add your voice to the tech community where 5M+ people just like you are talking about what matters.

  • Help others & share knowledge
  • Earn cash & points
  • Learn & ask questions
  • 3
  • 2
5 Comments
 
LVL 9

Expert Comment

by:ysnky
ID: 20287235
>>does the publisher need to continuously publish the messge, ie in a while(1) loop?
if you mean to be taken message by unconnected clients, no you must not.

in a durable queue message will be stored until the client get it and then it will be deleted.
in a durable topic message will ve stored until all clients get it and then it will be deleted.
0
 

Author Comment

by:mmingfeilam
ID: 20287448
here is my publisher:

for some reason my subscriber is not getting the message.
/*
 * MyTempQSender2.java
 *
 * Created on November 14, 2007, 6:15 PM
 *
 * To change this template, choose Tools | Template Manager
 * and open the template in the editor.
 */
 
/**
 *
 * @author mlam
 */
import javax.jms.*;
import javax.naming.*;
import java.util.*;
import javax.jms.Queue;
import javax.jms.JMSException;
import javax.jms.Message;
import java.net.*;
import java.io.*;
 
public class MyTempQSender2 extends Thread  
{    
    //Initial context factory
    private String CTX_FACT; // = "org.exolab.jms.jndi.InitialContextFactory";
    //Provider URL
    private String PROV_URL; // = "rmi://64.139.12.19:1099/";
    private String TOPIC_CONN_FACT;
    private String TOPIC_NAME;
 
    private int memberPort;
    private String host;
    private int port;
    
    String                  fileName = null;
    Context                 jndiContext = null;
    TopicConnectionFactory  topicConnectionFactory = null;
    TopicConnection         topicConnection = null;
    TopicSession            topicSession = null;
    Topic topic = null;
    TopicSubscriber subscriber = null;
    
    TextMessage             message = null;
      
    private Properties loadProperties(String filename) throws IOException
    {
        Properties p = null;
        try
        {
            p = new Properties();
            java.net.URL url = ClassLoader.getSystemResource(filename);
            p.load(url.openStream());
        }
        catch(IOException x)
        {
            x.printStackTrace();
        }
        catch(Exception x)
        {
            x.printStackTrace();
        }
        finally
        {
            return p;
        }
    }
 
 
    /** Creates a new instance of MyTempQSender2 */
    
    public MyTempQSender2() 
    {
        try
        {
 
            /*
             *
            TN.memberPort = 65003
            TN.host = localhost
            TN.port = 65002
            TN.InitContextFactory = org.exolab.jms.jndi.InitialContextFactory
            TN.ProviderURL = rmi://localhost:1099/
            TN.QueueConnFactory = JmsQueueConnectionFactory  
            TN.QueueName = TNQueue
            */
            
            CTX_FACT = "org.exolab.jms.jndi.InitialContextFactory"; 
            PROV_URL= "rmi://localhost:1099/";
            TOPIC_CONN_FACT = "JmsTopicConnectionFactory";
            TOPIC_NAME = "TNTopic";
 
 
            memberPort = 65003;
            host = "localhost";
            port = 65002;
        }
        catch(Exception x)
        {
        }
 
    }
 
    public MyTempQSender2(String fName) 
    {
        try
        {
 
            fileName = fName;
 
            Properties p = loadProperties(fName);
 
            CTX_FACT = p.getProperty("TN.InitContextFactory"); 
            PROV_URL= p.getProperty("TN.ProviderURL");
            TOPIC_CONN_FACT = p.getProperty("TN.QueueConnFactory");
            TOPIC_NAME = p.getProperty("TN.QueueName");
 
            memberPort = Integer.parseInt(p.getProperty("TN.memberPort"));
            host = p.getProperty("TN.host");
            port = Integer.parseInt(p.getProperty("TN.port"));
        }
        catch(Exception x)
        {
        }
 
    }
    
    public MyTempQSender2(String initContextFactory, String providerURL, 
            String queueConnFactory, String queueName) 
    {
        try
        {
            CTX_FACT = initContextFactory; 
            PROV_URL= providerURL;
            TOPIC_CONN_FACT = queueConnFactory;
            TOPIC_NAME = queueName;
 
        }
        catch(Exception x)
        {           
        }
    }
       
    public void run()
    {
        try
        {
 
            System.out.println("Topic name is " + TOPIC_NAME);
 
            /* 
            * Create a JNDI API InitialContext object if none exists
            * yet.
            */
            Properties prop = new Properties();
            //Add the initial context factory
            prop.put(Context.INITIAL_CONTEXT_FACTORY, CTX_FACT);
            //Add the provider URL
            prop.put(Context.PROVIDER_URL, PROV_URL);
 
            try 
            {
                jndiContext = new InitialContext(prop);
            } 
            catch (NamingException e) 
            {
                System.out.println("Could not create JNDI API " +
                "context: " + e.toString());
 
                System.exit(1);
            }
 
            /* 
            * Look up connection factory and queue.  If either does
            * not exist, exit.
            */
            try 
            {
                topicConnectionFactory = (TopicConnectionFactory)
                jndiContext.lookup(TOPIC_CONN_FACT);
                topic = (Topic) jndiContext.lookup(TOPIC_NAME);                //queue1 = (javax.jms.Queue)jndiContext.lookup("MyQueue1");
            } 
            catch (NamingException e) 
            {
                System.out.println("JNDI API lookup failed: " +
                e.toString());
 
                System.exit(1);
            }
 
            /*
            * Create connection.
            * Create session from connection; false means session is
            * not transacted.
            */
            try 
            {
                topicConnection = topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false, 
                        Session.AUTO_ACKNOWLEDGE);
                
                //subscriber = topicSession.createSubscriber(topic); 
                
                Random generator = new Random();
                int r = generator.nextInt();
                
                String sTopicName = "topic" + Integer.toString(r);   
                
                System.out.println("subscription name: " + sTopicName);
                subscriber = topicSession.createDurableSubscriber(topic, sTopicName);
 
                topicConnection.start();
 
                while (true) 
                {
                    Message m = subscriber.receive(1);
                    if (m != null)
                    {
                        if (m instanceof TextMessage)
                        {
                            String s = new String();
                            String s1 = new String();
                            TextMessage text = (TextMessage) m;
                            s1 = text.getText();
                            s = m.getStringProperty("transID").toString();
                            System.out.println(s);
                            
                        }
                    }
                }
 
            } 
            catch (JMSException e) 
            {
                System.out.println("Exception occurred: " + 
                e.toString());
            } 
            finally 
            {
                if(subscriber!=null)
                {
                    try 
                    {
                        subscriber.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
                if(topicSession != null)
                {
                    try 
                    {
                        topicSession.unsubscribe("");
                        topicSession.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
                if (topicConnection != null) 
                {
                    try 
                    {
                        topicConnection.close();
                    } 
                    catch (JMSException x) 
                    {
                    }
                }
            }
 
        }
        catch(Exception x)
        {
        }
    }
    
    public static void main(String args[])
    {
        MyTempQSender2 receiver = new MyTempQSender2();
        receiver.start();
    }
 
}

Open in new window

0
 

Author Comment

by:mmingfeilam
ID: 20287556
vsnky,

you mentioned that:

in a durable topic message will ve stored until all clients get it and then it will be deleted.

sounds like the publisher has to be aware ahead of time who the clients are, which in turn means the clients has to be alive/connected.  any client that subscribes to a topic after the publisher has already published a message will not get that message.
0
 
LVL 9

Accepted Solution

by:
ysnky earned 250 total points
ID: 20287785
publisher publish any message and checks all subscribers which subscribed before, get it successfully. if so it delete message, if not wait until it is gotton by all.
0
 

Author Comment

by:mmingfeilam
ID: 20294416
but how does the publisher know who the subscribers are:

i think only after this:

               topicConnection = topicConnectionFactory.createTopicConnection();
                topicSession = topicConnection.createTopicSession(false,
                        Session.AUTO_ACKNOWLEDGE);
               
                subscriber = topicSession.createDurableSubscriber(topic, sTopicName);

                topicConnection.start();

this mean that the subscriber is alive, as the connection has already been started.
0

Featured Post

On Demand Webinar - Networking for the Cloud Era

This webinar discusses:
-Common barriers companies experience when moving to the cloud
-How SD-WAN changes the way we look at networks
-Best practices customers should employ moving forward with cloud migration
-What happens behind the scenes of SteelConnect’s one-click button

Question has a verified solution.

If you are experiencing a similar issue, please ask a related question

Suggested Solutions

Title # Comments Views Activity
egit plugin on eclipse 8 98
Opening PDF on button click and fill new document 2 54
Github api to fetch the number of collaborators in a git repo 1 28
Java array 21 82
INTRODUCTION Working with files is a moderately common task in Java.  For most projects hard coding the file names, using parameters in configuration files, or using command-line arguments is sufficient.   However, when your application has vi…
Introduction This article is the second of three articles that explain why and how the Experts Exchange QA Team does test automation for our web site. This article covers the basic installation and configuration of the test automation tools used by…
This tutorial will introduce the viewer to VisualVM for the Java platform application. This video explains an example program and covers the Overview, Monitor, and Heap Dump tabs.
This tutorial explains how to use the VisualVM tool for the Java platform application. This video goes into detail on the Threads, Sampler, and Profiler tabs.

749 members asked questions and received personalized solutions in the past 7 days.

Join the community of 500,000 technology professionals and ask your questions.

Join & Ask a Question