activemq-users mailing list archives

From Christian Posta <christian.po...@gmail.com>
Subject Re: Issues with durable subscriber and VMPendingDurableSubscriberMessageStoragePolicy
Date Wed, 05 Jun 2013 18:33:49 GMT
Tim fixed some issues around the cursors for durable subs. Can you try
your tests on a recent snapshot?


On Wed, Jun 5, 2013 at 11:25 AM, Yi Pan <ypan@yahoo-inc.com> wrote:

> Hi, all,
>
> Has anyone encountered a problem with
> VMPendingDurableSubscriberMessageStoragePolicy where it loses persistent
> messages when the subscriber reconnects?
>
> The following test case fails when
> VMPendingDurableSubscriberMessageStoragePolicy is set as the default
> destination policy:
> 1) Publish persistent messages:
>               void publish(String brokerUrl) {
>                   try {
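>                         // ActiveMQConnectionFactory is used here because
>                         // javax.jms.ConnectionFactory has no createTopicConnection().
>                         ActiveMQConnectionFactory factory = new
> ActiveMQConnectionFactory(brokerUrl);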
>                         connection = factory.createTopicConnection();
>                         connection.setClientID(CLIENT_ID);
>
>                         /**
>                          * Create a JMS session
>                          */
>                         TopicSession session =
> connection.createTopicSession(false,
>                                         Session.AUTO_ACKNOWLEDGE);
>
>                         /**
>                          * Fully Qualified Topic Name
>                          */
>                         String fullyQualifiedTopicName = this.topicName;
>
>                         /**
>                          * Get the topic instance from the topic name.
>                          */
>                         Topic topic =
> session.createTopic(fullyQualifiedTopicName);
>
>                         /**
>                          * Create a topic subscriber to the topic.
>                          */
>                         TopicSubscriber ts =
> session.createDurableSubscriber(topic,
>                                         DURABLE_SUB_NAME);
>
>                         /**
>                          * Create a publisher to publish messages to the topic.
>                          */
>                         TopicPublisher publisher =
> session.createPublisher(topic);
>
>                         int deliveryMode = DeliveryMode.PERSISTENT;
>
>                         /**
>                          * Publish 10 persistent messages to the topic.
>                          */
>                         for (int i = 0; i < 10; i++) {
>                                 TextMessage tm =
> session.createTextMessage("msg-" + i);
>
>                                 LOG.info("publishing msg to broker, msg="
> + tm.getText());
>
>                                 publisher.publish(topic, tm, deliveryMode,
> 4, 0);
>                         }
>
>                         Thread.sleep(1000);
>
>                         publisher.close();
>                         session.close();
>
>                 } catch (Exception e) {
>                         LOG.error(e.getMessage(), e);
>                 } finally {
>                         try {
>
>                                 connection.close();
>                         } catch (Exception e) {
>                                 ;
>                         }
>                 }
>            }
> 2) After the messages are published, the subscriber reconnects to receive
> them:
>     void consume(String brokerUrl) {
>                 try {
>                         ActiveMQConnectionFactory factory = new
> ActiveMQConnectionFactory(brokerUrl);
>                         /**
>                          * Create a topic connection from the connection factory.
>                          */
>                         connection = factory.createTopicConnection();
>
>                         connection.setClientID(CLIENT_ID);
>
>                         /**
>                          * Create a JMS session
>                          */
>                         TopicSession session =
> connection.createTopicSession(false,
>                                         Session.AUTO_ACKNOWLEDGE);
>
>                         /**
>                          * Fully Qualified Topic Name
>                          */
>                         String fullyQualifiedTopicName =
> this.topicLocalName;
>
>                         /**
>                          * Get the topic instance from the topic name.
>                          */
>                         Topic topic =
> session.createTopic(fullyQualifiedTopicName);
>
>                         /**
>                          * Create a topic subscriber to the topic.
>                          */
>                         TopicSubscriber ts =
> session.createDurableSubscriber(topic,
>                                         DURABLE_SUB_NAME);
>
>                         /**
>                          * Start the connection so that messages are delivered to the
>                          * subscriber.
>                          */
>                         connection.start();
>
>                         /**
>                          * Receive the 10 previously published messages.
>                          */
>                         for (int i = 0; i < 10; i++) {
>
>                                 TextMessage tm2 = (TextMessage)
> ts.receive();
>
>                                 if (tm2 == null) {
>                                         throw new
> RuntimeException("message validation failed.");
>                                 }
>
>                                 LOG.info("receive msg from broker, msg received="
>                                                 + tm2.getText());
>
>                         }
>
>                         Thread.sleep(1000);
>
>                 } catch (Exception e) {
>                         LOG.error(e.getMessage(), e);
>                 } finally {
>                         try {
>                                 connection.close();
>                         } catch (Exception e) {
>                                 ;
>                         }
>                 }
>     }
>
> We noticed that when the default destination policy is set to
> VMPendingDurableSubscriberMessageStoragePolicy, the consume() function
> above hangs at the blocking call:
>                TextMessage tm2 = (TextMessage) ts.receive();
>
> Has anyone encountered this issue before? We are running ActiveMQ 5.7.0.
>
> Thanks a lot!
>
> -Yi
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/Issues-with-durable-subscriber-and-VMPendingDurableSubscriberMessageStoragePolicy-tp4667921.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
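
For anyone reproducing the setup described in the quoted message: the
policy is usually selected in activemq.xml through a policy entry. The
original message does not show its broker configuration, so the fragment
below is only a sketch. It assumes the policy is applied to every topic
with the ">" wildcard, which may differ from how the reporter's broker was
actually configured.

```xml
<!-- Sketch only: goes inside the <broker> element of activemq.xml. -->
<!-- The topic=">" wildcard is an assumption, not taken from the report. -->
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic=">">
        <!-- vmDurableCursor selects VMPendingDurableSubscriberMessageStoragePolicy -->
        <pendingDurableSubscriberPolicy>
          <vmDurableCursor/>
        </pendingDurableSubscriberPolicy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>
```

With this cursor, references to a durable subscriber's pending messages
are held in broker memory, so it is the cursor implementation to look at
when a reconnecting subscriber does not see messages that were published
while it was offline.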



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta
