activemq-users mailing list archives

From "Dave Stanley" <dstanl...@gmail.com>
Subject Re: having problem with durable subscription
Date Fri, 13 Jun 2008 20:33:36 GMT
Not that it helps much, but I tried it with a 5.x release and your code works
fine: the subscription stays durable across consumer and broker restarts.
Have you modified your activemq.xml? Are you running 5.1.0?
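
For what it's worth, here is the behaviour your step 7 expects, modelled as a
toy in plain Java with no broker involved. It is only a sketch under
assumptions: DurableTopicModel and its connect/disconnect/publish methods are
names made up for illustration and are not part of JMS or ActiveMQ. It also
does not model a broker restart; as far as I know, a real broker keeps the
backlog across a restart only if the messages are sent persistent and the
broker's message store is enabled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy in-memory model of durable-subscription semantics.
// Hypothetical illustration only; this is not ActiveMQ or JMS code.
class DurableTopicModel {
    // Retained messages per durable subscription, keyed by "clientId:subscriptionName".
    private final Map<String, Deque<String>> backlog = new HashMap<>();
    // Keys of the subscriptions whose consumer is currently connected.
    private final Set<String> online = new HashSet<>();

    private static String key(String clientId, String subscriptionName) {
        return clientId + ":" + subscriptionName;
    }

    // Connect (or reconnect) a durable subscriber.
    // Returns the messages that were retained while it was offline.
    List<String> connect(String clientId, String subscriptionName) {
        String k = key(clientId, subscriptionName);
        Deque<String> pending = backlog.computeIfAbsent(k, x -> new ArrayDeque<>());
        online.add(k);
        List<String> delivered = new ArrayList<>(pending);
        pending.clear();
        return delivered;
    }

    // Disconnect the consumer; the subscription itself stays registered.
    void disconnect(String clientId, String subscriptionName) {
        online.remove(key(clientId, subscriptionName));
    }

    // Publish to the topic: connected subscribers are treated as receiving the
    // message immediately, offline durable subscribers get it queued.
    void publish(String message) {
        for (Map.Entry<String, Deque<String>> e : backlog.entrySet()) {
            if (!online.contains(e.getKey())) {
                e.getValue().addLast(message);
            }
        }
    }

    public static void main(String[] args) {
        DurableTopicModel broker = new DurableTopicModel();
        broker.connect("myClient", "myDurable1");    // step 2: subscriber starts
        broker.publish("m1");                        // step 3: received live
        broker.disconnect("myClient", "myDurable1"); // step 4: subscriber stops
        broker.publish("m2");                        // step 5: subscriber is offline
        broker.publish("m3");
        // step 7: subscriber comes back and gets the backlog
        System.out.println(broker.connect("myClient", "myDurable1")); // prints [m2, m3]
    }
}
```

The point of keying on client ID plus subscription name is that the
reconnecting subscriber has to present the same pair ("myClient" and
"myDurable1" in your code) to be matched to its backlog.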

/Dave
On Fri, Jun 13, 2008 at 1:05 PM, eaglepointe <songjf_2000@yahoo.com> wrote:

>
> I forgot to mention that I also restarted the message broker before
> starting the durable subscriber again.
>
>
> eaglepointe wrote:
> >
> > OK, here is what I did:
> >
> > 1) Start the message broker.
> > 2) Start a durable subscriber that subscribes to "topictest.messages",
> > as in the attached code.
> > 3) Start the publisher, which sends some messages to "topictest.messages"
> > in 'persistent' mode; the subscriber receives the messages.
> > 4) Stop the subscriber and the publisher.
> > 5) Start the publisher again, which sends some messages to
> > "topictest.messages".
> > 6) Stop the publisher.
> > 7) Start the subscriber again. According to my understanding of durable
> > subscriptions, I'd expect the subscriber to receive the messages sent by
> > the publisher, but I got the exceptions instead.
> >
> > Following is a more complete version of the subscriber code, which is
> > really the sample code that comes with the ActiveMQ installation, except
> > for the durable subscription.
> >
> > The same code works fine, meaning no exceptions, if I don't use a durable
> > subscription.
> > It seems that a durable subscription requires some context in the message
> > broker, which somehow got lost.
> >
> >
> >
> > --------------------------------------------------------------------------------------------
> >
> > public class TopicListener implements MessageListener {
> >
> >     private Connection connection;
> >     private MessageProducer producer;
> >     private Session session;
> >     private int count;
> >     private long start;
> >     private Topic topic;
> >     private Topic control;
> >     //private TopicSubscriber consumer;
> >
> >     private String url = "tcp://192.168.30.60:61616";
> >
> >     public static void main(String[] argv) throws Exception {
> >         TopicListener l = new TopicListener();
> >         String[] unknown = CommandLineSupport.setOptions(l, argv);
> >         if (unknown.length > 0) {
> >             System.out.println("Unknown options: " + Arrays.toString(unknown));
> >             System.exit(-1);
> >         }
> >         l.run();
> >     }
> >
> >     public void run() throws JMSException {
> >         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
> >         connection = factory.createConnection();
> >         connection.setClientID("myClient");
> >         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >         topic = session.createTopic("topictest.messages");
> >         control = session.createTopic("topictest.control");
> >
> >         TopicSubscriber consumer = session.createDurableSubscriber(topic, "myDurable1");
> >         //MessageConsumer consumer = session.createConsumer(topic);
> >         consumer.setMessageListener(this);
> >
> >         producer = session.createProducer(control);
> >
> >         System.out.println("Waiting for messages...");
> >         connection.start();
> >     }
> >
> > ---------------------------------------------------------------------------------
> >
> > Gratefully yours,
> > -Jeff
> >
> >
> >
> > Dave Stanley wrote:
> >>
> >> You're not running this from within a thread, by any chance? I don't
> >> believe connection.start() blocks, so try creating the connection
> >> outside the thread and passing it in by reference to the thread
> >> instance.
> >>
> >> HTH
> >> /Dave
> >>
> >>
> >> On Thu, Jun 12, 2008 at 10:30 PM, eaglepointe <songjf_2000@yahoo.com>
> >> wrote:
> >>
> >>>
> >>> Hi,
> >>>
> >>> I'm new to ActiveMQ and am having a problem with durable subscriptions.
> >>> The following code, based on the example code,
> >>>
> >>> ===================================================================
> >>>    public void run() throws JMSException {
> >>>        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
> >>>        connection = factory.createConnection();
> >>>        connection.setClientID("myClient");
> >>>        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >>>        topic = session.createTopic("topictest.messages");
> >>>        control = session.createTopic("topictest.control");
> >>>
> >>>        TopicSubscriber consumer = session.createDurableSubscriber(topic, "myDurable1");
> >>>        //MessageConsumer consumer = session.createConsumer(topic);
> >>>        consumer.setMessageListener(this);
> >>>
> >>>        producer = session.createProducer(control);
> >>>
> >>>        System.out.println("Waiting for messages...");
> >>>        connection.start();
> >>>    }
> >>> =================================================================
> >>>
> >>> is causing exceptions on both the broker side and the client side.
> >>> The exceptions are below:
> >>>
> >>> broker side:
> >>>
> >>> ERROR Service                        - Async error occurred:
> >>> java.lang.NullPointerException
> >>> java.lang.NullPointerException
> >>>        at org.apache.activemq.broker.TransportConnection.processAddProducer(TransportConnection.java:479)
> >>>        at org.apache.activemq.command.ProducerInfo.visit(ProducerInfo.java:105)
> >>>        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:292)
> >>>        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:180)
> >>>        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> >>>        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> >>>        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> >>>        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> >>>        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:196)
> >>>        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:183)
> >>>        at java.lang.Thread.run(Thread.java:619)
> >>>
> >>> client side:
> >>>
> >>> Exception in thread "ActiveMQ Session Task" java.util.concurrent.RejectedExecutionException
> >>>        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
> >>>        at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
> >>>        at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
> >>>        at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:144)
> >>>        at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> >>>        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
> >>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> >>>        at java.lang.Thread.run(Unknown Source)
> >>>
> >>>
> >>> Any idea? Thanks in advance!
> >>>
> >>>
> >>>
> >>
> >>
> >
> >
>
> --
> View this message in context:
> http://www.nabble.com/having-problem-with-durable-subscription-tp17813954p17828072.html
>  Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>
