activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From eaglepointe <songjf_2...@yahoo.com>
Subject Re: having problem with durable subscription
Date Fri, 13 Jun 2008 16:51:26 GMT

Ok, here is what I did,

1) start message broker
2) start a durable subscriber which subscribes "topictest.messages" as the
attached code
3) start publisher, which sends some messages to "topictest.messages" in
'persistent' mode, and the subscriber receives the messages
4) stop the subscriber and publisher
5) start publisher again, which sends some messages to "topictest.messages"
6) stop publisher
7) start subscriber again, according to my understanding of durable
subscription, I'd expect subscriber receives the messages sent by publisher,
but I got the exceptions instead

Following is a more complete version of subscriber code, which is really the
sample code comes with ActiveMQ installation, except for the durable
subscription.

The same code works fine, meaning no exceptions, if I don't use durable
subscription.
It seems that durable subscription requires some context in message broker
which somehow got lost.


--------------------------------------------------------------------------------------------


public class TopicListener implements MessageListener {

    private Connection connection;
    private MessageProducer producer;
    private Session session;
    private int count;
    private long start;
    private Topic topic;
    private Topic control;
    //private TopicSubscriber consumer;

    private String url = "tcp://192.168.30.60:61616";

    public static void main(String[] argv) throws Exception {
        TopicListener l = new TopicListener();
        String[] unknown = CommandLineSupport.setOptions(l, argv);
        if (unknown.length > 0) {
            System.out.println("Unknown options: " +
Arrays.toString(unknown));
            System.exit(-1);
        }
        l.run();
    }

    public void run() throws JMSException {
        ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(url);
        connection = factory.createConnection();
        connection.setClientID("myClient");
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        topic = session.createTopic("topictest.messages");
        control = session.createTopic("topictest.control");

        TopicSubscriber consumer =
session.createDurableSubscriber(topic,"myDurable1");
        //MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(this);

        producer = session.createProducer(control);

        System.out.println("Waiting for messages...");
        connection.start();
    }
---------------------------------------------------------------------------------

your grateful,
-Jeff



Dave Stanley wrote:
> 
> Your not running this from within a thread by any chance? I don't
> believe connection.start() is going to block, so try and create the
> connection outside the thread and pass it in by reference to the thread
> instance.
> 
> HTH
> /Dave
> 
> 
> On Thu, Jun 12, 2008 at 10:30 PM, eaglepointe <songjf_2000@yahoo.com>
> wrote:
> 
>>
>> Hi,
>>
>> I'm new to ActiveMQ and having problem with durable subscription,
>> following
>> code based on example code,
>>
>> ===================================================================
>>   public void run() throws JMSException {
>>        ActiveMQConnectionFactory factory = new
>> ActiveMQConnectionFactory(url);
>>        connection = factory.createConnection();
>>        connection.setClientID("myClient");
>>        session = connection.createSession(false,
>> Session.AUTO_ACKNOWLEDGE);
>>        topic = session.createTopic("topictest.messages");
>>        control = session.createTopic("topictest.control");
>>
>>        TopicSubscriber consumer =
>> session.createDurableSubscriber(topic,"myDurable1");
>>        //MessageConsumer consumer = session.createConsumer(topic);
>>        consumer.setMessageListener(this);
>>
>>        producer = session.createProducer(control);
>>
>>        System.out.println("Waiting for messages...");
>>        connection.start();
>>    }
>> =================================================================
>>
>> is causing exceptions on broker side and client side, below are the
>> exceptions
>>
>> broker side:
>>
>> ERROR Service                        - Async error occurred:
>> java.lang.NullPointerException
>> java.lang.NullPointerException
>>        at
>>
>> org.apache.activemq.broker.TransportConnection.processAddProducer(TransportConnection.java:479)
>>        at
>> org.apache.activemq.command.ProducerInfo.visit(ProducerInfo.java:105)
>>        at
>>
>> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:292)
>>        at
>>
>> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:180)
>>        at
>>
>> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>>        at
>>
>> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
>>        at
>>
>> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
>>        at
>>
>> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>>        at
>> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:196)
>>        at
>> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:183)
>>        at java.lang.Thread.run(Thread.java:619)
>>
>> client side:
>>
>> Exception in thread "ActiveMQ Session Task"
>> java.util.concurrent.RejectedExecutionException
>>        at
>>
>> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown
>> Source)
>>        at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
>>        at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
>>        at
>>
>> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:144)
>>        at
>> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
>>        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown
>> Source)
>>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
>> Source)
>>        at java.lang.Thread.run(Unknown Source)
>>
>>
>> Any idea? Thanks in advance!
>>
>> --
>> View this message in context:
>> http://www.nabble.com/having-problem-with-durable-subscription-tp17813954p17813954.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
> 
> 

-- 
View this message in context: http://www.nabble.com/having-problem-with-durable-subscription-tp17813954p17827792.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message