activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Daroo <dariusz.szablin...@gmail.com>
Subject "Could not correlate acknowledgment with dispatched message"
Date Wed, 11 Nov 2009 20:43:06 GMT

I'm getting the following exception on the broker side (5.3 and 5.1):


2009-11-11 21:12:25 org.apache.activemq.broker.TransportConnection
serviceException
WARNING: Async error occurred: javax.jms.JMSException: Could not correlate
acknowledgment with dispatched message: MessageAck {commandId = 14,
responseRequired = false, ackType = 0, consumerId =
ID:MAIN-50081-1257970345670-0:0:1:1, firstMessageId =
ID:MAIN-50081-1257970345670-0:0:3:1:1, lastMessageId =
ID:MAIN-50081-1257970345670-0:0:3:1:1, destination =
queue://unordered.ack.asynch.q, transactionId = null, messageCount = 1}
javax.jms.JMSException: Could not correlate acknowledgment with dispatched
message: MessageAck {commandId = 14, responseRequired = false, ackType = 0,
consumerId = ID:MAIN-50081-1257970345670-0:0:1:1, firstMessageId =
ID:MAIN-50081-1257970345670-0:0:3:1:1, lastMessageId =
ID:MAIN-50081-1257970345670-0:0:3:1:1, destination =
queue://unordered.ack.asynch.q, transactionId = null, messageCount = 1}
	at
org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:315)
	at
org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:369)
	at
org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:470)
	at
org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
	at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
	at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
	at
org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
	at
org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:449)
	at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
	at
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:297)
	at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:175)
	at
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
	at
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
	at
org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:210)
	at
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
	at
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
	at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
	at java.lang.Thread.run(Thread.java:619)

It happens when I have two or more asynchronous (onMessage) consumers on one
queue using either CLIENT_ACKNOWLEDGE or INDIVIDUAL_ACKNOWLEDGE mode and
ACK-ing messages "out of order". By "out of order" I mean calling
message.acknowledge() inside the onMessage() method of the second consumer
first and then on the first one. In the real world it could happen when the
second consumer is much faster than the first one.
I leafed through the JMS spec and it seems that "out of order" ACK-ing is not
forbidden. At least, I didn't find any statement which might prohibit
this. I'm attaching sample client code which causes the exception on the
broker side. Is there anything wrong with my code or is it an AMQ bug?

==== client code ===
public class UnorderedAckAsynch {
	private final static String QUEUE_NAME = "unordered.ack.asynch.q";
	private final static String SEQ_NUM_PROPERTY = "seqNum";

	private final static int TOTAL_MESSAGES_CNT = 2;
	private final static int CONSUMERS_CNT = 2;
	private final static int MESSAGE_LENGTH_BYTES = 75000;
	
	private final static CountDownLatch LATCH = new
CountDownLatch(TOTAL_MESSAGES_CNT);
	
	private static Connection connection;
	
/**
 * AMQ 5.1 - SEVERE: Async error occurred: javax.jms.JMSException: Could not
correlate acknowledgment with dispatched message: MessageAck 
 * AMQ 5.3 - WARNING: Async error occurred: javax.jms.JMSException: Could
not correlate acknowledgment with dispatched message: MessageAck
 * 
 */
	public static void main(String[] args) throws Exception {
		final ConnectionFactory fac = new
ActiveMQConnectionFactory("tcp://0.0.0.0:61616");
		List<Consumer> consumers = null;
		Session producerSession = null;
		try {
			connection = fac.createConnection();
			connection.start();

			consumers = new ArrayList<Consumer>();
			for (int i = 0; i < CONSUMERS_CNT; i++) {
				consumers.add(new Consumer());
			}

			producerSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
			final MessageProducer producer = producerSession.createProducer(new
ActiveMQQueue(QUEUE_NAME));
			producer.setDeliveryMode(DeliveryMode.PERSISTENT);
			for (int i = 0; i < TOTAL_MESSAGES_CNT; i++) {
				final Message message =
producerSession.createTextMessage(buildLongString());
				message.setIntProperty(SEQ_NUM_PROPERTY, i);
				producer.send(message);
			}
			
			LATCH.await();

		} finally {
			if (producerSession != null) 
				producerSession.close();
				
			if (consumers != null) {
				for (Consumer c : consumers) {
					c.close();
				}
			}
			
			if (connection != null)
				connection.close();
		}
	}
	private static String buildLongString() {
		final StringBuilder stringBuilder = new
StringBuilder(MESSAGE_LENGTH_BYTES);
		for (int i = 0; i < MESSAGE_LENGTH_BYTES; ++i) {
			stringBuilder.append((int) (Math.random() * 10));
		}
		return stringBuilder.toString();
	}
	
	private final static class Consumer implements MessageListener {
		final Session session;
		private static final AtomicInteger nextExpectedSeqNum = new
AtomicInteger();
		
		private Consumer() {
			try {
				session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
				//session = connection.createSession(false,
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
				final Queue queue = session.createQueue(QUEUE_NAME+
"?consumer.prefetchSize=1");
				final MessageConsumer consumer = session.createConsumer(queue);
				consumer.setMessageListener(this);
			} catch (JMSException e) {
				e.printStackTrace();
				throw new RuntimeException(e);
			}
		}

		@Override
		public void onMessage(Message message) {
			try {
				final int seqNum = message.getIntProperty(SEQ_NUM_PROPERTY);
				if (seqNum == nextExpectedSeqNum.getAndIncrement()) {
					try {
						TimeUnit.SECONDS.sleep(1);
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt();
					}
				} 
				
				message.acknowledge();
			} catch (JMSException e) {
				e.printStackTrace();
				throw new RuntimeException(e);
			} finally {
				LATCH.countDown();
			}
		}

		private void close() {
			if (session != null) {
				try {
					session.close();
				} catch (JMSException e) {
					e.printStackTrace();
					throw new RuntimeException(e);
				}
			}
		}
	}

}

====== END ====

-- 
View this message in context: http://old.nabble.com/%22Could-not-correlate-acknowledgment-with-dispatched-message%22-tp26308220p26308220.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message