activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eaglepointe <songjf_2...@yahoo.com>
Subject Re: having problem with durable subscription
Date Fri, 13 Jun 2008 17:05:33 GMT

Forgot to mention that I also restarted message broker before start durable
subscriber again.


eaglepointe wrote:
> 
> Ok, here is what I did,
> 
> 1) start message broker
> 2) start a durable subscriber which subscribes "topictest.messages" as the
> attached code
> 3) start publisher, which sends some messages to "topictest.messages" in
> 'persistent' mode, and the subscriber receives the messages
> 4) stop the subscriber and publisher
> 5) start publisher again, which sends some messages to
> "topictest.messages"
> 6) stop publisher
> 7) start subscriber again, according to my understanding of durable
> subscription, I'd expect subscriber receives the messages sent by
> publisher, but I got the exceptions instead
> 
> Following is a more complete version of subscriber code, which is really
> the sample code comes with ActiveMQ installation, except for the durable
> subscription.
> 
> The same code works fine, meaning no exceptions, if I don't use durable
> subscription.
> It seems that durable subscription requires some context in message broker
> which somehow got lost.
> 
> 
> --------------------------------------------------------------------------------------------

> 
> public class TopicListener implements MessageListener {
> 
>     private Connection connection;
>     private MessageProducer producer;
>     private Session session;
>     private int count;
>     private long start;
>     private Topic topic;
>     private Topic control;
>     //private TopicSubscriber consumer;
> 
>     private String url = "tcp://192.168.30.60:61616";
> 
>     public static void main(String[] argv) throws Exception {
>         TopicListener l = new TopicListener();
>         String[] unknown = CommandLineSupport.setOptions(l, argv);
>         if (unknown.length > 0) {
>             System.out.println("Unknown options: " +
> Arrays.toString(unknown));
>             System.exit(-1);
>         }
>         l.run();
>     }
> 
>     public void run() throws JMSException {
>         ActiveMQConnectionFactory factory = new
> ActiveMQConnectionFactory(url);
>         connection = factory.createConnection();
>         connection.setClientID("myClient");
>         session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>         topic = session.createTopic("topictest.messages");
>         control = session.createTopic("topictest.control");
> 
>         TopicSubscriber consumer =
> session.createDurableSubscriber(topic,"myDurable1");
>         //MessageConsumer consumer = session.createConsumer(topic);
>         consumer.setMessageListener(this);
> 
>         producer = session.createProducer(control);
> 
>         System.out.println("Waiting for messages...");
>         connection.start();
>     }
> ---------------------------------------------------------------------------------
> 
> your grateful,
> -Jeff
> 
> 
> 
> Dave Stanley wrote:
>> 
>> Your not running this from within a thread by any chance? I don't
>> believe connection.start() is going to block, so try and create the
>> connection outside the thread and pass it in by reference to the thread
>> instance.
>> 
>> HTH
>> /Dave
>> 
>> 
>> On Thu, Jun 12, 2008 at 10:30 PM, eaglepointe <songjf_2000@yahoo.com>
>> wrote:
>> 
>>>
>>> Hi,
>>>
>>> I'm new to ActiveMQ and having problem with durable subscription,
>>> following
>>> code based on example code,
>>>
>>> ===================================================================
>>>   public void run() throws JMSException {
>>>        ActiveMQConnectionFactory factory = new
>>> ActiveMQConnectionFactory(url);
>>>        connection = factory.createConnection();
>>>        connection.setClientID("myClient");
>>>        session = connection.createSession(false,
>>> Session.AUTO_ACKNOWLEDGE);
>>>        topic = session.createTopic("topictest.messages");
>>>        control = session.createTopic("topictest.control");
>>>
>>>        TopicSubscriber consumer =
>>> session.createDurableSubscriber(topic,"myDurable1");
>>>        //MessageConsumer consumer = session.createConsumer(topic);
>>>        consumer.setMessageListener(this);
>>>
>>>        producer = session.createProducer(control);
>>>
>>>        System.out.println("Waiting for messages...");
>>>        connection.start();
>>>    }
>>> =================================================================
>>>
>>> is causing exceptions on broker side and client side, below are the
>>> exceptions
>>>
>>> broker side:
>>>
>>> ERROR Service                        - Async error occurred:
>>> java.lang.NullPointerException
>>> java.lang.NullPointerException
>>>        at
>>>
>>> org.apache.activemq.broker.TransportConnection.processAddProducer(TransportConnection.java:479)
>>>        at
>>> org.apache.activemq.command.ProducerInfo.visit(ProducerInfo.java:105)
>>>        at
>>>
>>> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:292)
>>>        at
>>>
>>> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:180)
>>>        at
>>>
>>> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>>>        at
>>>
>>> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
>>>        at
>>>
>>> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
>>>        at
>>>
>>> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>>>        at
>>> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:196)
>>>        at
>>> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:183)
>>>        at java.lang.Thread.run(Thread.java:619)
>>>
>>> client side:
>>>
>>> Exception in thread "ActiveMQ Session Task"
>>> java.util.concurrent.RejectedExecutionException
>>>        at
>>>
>>> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown
>>> Source)
>>>        at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
>>>        at java.util.concurrent.ThreadPoolExecutor.execute(Unknown
>>> Source)
>>>        at
>>>
>>> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:144)
>>>        at
>>> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
>>>        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown
>>> Source)
>>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
>>> Source)
>>>        at java.lang.Thread.run(Unknown Source)
>>>
>>>
>>> Any idea? Thanks in advance!
>>>
>>> --
>>> View this message in context:
>>> http://www.nabble.com/having-problem-with-durable-subscription-tp17813954p17813954.html
>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>>
>>>
>> 
>> 
> 
> 

-- 
View this message in context: http://www.nabble.com/having-problem-with-durable-subscription-tp17813954p17828072.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message