activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Imran (JIRA)" <jira+amq...@apache.org>
Subject [jira] [Updated] (AMQNET-471) Synchronous message consumer will lose a message that failed to commit whilst the broker was unavailable
Date Wed, 05 Mar 2014 13:19:44 GMT

     [ https://issues.apache.org/jira/browse/AMQNET-471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Imran updated AMQNET-471:
-------------------------

    Description: 
If the broker is down then the client can not commit the current message. An exception is
thrown by the library. This is the behavior you would expect.

If you then try and rollback the transaction on the session due to the exception and resume
message consumption, the rolled back message will never be redelivered.

{code:title=FailingTest|borderStyle=solid} 
 [TestFixture, Explicit]
    public class BrokerRestart
    {
        [Test]
        public void Message_should_be_redilivered_if_broker_is_down_and_try_commit()
        {
            StartService(ActiveMqMaster);
            DeleteQueue();
            SendMessageToQueue();
            var session = _connection.CreateSession(AcknowledgementMode.Transactional);
            var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQ));

            var message = consumer.Receive(TimeSpan.FromSeconds(30));
            _log.Debug("Received message");
            StopService(ActiveMqMaster);
            _log.Debug("Commiting transaction");
            try
            {
                session.Commit();
            }
            catch (Exception ex)
            {
                _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 250));
                try
                {
                    session.Rollback();
                }
                catch (Exception einner)
                {
                    _log.Debug("Rollback transaction");
                    _log.ErrorFormat("Exception: {0}", einner.ToString().Substring(0, 250));
                }
            }
            StartService(ActiveMqMaster);
            message = consumer.Receive(TimeSpan.FromSeconds(30));

            Assert.That(message, Is.Not.Null, "message was not redilivered");
        }

        private void StartService(ServiceController service)
        {
            if (service.Status != ServiceControllerStatus.Running)
                service.Start();
            service.WaitForStatus(ServiceControllerStatus.Running);
            _log.Debug("Started Broker");
        }

        private void StopService(ServiceController service)
        {
            if (service.Status != ServiceControllerStatus.Stopped)
                service.Stop();
            service.WaitForStatus(ServiceControllerStatus.Stopped);
            _log.Debug("Stopped Broker Broker");
        }

        private void SendMessageToQueue()
        {
            using (var session = _connection.CreateSession())
            using (var producer = session.CreateProducer(SessionUtil.GetDestination(session,
InQ)))
            {
                producer.Send(producer.CreateTextMessage(1.ToString()));
                session.Commit();
            }
            _log.Debug("Primed Input Queue");
        }

        private void DeleteQueue()
        {
            using (var session = _connection.CreateSession())
            {
                SessionUtil.DeleteDestination(session, InQ);
            }
        }

        [SetUp]
        public void TestSetup()
        {
            LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true,
true, true, "HH:MM:ss");
            _log = LogManager.GetLogger(typeof (BrokerRestart).Name);
            var factory = new ConnectionFactory(@"failover:(tcp://localhost:61616)")
            {
                AcknowledgementMode = AcknowledgementMode.Transactional,
                RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries
= 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
                AsyncSend = false,
                PrefetchPolicy = new PrefetchPolicy {All = 5}
            };
            _connection = factory.CreateConnection();
            _connection.Start();
            //Tracer.Trace = new CommonLoggingTraceAdapter();
        }

        protected ServiceController ActiveMqMaster = new ServiceController(@"ActiveMQ");
        //protected ServiceController ActiveMqSlave = new ServiceController(@"ActiveMQSlave");
        private IConnection _connection;
        private const string InQ = "integration-test-q";
        private ILog _log;
    }
{code}

  was:
If the broker is down then the client can not commit the current message. An exception is
thrown by the library. This is the behavior you would expect.

If you then try and rollback the transaction on the session due to the exception and resume
message consumption, the rolled back message will never be redelivered.

