activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From IlkkaT <ilkka.t...@gmail.com>
Subject durable subscriber cannot receive messages?
Date Fri, 17 Feb 2012 17:17:47 GMT
Hi,

I was able to find weird functionality with AMQ when testing a setup:
- one topic
- one consumer having durable subscription for the topic
- one producer publishing persistent messages to the topic

This worked for some time but at some point the consumer could not receive
messages anymore. I found out that there were multiple identical
subscriptions for a topic using the 'activemq query' tool. I was able to
get rid of the multiple subscriptions issue by putting the
keepDurableSubsActive="false" attribute in the broker config. Actually,
there is not very much info about the attribute. However, the "not
receiving" issue still continued.

The issue seemed to relate to un-graceful stop of the consumer and AMQ
restart when the consumer was still active.

The issue disappeared once I removed the 'destinations' element from the
config, which specifies that the topic is created automatically when AMQ starts.

This functionality can be replicated. I have tested with Ubuntu 10.04 and OS
X 10.6 using ActiveMQ 5.5.1. So, I don't know if this is a feature/bug but
this way I could get the AMQ blocking the messages for the consumer.


Best regards,
Ilkka



== Test keepDurableSubsActive="false" issue ==

1) fresh ActiveMQ installation with keepDurableSubsActive="false"
2) start AMQ
3) start durable subscriber and start producer
4) kill (ctrl-C) and start consumer two times (while producer is still
publishing)
5) consumer cannot receive anymore
6) restart AMQ
7) the messages are received by consumer and otherwise working fine

This does not replicate if the keepDurableSubsActive attribute is not used.



== Test destinations issue ==

1) fresh ActiveMQ installation with destinations element
<destinations>
	<topic physicalName="TestTopic"/>
</destinations>
2) start AMQ
3) start consumer and start producer
4) restart AMQ when consumer is still receiving
5) start consumer and start publisher
6) kill (ctrl-c) and start consumer a couple of times when producer is still
publishing
7) consumer cannot receive anymore and pending message count increases in
AMQ web console
8)  './activemq query -QTopic=TestTopic' shows multiple identical
subscriptions, number of subscriptions increases every time consumer
connects
9) restart AMQ
10) messages are received by consumer and otherwise working fine

The issue does not replicate if 'destinations' element is not used.



Code:
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.Session;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.jms.TopicConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Test consumer that opens a durable subscription named "TestSubscription"
 * on topic "TestTopic" (client id "TestClient") and synchronously drains it,
 * printing every message, until no message arrives for 10 seconds.
 */
public class DurableSubscriber {

	/**
	 * Connects to the broker at tcp://127.0.0.1:61616, receives messages from
	 * the durable subscription until a receive times out, prints the number of
	 * messages received and then shuts the JMS resources down.
	 *
	 * The connection is always closed, including when an exception interrupts
	 * the run, so a failed run does not leave a dangling broker connection.
	 *
	 * @param args not used
	 */
	public static void main(String[] args) {
		String url = "tcp://127.0.0.1:61616";
		System.out.println("URL: " + url);

		TopicConnection tc = null;
		try {
			ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
			tc = acf.createTopicConnection();
			tc.setClientID("TestClient");
			TopicSession ts = tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
			Topic t = ts.createTopic("TestTopic");
			// L also implements MessageListener; it could be registered on the
			// subscriber instead of using the blocking receive loop below.
			tc.setExceptionListener(new L());
			TopicSubscriber tsub = ts.createDurableSubscriber(t, "TestSubscription");
			System.out.println("Start connection");
			tc.start();

			int count = 0;
			while (true) {
				System.out.println("Start to receive");
				// Wait up to 10 seconds; null means the timeout expired.
				Message m = tsub.receive(10000);
				if (m == null) {
					System.out.println("No more messages");
					break;
				}
				System.out.println(m.toString());
				count++;
			}

			System.out.println("Count: " + count);
			System.out.println("Stop connection");
			tc.stop();
			System.out.println("Close subscription");
			tsub.close();
			System.out.println("Close session");
			ts.close();
		}
		catch (Exception e) {
			System.out.println("Exception");
			e.printStackTrace();
		}
		finally {
			// Closing the connection here (rather than at the end of the try
			// block) guarantees it is released on the error path as well.
			// Per the JMS API, closing a connection also closes its sessions
			// and consumers, so nothing else needs explicit cleanup here.
			if (tc != null) {
				System.out.println("Close connection");
				try {
					tc.close();
				}
				catch (JMSException e) {
					System.out.println("Could not close connection");
					e.printStackTrace();
				}
			}
		}
	}

