activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From neilwilk <neil.wilkin...@ds-s.com>
Subject Re: javax.jms.JMSException: Invalid acknowledgement after rollback of TopicSession
Date Mon, 03 Mar 2008 15:30:57 GMT

Hi Again, all

Solved my own problem, but I've left this in just in case others make the
same mistake!

I should have been using a durable subscriber.  As soon as I corrected the
code to the following, ActiveMQ stopped generating the exceptions:

package neil.transactions;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class JMSTransactedTopicListen {

	public static class MessageHandler implements MessageListener{
		Session session;
		int count=0;
		
		public MessageHandler(Session aSession){
			session = aSession;
		}

		@Override
		public void onMessage(Message msg) {
			try {
				if (++count % 3 == 0){
					System.out.println("ROLLBACK message ID "+msg.getJMSMessageID());
					session.rollback();
					return;
				}
			} catch (JMSException e){
				e.printStackTrace();
				return;
			}
			
			try{
				System.out.println("COMMIT   message ID "+msg.getJMSMessageID());
				session.commit();
			} catch (JMSException e){
				e.printStackTrace();
			}
		}
	}

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		String clientId = "TRANSACTED_TOPIC_CLIENT_ID";
		System.out.println("JMSTransactedTopicListen::main()");
		try {
			Context context = new InitialContext();
			
			java.util.Hashtable<?,?> myEnvironment = context.getEnvironment();
			if (myEnvironment.containsKey("java.naming.provider.url"))
System.out.println("Connecting to url:
"+myEnvironment.get("java.naming.provider.url"));
							
			ConnectionFactory tcFactory = (ConnectionFactory)
context.lookup("ConnectionFactory");	
			Connection topicConn = tcFactory.createConnection();
			topicConn.setClientID(clientId);
			Session tcSession = topicConn.createSession(true, 0);
			
	        System.out.println("Consuming topic: " +
context.lookup("NeilTopic"));
			Topic myTopic = (Topic)context.lookup("NeilTopic");
			TopicSubscriber subscriber = tcSession.createDurableSubscriber(myTopic,
clientId);
			subscriber.setMessageListener(new MessageHandler(tcSession));

			System.out.println("Starting topicConn");
			topicConn.start();
			System.out.println("JMSTransactedTopicListen::main() started OK -
sleeping....");
			
			Thread.sleep(60 * 60 * 100);			
			
			System.out.println("JMSTransactedTopicListen::main() ... finished
sleeping...");

			tcSession.close();
			topicConn.close();	
		} catch (NamingException e){
			e.printStackTrace();
		} catch (JMSException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		System.out.println("JMSTransactedTopicListen::main() Done");
	}
}


neilwilk wrote:
> 
> Hi All
> 
> I'm experimenting with transactions.  They work fine for QueueSessions,
> but whenever I roll back a transaction on a TopicSession, I get the
> following exception:
> 
> ERROR Service                        - Async error occurred:
> javax.jms.JMSException: Invalid acknowledgment: MessageAck {commandId =
> 13, responseRequired = false, ackType = 3, consumerId =
> ID:briltp0073-2889-1204555304234-0:0:1:1, firstMessageId = null,
> lastMessageId = ID:briltp0073-2891-1204555307078-0:0:1:1:3, destination =
> topic://neil.MyTopic, transactionId = null, messageCount = 1}
> javax.jms.JMSException: Invalid acknowledgment: MessageAck {commandId =
> 13, responseRequired = false, ackType = 3, consumerId =
> ID:briltp0073-2889-1204555304234-0:0:1:1, firstMessageId = null,
> lastMessageId = ID:briltp0073-2891-1204555307078-0:0:1:1:3, destination =
> topic://neil.MyTopic, transactionId = null, messageCount = 1}
>         at
> org.apache.activemq.broker.region.TopicSubscription.acknowledge(TopicSubscription.java:217)
>         at
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:340)
>         at
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:427)
>         at
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
>         at
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73)
>         at
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73)
>         at
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:87)
>         at
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:440)
>         at
> org.apache.activemq.command.MessageAck.visit(MessageAck.java:196)
>         at
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:281)
>         at
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:178)
>         at
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
>         at
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:134)
>         at
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:138)
>         at
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:185)
>         at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:172)
>         at java.lang.Thread.run(Thread.java:619)
> 
> Has anyone experienced this?
> 
> Here's the code that results in the exception.  It is as simple as I can
> make it.  I've stared and stared at it and I cannot work out what is
> causing the exception.
> 
> package neil.transactions;
> 
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageListener;
> import javax.jms.Session;
> import javax.jms.Topic;
> import javax.jms.TopicConnection;
> import javax.jms.TopicConnectionFactory;
> import javax.jms.TopicSession;
> import javax.jms.TopicSubscriber;
> import javax.naming.Context;
> import javax.naming.InitialContext;
> import javax.naming.NamingException;
> 
> public class JMSTransactedTopicListen {
> 
> 	public static class MessageHandler implements MessageListener{
> 		Session session;
> 		int count=0;
> 		
> 		public MessageHandler(Session aSession){
> 			session = aSession;
> 		}
> 
> 		@Override
> 		public void onMessage(Message msg) {
> 			try {
> 				if (++count % 3 == 0){
> 					System.out.println("ROLLBACK message ID "+msg.getJMSMessageID());
> 					session.rollback();
> 					return;
> 				}
> 			} catch (JMSException e){
> 				e.printStackTrace();
> 				return;
> 			}
> 			
> 			try{
> 				System.out.println("COMMIT   message ID "+msg.getJMSMessageID());
> 				session.commit();
> 			} catch (JMSException e){
> 				e.printStackTrace();
> 			}
> 		}
> 	}
> 
> 	/**
> 	 * @param args
> 	 */
> 	public static void main(String[] args) {
> 		System.out.println("JMSTransactedTopicListen::main()");
> 		try {
> 			Context context = new InitialContext();
> 			
> 			java.util.Hashtable<?,?> myEnvironment = context.getEnvironment();
> 			if (myEnvironment.containsKey("java.naming.provider.url"))
> System.out.println("Connecting to url:
> "+myEnvironment.get("java.naming.provider.url"));
> 							
> 			TopicConnectionFactory tcFactory = (TopicConnectionFactory)
> context.lookup("ConnectionFactory");	
> 			TopicConnection topicConn = tcFactory.createTopicConnection();
> 			TopicSession tcSession = topicConn.createTopicSession(true, 0);
> 			
> 	        System.out.println("Consuming topic: " +
> context.lookup("NeilTopic"));
> 			Topic myTopic = (Topic)context.lookup("NeilTopic");
> 			TopicSubscriber subscriber = tcSession.createSubscriber(myTopic);
> 			subscriber.setMessageListener(new MessageHandler(tcSession));
> 
> 			System.out.println("Starting topicConn");
> 			topicConn.start();
> 			System.out.println("JMSTransactedTopicListen::main() started OK -
> sleeping....");
> 			
> 			Thread.sleep(60 * 60 * 100);			
> 			
> 			System.out.println("JMSTransactedTopicListen::main() ... finished
> sleeping...");
> 
> 			tcSession.close();
> 			topicConn.close();	
> 		} catch (NamingException e){
> 			e.printStackTrace();
> 		} catch (JMSException e) {
> 			e.printStackTrace();
> 		} catch (InterruptedException e) {
> 			e.printStackTrace();
> 		}
> 		
> 		System.out.println("JMSTransactedTopicListen::main() Done");
> 	}
> }
> 
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/javax.jms.JMSException%3A-Invalid-acknowledgement-after-rollback-of-TopicSession-tp15805155s2354p15806371.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message