activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Yi Pan <y...@yahoo-inc.com>
Subject Issues with durable subscriber and VMPendingDurableSubscriberMessageStoragePolicy
Date Wed, 05 Jun 2013 18:25:06 GMT
Hi, all,

Has anyone encountered a problem with
VMPendingDurableSubscriberMessageStoragePolicy that it loses persistent
messages when the subscriber re-connects?

We have the following test case fails with
VMPendingDurableSubscriberMessageStoragePolicy set as default destination
policy:			
1) publish persistent messages:
              void publish(String brokerUrl) {
                  try {
                        ConnectionFactory factory = new
ActiveMQConnectionFactory(brokerUrl);
                        connection = factory.createTopicConnection();
			connection.setClientID(CLIENT_ID);

			/**
			 * Create a JMS session
			 */
			TopicSession session = connection.createTopicSession(false,
					Session.AUTO_ACKNOWLEDGE);

			/**
			 * Fully Qualified Topic Name
			 */
			String fullyQualifiedTopicName = this.topicName;

			/**
			 * Get the topic instance from the topic name.
			 */
			Topic topic = session.createTopic(fullyQualifiedTopicName);

			/**
			 * Create a topic subscriber to the topic.
			 */
			TopicSubscriber ts = session.createDurableSubscriber(topic,
					DURABLE_SUB_NAME);

			/**
			 * Create a publisher to publish messages to the topic
			 */
			TopicPublisher publisher = session.createPublisher(topic);

			int deliveryMode = DeliveryMode.PERSISTENT;

			/**
			 * Send and receive 10 messages to the topic.
			 */
			for (int i = 0; i < 10; i++) {
				TextMessage tm = session.createTextMessage("msg-" + i);

				LOG.info("publishing msg to broker, msg=" + tm.getText());

				publisher.publish(topic, tm, deliveryMode, 4, 0);
			}

			Thread.sleep(1000);

			publisher.close();
			session.close();

		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
		} finally {
			try {

				connection.close();
			} catch (Exception e) {
				;
			}
		}
           }
2) after the messages are published, subscriber connects to receive the
previously published messages:
    void consume(String brokerUrl) {
		try {
                        ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(brokerUrl);
			/**
			 * Create a topic connection from the obtained connection factory.
			 */
			connection = factory.createTopicConnection();

			connection.setClientID(CLIENT_ID);

			/**
			 * Create a JMS session
			 */
			TopicSession session = connection.createTopicSession(false,
					Session.AUTO_ACKNOWLEDGE);

			/**
			 * Fully Qualified Topic Name
			 */
			String fullyQualifiedTopicName = this.topicLocalName;

			/**
			 * Get the topic instance from the topic name.
			 */
			Topic topic = session.createTopic(fullyQualifiedTopicName);

			/**
			 * Create a topic subscriber to the topic.
			 */
			TopicSubscriber ts = session.createDurableSubscriber(topic,
					DURABLE_SUB_NAME);

			/**
			 * Start the connection - message will be delivered to the
			 * subscribers.
			 */
			connection.start();

			/**
			 * Send and receive 10 messages to the topic.
			 */
			for (int i = 0; i < 10; i++) {

				TextMessage tm2 = (TextMessage) ts.receive();

				if (tm2 == null) {
					throw new RuntimeException("message validation failed.");
				}

				LOG.info("receive msg from broker, msg received="
						+ tm2.getText());

			}

			Thread.sleep(1000);

		} catch (Exception e) {
			LOG.error(e.getMessage(), e);
		} finally {
			try {
				connection.close();
			} catch (Exception e) {
				;
			}
		}
    }

We noticed that when the default destination policy is set to
VMPendingDurableSubscriberMessageStoragePolicy, the above consume() function
fails and stuck at the blocking call: 
               TextMessage tm2 = (TextMessage) ts.receive();

Has anyone encountered this issue before? Our version of ActiveMQ is 5.7.0

Thanks a lot!

-Yi



--
View this message in context: http://activemq.2283324.n4.nabble.com/Issues-with-durable-subscriber-and-VMPendingDurableSubscriberMessageStoragePolicy-tp4667921.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Mime
View raw message