	/**
	 * Listener that prints received messages with a running count and prints
	 * the message text of any connection-level JMS exception.
	 */
	public static class L implements MessageListener, ExceptionListener {

		/** Number of messages delivered to {@link #onMessage(Message)}. */
		private int count = 0;

		/**
		 * Prints the message and the updated running count.
		 *
		 * @param m the delivered message
		 */
		@Override
		public void onMessage(Message m) {
			System.out.println(m.toString());
			count++;
			System.out.println("Count: " + count);
		}

		/**
		 * Prints a marker line followed by the exception's message.
		 *
		 * @param arg0 the exception reported by the JMS provider
		 */
		@Override
		public void onException(JMSException arg0) {
			System.out.println("onException");
			System.out.println(arg0.getMessage());
		}
	}
}

import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Test producer that publishes ten persistent text messages, one per second,
 * to topic "TestTopic" on the broker at tcp://127.0.0.1:61616.
 */
public class TProducer {

	/**
	 * Publishes the test messages and then releases the JMS session and
	 * connection. Cleanup runs in a finally block so it happens exactly once
	 * on both the success and the failure path.
	 *
	 * @param args not used
	 */
	public static void main(String[] args) {
		TopicConnection tc = null;
		TopicSession ts = null;
		String url = "tcp://127.0.0.1:61616?jms.alwaysSyncSend=true";
		String destination = "TestTopic";

		try {
			System.out.println("URL: " + url);
			ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(url);
			tc = acf.createTopicConnection();
			ts = tc.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
			EL el = new EL();
			tc.setExceptionListener(el);
			Topic t = ts.createTopic(destination);
			TopicPublisher tp = ts.createPublisher(t);
			tp.setDeliveryMode(DeliveryMode.PERSISTENT);
			tc.start();
			for (int i = 0; i < 10; i++) {
				TextMessage m = ts.createTextMessage();
				m.setText("Test Message");
				System.out.println("Publish");
				tp.publish(m);
				System.out.println("Published");
				Thread.sleep(1000);
			}
		}
		catch (Exception e) {
			System.out.println("NORMAL EXCEPTION");
			e.printStackTrace();
		}
		finally {
			close(ts, tc);
		}
	}

	/**
	 * Closes the session and then the connection, skipping whichever was
	 * never created. A failure to close the session is reported but does not
	 * prevent the connection from being closed.
	 *
	 * @param ts the session to close, or null if none was created
	 * @param tc the connection to close, or null if none was created
	 */
	private static void close(TopicSession ts, TopicConnection tc) {
		if (ts != null) {
			System.out.println("Close session");
			try {
				ts.close();
			}
			catch (JMSException e) {
				System.out.println("Could not close");
				e.printStackTrace();
			}
		}
		if (tc != null) {
			System.out.println("Close connection");
			try {
				tc.close();
				System.out.println("Closed");
			}
			catch (JMSException e) {
				System.out.println("Could not close");
				e.printStackTrace();
			}
		}
	}

	/**
	 * Exception listener that counts the JMS exceptions reported to it and
	 * prints a marker line for each one.
	 */
	public static class EL implements ExceptionListener {

		// Final so the monitor guarding count can never be swapped out.
		final Object lock = new Object();
		int count = 0;

		/**
		 * @return the number of exceptions reported so far
		 */
		public int getCount() {
			synchronized(lock) {
				return count;
			}
		}

		/**
		 * Increments the exception count under the lock and prints a marker.
		 *
		 * @param arg0 the exception reported by the JMS provider
		 */
		@Override
		public void onException(JMSException arg0) {
			synchronized(lock) {
				count++;
			}
			System.out.println("ON EXCEPTION");
		}
	}
}


--
View this message in context: http://activemq.2283324.n4.nabble.com/durable-subscriber-cannot-receive-messages-tp4397874p4397874.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Mime
View raw message