{code:title=FailingTest|borderStyle=solid} 
 [TestFixture, Explicit]
    public class BrokerRestart
    {
        [Test]
        public void Message_should_be_redilivered_if_broker_is_down_and_try_commit()
        {
            StartService(ActiveMqMaster);
            DeleteQueue();
            SendMessageToQueue();
            var session = _connection.CreateSession(AcknowledgementMode.Transactional);
            var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQ));

            var message = consumer.Receive(TimeSpan.FromSeconds(30));
            _log.Debug("Received message");
            StopService(ActiveMqMaster);
            _log.Debug("Commiting transaction");
            try
            {
                session.Commit();
            }
            catch (Exception ex)
            {
                _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 250));
                try
                {
                    session.Rollback();
                }
                catch (Exception einner)
                {
                    _log.Debug("Rollback transaction");
                    _log.ErrorFormat("Exception: {0}", einner.ToString().Substring(0, 250));
                }
            }
            StartService(ActiveMqMaster);
            message = consumer.Receive(TimeSpan.FromSeconds(30));

            Assert.That(message, Is.Not.Null, "message was not redilivered");
        }

        private void StartService(ServiceController service)
        {
            if (service.Status != ServiceControllerStatus.Running)
                service.Start();
            service.WaitForStatus(ServiceControllerStatus.Running);
            _log.Debug("Started Broker");
        }

        private void StopService(ServiceController service)
        {
            if (service.Status != ServiceControllerStatus.Stopped)
                service.Stop();
            service.WaitForStatus(ServiceControllerStatus.Stopped);
            _log.Debug("Stopped Broker Broker");
        }

        private void SendMessageToQueue()
        {
            using (var session = _connection.CreateSession())
            using (var producer = session.CreateProducer(SessionUtil.GetDestination(session,
InQ)))
            {
                producer.Send(producer.CreateTextMessage(1.ToString()));
                session.Commit();
            }
            _log.Debug("Primed Input Queue");
        }

        private void DeleteQueue()
        {
            using (var session = _connection.CreateSession())
            {
                SessionUtil.DeleteDestination(session, InQ);
            }
        }

        [SetUp]
        public void TestSetup()
        {
            LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true,
true, true, "HH:MM:ss");
            _log = LogManager.GetLogger(typeof (BrokerRestart).Name);
            var factory = new ConnectionFactory(@"failover:(tcp://localhost:61616)")
            {
                AcknowledgementMode = AcknowledgementMode.Transactional,
                RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries
= 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
                AsyncSend = false,
            };
            _connection = factory.CreateConnection();
            _connection.Start();
            //Tracer.Trace = new CommonLoggingTraceAdapter();
        }

        protected ServiceController ActiveMqMaster = new ServiceController(@"ActiveMQ");
        //protected ServiceController ActiveMqSlave = new ServiceController(@"ActiveMQSlave");
        private IConnection _connection;
        private const string InQ = "integration-test-q";
        private ILog _log;
    }
{code}


> Synchronous message consumer will lose a message that failed to commit whilst the broker
was unavailable
> --------------------------------------------------------------------------------------------------------
>
>                 Key: AMQNET-471
>                 URL: https://issues.apache.org/jira/browse/AMQNET-471
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>          Components: ActiveMQ
>    Affects Versions: 1.6.2
>            Reporter: Imran
>            Assignee: Jim Gomes
>
> If the broker is down then the client can not commit the current message. An exception
is thrown by the library. This is the behavior you would expect.
> If you then try and rollback the transaction on the session due to the exception and
resume message consumption, the rolled back message will never be redelivered.
> {code:title=FailingTest|borderStyle=solid} 
>  [TestFixture, Explicit]
>     public class BrokerRestart
>     {
>         [Test]
>         public void Message_should_be_redilivered_if_broker_is_down_and_try_commit()
>         {
>             StartService(ActiveMqMaster);
>             DeleteQueue();
>             SendMessageToQueue();
>             var session = _connection.CreateSession(AcknowledgementMode.Transactional);
>             var consumer = session.CreateConsumer(SessionUtil.GetDestination(session,
InQ));
>             var message = consumer.Receive(TimeSpan.FromSeconds(30));
>             _log.Debug("Received message");
>             StopService(ActiveMqMaster);
>             _log.Debug("Commiting transaction");
>             try
>             {
>                 session.Commit();
>             }
>             catch (Exception ex)
>             {
>                 _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 250));
>                 try
>                 {
>                     session.Rollback();
>                 }
>                 catch (Exception einner)
>                 {
>                     _log.Debug("Rollback transaction");
>                     _log.ErrorFormat("Exception: {0}", einner.ToString().Substring(0,
250));
>                 }
>             }
>             StartService(ActiveMqMaster);
>             message = consumer.Receive(TimeSpan.FromSeconds(30));
>             Assert.That(message, Is.Not.Null, "message was not redilivered");
>         }
>         private void StartService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Running)
>                 service.Start();
>             service.WaitForStatus(ServiceControllerStatus.Running);
>             _log.Debug("Started Broker");
>         }
>         private void StopService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Stopped)
>                 service.Stop();
>             service.WaitForStatus(ServiceControllerStatus.Stopped);
>             _log.Debug("Stopped Broker Broker");
>         }
>         private void SendMessageToQueue()
>         {
>             using (var session = _connection.CreateSession())
>             using (var producer = session.CreateProducer(SessionUtil.GetDestination(session,
InQ)))
>             {
>                 producer.Send(producer.CreateTextMessage(1.ToString()));
>                 session.Commit();
>             }
>             _log.Debug("Primed Input Queue");
>         }
>         private void DeleteQueue()
>         {
>             using (var session = _connection.CreateSession())
>             {
>                 SessionUtil.DeleteDestination(session, InQ);
>             }
>         }
>         [SetUp]
>         public void TestSetup()
>         {
>             LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true,
true, true, "HH:MM:ss");
>             _log = LogManager.GetLogger(typeof (BrokerRestart).Name);
>             var factory = new ConnectionFactory(@"failover:(tcp://localhost:61616)")
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional,
>                 RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0,
MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
>                 AsyncSend = false,
>                 PrefetchPolicy = new PrefetchPolicy {All = 5}
>             };
>             _connection = factory.CreateConnection();
>             _connection.Start();
>             //Tracer.Trace = new CommonLoggingTraceAdapter();
>         }
>         protected ServiceController ActiveMqMaster = new ServiceController(@"ActiveMQ");
>         //protected ServiceController ActiveMqSlave = new ServiceController(@"ActiveMQSlave");
>         private IConnection _connection;
>         private const string InQ = "integration-test-q";
>         private ILog _log;
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Mime
View raw message