activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Hiram Chirino" <hi...@hiramchirino.com>
Subject Re: Redelivery problem with MessageListener and Session rollback
Date Mon, 17 Apr 2006 19:34:51 GMT
Hi Rodrigo,

Is that junit test a patch you want to contibute to Apache?  If so,
I'll add it to our test suite asap.

Regards,
Hiram

On 4/15/06, Rodrigo S de Castro <rodsc@terra.com.br> wrote:
>
> Hi,
>
> 1) I have a MessageListener implementation that rolls back the session if
> something goes wrong. I expected the message to be redelivered to this
> listener, but that does not happen. It is never redelivered.
>
> This code does NOT work:
>
>     private class MessageListenerTest implements MessageListener {
>         private Session session;
>         public int counter = 0;
>
>         public MessageListenerTest(ActiveMQMessageConsumer session) {
>                 this.session = session;
>         }
>
>                 public void onMessage(Message message) {
>                         try {
>                                 System.out.println("Message: " + message);
>                                 counter++;
>                                 if (counter <= 2) {
>                                         System.out.println("ROLLBACK");
>                                         session.rollback();
>                                 } else {
>                                         System.out.println("COMMIT");
>                                         message.acknowledge();
>                                         session.commit();
>                                 }
>                         } catch(JMSException e) {
>                                 System.err.println("Error when rolling back transaction");
>                         }
>                 }
>     }
>
>
> 2) The only I managed to make it redeliver is to pass a reference to
> MessageConsumer to the MessageListener implementation, cast it to
> ActiveMQMessageConsumer and call its rollback method.
>
> The code below DOES work:
>
>     private class MessageListenerTest implements MessageListener {
>         private ActiveMQMessageConsumer consumer;
>         public int counter = 0;
>
>         public MessageListenerTest(ActiveMQMessageConsumer consumer) {
>                 this.consumer = consumer;
>         }
>
>                 public void onMessage(Message message) {
>                         try {
>                                 System.out.println("Message: " + message);
>                                 counter++;
>                                 if (counter <= 2) {
>                                         System.out.println("ROLLBACK");
>                                         session.rollback();
>                                 } else {
>                                         System.out.println("COMMIT");
>                                         message.acknowledge();
>                                         session.commit();
>                                 }
>                         } catch(JMSException e) {
>                                 System.err.println("Error when rolling back transaction");
>                         }
>                 }
>     }
>
> It this right? I think that session.rollback() should work as well.
>
> Below a JUnit code that shows this problem:
>
>
> import javax.jms.Connection;
> import javax.jms.DeliveryMode;
> import javax.jms.Destination;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
> import javax.jms.TextMessage;
>
> import junit.framework.TestCase;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.ActiveMQMessageConsumer;
> import org.apache.activemq.RedeliveryPolicy;
> import org.apache.activemq.command.ActiveMQMessage;
>
> public class MessageListenerRedeliveryTest extends TestCase {
>
>     private Connection connection;
>
>     protected void setUp() throws Exception {
>         connection = createConnection();
>     }
>
>     /**
>      * @see junit.framework.TestCase#tearDown()
>      */
>     protected void tearDown() throws Exception {
>         if (connection != null) {
>             connection.close();
>             connection = null;
>         }
>     }
>
>     protected RedeliveryPolicy getRedeliveryPolicy() {
>         RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
>         redeliveryPolicy.setInitialRedeliveryDelay(1000);
>         redeliveryPolicy.setBackOffMultiplier((short) 5);
>         redeliveryPolicy.setMaximumRedeliveries(10);
>         redeliveryPolicy.setUseExponentialBackOff(true);
>         return redeliveryPolicy;
>     }
>
>     protected Connection createConnection() throws Exception{
>         ActiveMQConnectionFactory factory = new
> ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
>         factory.setRedeliveryPolicy(getRedeliveryPolicy());
>         return factory.createConnection();
>     }
>
>     private class ConsumerMessageListenerTest implements MessageListener {
>         private ActiveMQMessageConsumer consumer;
>         public int counter = 0;
>
>         public ConsumerMessageListenerTest(ActiveMQMessageConsumer consumer) {
>                 this.consumer = consumer;
>         }
>
>                 public void onMessage(Message message) {
>                         try {
>                                 System.out.println("Message: " + message);
>                                 counter++;
>                                 if (counter <= 2) {
>                                         System.out.println("ROLLBACK");
>                                         consumer.rollback();
>                                 } else {
>                                         System.out.println("COMMIT");
>                                         message.acknowledge();
>                                         consumer.commit();
>                                 }
>                         } catch(JMSException e) {
>                                 System.err.println("Error when rolling back transaction");
>                         }
>                 }
>     }
>
>     private class SessionMessageListenerTest implements MessageListener {
>         private Session session;
>         public int counter = 0;
>
>         public SessionMessageListenerTest(Session session) {
>                 this.session = session;
>         }
>
>                 public void onMessage(Message message) {
>                         try {
>                                 System.out.println("Message: " + message);
>                                 counter++;
>                                 if (counter <= 2) {
>                                         System.out.println("ROLLBACK");
>                                         session.rollback();
>                                 } else {
>                                         System.out.println("COMMIT");
>                                         message.acknowledge();
>                                         session.commit();
>                                 }
>                         } catch(JMSException e) {
>                                 System.err.println("Error when rolling back transaction");
>                         }
>                 }
>     }
>
>     public void testQueueRollbackMessageListener() throws JMSException {
>         connection.start();
>
>         Session session = connection.createSession(true,
> Session.CLIENT_ACKNOWLEDGE);
>         Queue queue = session.createQueue("queue-"+getName());
>         MessageProducer producer = createProducer(session, queue);
>         Message message = createTextMessage(session);
>         producer.send(message);
>         session.commit();
>
>         MessageConsumer consumer = session.createConsumer(queue);
>
>         ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
>         mc.setRedeliveryPolicy(getRedeliveryPolicy());
>
>         SessionMessageListenerTest listener = new
> SessionMessageListenerTest(session);
>         consumer.setMessageListener(listener);
>
>         // redelivery works with the code below
>         /*
>         ConsumerMessageListenerTest listener = new
> ConsumerMessageListenerTest(session);
>         consumer.setMessageListener(listener);
>          */
>
>         try {
>                 Thread.sleep(7000);
>         } catch(InterruptedException e) {
>
>         }
>         assertEquals(2, listener.counter);
>
>         producer.send(createTextMessage(session));
>         session.commit();
>
>         try {
>                 Thread.sleep(2000);
>         } catch(InterruptedException e) {
>                 // ignore
>         }
>         assertEquals(3, listener.counter);
>
>         session.close();
>     }
>
>     private TextMessage createTextMessage(Session session) throws
> JMSException {
>         return session.createTextMessage("Hello");
>     }
>
>     private MessageProducer createProducer(Session session, Destination
> queue) throws JMSException {
>          MessageProducer producer = session.createProducer(queue);
>          producer.setDeliveryMode(getDeliveryMode());
>          return producer;
>     }
>
>     protected int getDeliveryMode() {
>         return DeliveryMode.PERSISTENT;
>     }
> }
>
>
>
>
> --
> View this message in context: http://www.nabble.com/Redelivery-problem-with-MessageListener-and-Session-rollback-t1455413.html#a3933378
> Sent from the ActiveMQ - User forum at Nabble.com.
>
>


--
Regards,
Hiram

Mime
View raw message