activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eaglepointe <songjf_2...@yahoo.com>
Subject Re: having problem with durable subscription
Date Fri, 13 Jun 2008 23:29:26 GMT

Yes, I'm running 5.1.0 and using default activemq.xml. I've found out the
cause! It is a bug subscriber code, it closes the connection in 'onMessage'
upon receiving 'SHUTDOWN' message while using 'session.AUTO_ACKNOWLEDGE', so
'SHUTDOWN' message never gets acknowledged, which results in everybody
appearing weird across subscriber/publisher/broker restart. Changing to
'session.CLIENT_ACKNOWLEDGE' solved the problem.

Thanks for the help!


Dave Stanley wrote:
> 
> Not that it helps much, I tried it with a 5.x release and your code works
> fine. I see the subscription is durable across consumer and broker
> restarts.
> Have you modified your activemq.xml, are you running with 5.1.0?
> 
> /Dave
> On Fri, Jun 13, 2008 at 1:05 PM, eaglepointe <songjf_2000@yahoo.com>
> wrote:
> 
>>
>> Forgot to mention that I also restarted message broker before start
>> durable
>> subscriber again.
>>
>>
>> eaglepointe wrote:
>> >
>> > Ok, here is what I did,
>> >
>> > 1) start message broker
>> > 2) start a durable subscriber which subscribes "topictest.messages" as
>> the
>> > attached code
>> > 3) start publisher, which sends some messages to "topictest.messages"
>> in
>> > 'persistent' mode, and the subscriber receives the messages
>> > 4) stop the subscriber and publisher
>> > 5) start publisher again, which sends some messages to
>> > "topictest.messages"
>> > 6) stop publisher
>> > 7) start subscriber again, according to my understanding of durable
>> > subscription, I'd expect subscriber receives the messages sent by
>> > publisher, but I got the exceptions instead
>> >
>> > Following is a more complete version of subscriber code, which is
>> really
>> > the sample code comes with ActiveMQ installation, except for the
>> durable
>> > subscription.
>> >
>> > The same code works fine, meaning no exceptions, if I don't use durable
>> > subscription.
>> > It seems that durable subscription requires some context in message
>> broker
>> > which somehow got lost.
>> >
>> >
>> >
>> --------------------------------------------------------------------------------------------
>> >
>> > public class TopicListener implements MessageListener {
>> >
>> >     private Connection connection;
>> >     private MessageProducer producer;
>> >     private Session session;
>> >     private int count;
>> >     private long start;
>> >     private Topic topic;
>> >     private Topic control;
>> >     //private TopicSubscriber consumer;
>> >
>> >     private String url = "tcp://192.168.30.60:61616";
>> >
>> >     public static void main(String[] argv) throws Exception {
>> >         TopicListener l = new TopicListener();
>> >         String[] unknown = CommandLineSupport.setOptions(l, argv);
>> >         if (unknown.length > 0) {
>> >             System.out.println("Unknown options: " +
>> > Arrays.toString(unknown));
>> >             System.exit(-1);
>> >         }
>> >         l.run();
>> >     }
>> >
>> >     public void run() throws JMSException {
>> >         ActiveMQConnectionFactory factory = new
>> > ActiveMQConnectionFactory(url);
>> >         connection = factory.createConnection();
>> >         connection.setClientID("myClient");
>> >         session = connection.createSession(false,
>> > Session.AUTO_ACKNOWLEDGE);
>> >         topic = session.createTopic("topictest.messages");
>> >         control = session.createTopic("topictest.control");
>> >
>> >         TopicSubscriber consumer =
>> > session.createDurableSubscriber(topic,"myDurable1");
>> >         //MessageConsumer consumer = session.createConsumer(topic);
>> >         consumer.setMessageListener(this);
>> >
>> >         producer = session.createProducer(control);
>> >
>> >         System.out.println("Waiting for messages...");
>> >         connection.start();
>> >     }
>> >
>> ---------------------------------------------------------------------------------
>> >
>> > your grateful,
>> > -Jeff
>> >
>> >
>> >
>> > Dave Stanley wrote:
>> >>
>> >> Your not running this from within a thread by any chance? I don't
>> >> believe connection.start() is going to block, so try and create the
>> >> connection outside the thread and pass it in by reference to the
>> thread
>> >> instance.
>> >>
>> >> HTH
>> >> /Dave
>> >>
>> >>
>> >> On Thu, Jun 12, 2008 at 10:30 PM, eaglepointe <songjf_2000@yahoo.com>
>> >> wrote:
>> >>
>> >>>
>> >>> Hi,
>> >>>
>> >>> I'm new to ActiveMQ and having problem with durable subscription,
>> >>> following
>> >>> code based on example code,
>> >>>
>> >>> ===================================================================
>> >>>   public void run() throws JMSException {
>> >>>        ActiveMQConnectionFactory factory = new
>> >>> ActiveMQConnectionFactory(url);
>> >>>        connection = factory.createConnection();
>> >>>        connection.setClientID("myClient");
>> >>>        session = connection.createSession(false,
>> >>> Session.AUTO_ACKNOWLEDGE);
>> >>>        topic = session.createTopic("topictest.messages");
>> >>>        control = session.createTopic("topictest.control");
>> >>>
>> >>>        TopicSubscriber consumer =
>> >>> session.createDurableSubscriber(topic,"myDurable1");
>> >>>        //MessageConsumer consumer = session.createConsumer(topic);
>> >>>        consumer.setMessageListener(this);
>> >>>
>> >>>        producer = session.createProducer(control);
>> >>>
>> >>>        System.out.println("Waiting for messages...");
>> >>>        connection.start();
>> >>>    }
>> >>> =================================================================
>> >>>
>> >>> is causing exceptions on broker side and client side, below are the
>> >>> exceptions
>> >>>
>> >>> broker side:
>> >>>
>> >>> ERROR Service                        - Async error occurred:
>> >>> java.lang.NullPointerException
>> >>> java.lang.NullPointerException
>> >>>        at
>> >>>
>> >>>
>> org.apache.activemq.broker.TransportConnection.processAddProducer(TransportConnection.java:479)
>> >>>        at
>> >>> org.apache.activemq.command.ProducerInfo.visit(ProducerInfo.java:105)
>> >>>        at
>> >>>
>> >>>
>> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:292)
>> >>>        at
>> >>>
>> >>>
>> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:180)
>> >>>        at
>> >>>
>> >>>
>> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>> >>>        at
>> >>>
>> >>>
>> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
>> >>>        at
>> >>>
>> >>>
>> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
>> >>>        at
>> >>>
>> >>>
>> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>> >>>        at
>> >>>
>> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:196)
>> >>>        at
>> >>>
>> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:183)
>> >>>        at java.lang.Thread.run(Thread.java:619)
>> >>>
>> >>> client side:
>> >>>
>> >>> Exception in thread "ActiveMQ Session Task"
>> >>> java.util.concurrent.RejectedExecutionException
>> >>>        at
>> >>>
>> >>>
>> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown
>> >>> Source)
>> >>>        at java.util.concurrent.ThreadPoolExecutor.reject(Unknown
>> Source)
>> >>>        at java.util.concurrent.ThreadPoolExecutor.execute(Unknown
>> >>> Source)
>> >>>        at
>> >>>
>> >>>
>> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:144)
>> >>>        at
>> >>>
>> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
>> >>>        at
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown
>> >>> Source)
>> >>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
>> >>> Source)
>> >>>        at java.lang.Thread.run(Unknown Source)
>> >>>
>> >>>
>> >>> Any idea? Thanks in advance!
>> >>>
>> >>> --
>> >>> View this message in context:
>> >>>
>> http://www.nabble.com/having-problem-with-durable-subscription-tp17813954p17813954.html
>> >>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>> >>>
>> >>>
>> >>
>> >>
>> >
>> >
>>
>> --
>> View this message in context:
>> http://www.nabble.com/having-problem-with-durable-subscription-tp17813954p17828072.html
>>  Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
> 
> 

-- 
View this message in context: http://www.nabble.com/having-problem-with-durable-subscription-tp17813954p17833760.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message