activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Gary Tully (JIRA)" <jira+amq...@apache.org>
Subject [jira] [Commented] (AMQNET-471) Synchronous message consumer will lose a message that failed to commit whilst the broker was unavailable
Date Fri, 07 Mar 2014 16:42:45 GMT

    [ https://issues.apache.org/jira/browse/AMQNET-471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13924039#comment-13924039
] 

Gary Tully commented on AMQNET-471:
-----------------------------------

would it be possible to pull together a java/openwire only test case for comparison?

> Synchronous message consumer will lose a message that failed to commit whilst the broker
was unavailable
> --------------------------------------------------------------------------------------------------------
>
>                 Key: AMQNET-471
>                 URL: https://issues.apache.org/jira/browse/AMQNET-471
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>          Components: ActiveMQ
>    Affects Versions: 1.6.2
>            Reporter: Imran
>            Assignee: Jim Gomes
>         Attachments: TransactionContext.cs.patch
>
>
> If the broker is down then the client can not commit the current message. An exception
is thrown by the library. This is the behavior you would expect.
> If you then try and rollback the transaction on the session due to the exception and
resume message consumption, the rolled back message will never be redelivered.
> {code:title=Failing Test|borderStyle=solid} 
>  [TestFixture, Explicit]
>     public class BrokerRestart
>     {
>         [Test]
>         public void Message_should_be_redilivered_if_broker_is_down_and_try_commit()
>         {
>             StartService(ActiveMqMaster);
>             DeleteQueue();
>             SendMessageToQueue();
>             var session = _connection.CreateSession(AcknowledgementMode.Transactional);
>             var consumer = session.CreateConsumer(SessionUtil.GetDestination(session,
InQ));
>             var message = consumer.Receive(TimeSpan.FromSeconds(30));
>             _log.Debug("Received message");
>             StopService(ActiveMqMaster);
>             _log.Debug("Commiting transaction");
>             try
>             {
>                 session.Commit();
>             }
>             catch (Exception ex)
>             {
>                 _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 250));
>                 try
>                 {
>                     session.Rollback();
>                 }
>                 catch (Exception einner)
>                 {
>                     _log.Debug("Rollback transaction");
>                     _log.ErrorFormat("Exception: {0}", einner.ToString().Substring(0,
250));
>                 }
>             }
>             StartService(ActiveMqMaster);
>             message = consumer.Receive(TimeSpan.FromSeconds(30));
>             Assert.That(message, Is.Not.Null, "message was not redilivered");
>         }
>         private void StartService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Running)
>                 service.Start();
>             service.WaitForStatus(ServiceControllerStatus.Running);
>             _log.Debug("Started Broker");
>         }
>         private void StopService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Stopped)
>                 service.Stop();
>             service.WaitForStatus(ServiceControllerStatus.Stopped);
>             _log.Debug("Stopped Broker Broker");
>         }
>         private void SendMessageToQueue()
>         {
>             using (var session = _connection.CreateSession())
>             using (var producer = session.CreateProducer(SessionUtil.GetDestination(session,
InQ)))
>             {
>                 producer.Send(producer.CreateTextMessage(1.ToString()));
>                 session.Commit();
>             }
>             _log.Debug("Primed Input Queue");
>         }
>         private void DeleteQueue()
>         {
>             using (var session = _connection.CreateSession())
>             {
>                 SessionUtil.DeleteDestination(session, InQ);
>             }
>         }
>         [SetUp]
>         public void TestSetup()
>         {
>             LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true,
true, true, "HH:MM:ss");
>             _log = LogManager.GetLogger(typeof (BrokerRestart).Name);
>             var factory = new ConnectionFactory(@"failover:(tcp://localhost:61616)")
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional,
>                 RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0,
MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
>                 AsyncSend = false,
>                 PrefetchPolicy = new PrefetchPolicy {All = 5}
>             };
>             _connection = factory.CreateConnection();
>             _connection.Start();
>             //Tracer.Trace = new CommonLoggingTraceAdapter();
>         }
>         protected ServiceController ActiveMqMaster = new ServiceController(@"ActiveMQ");
>         //protected ServiceController ActiveMqSlave = new ServiceController(@"ActiveMQSlave");
>         private IConnection _connection;
>         private const string InQ = "integration-test-q";
>         private ILog _log;
>     }
> {code}
> {code:title=Passing Test With Patch|borderStyle=solid} 
> using System;
> using System.Configuration;
> using System.ServiceProcess;
> using System.Threading;
> using System.Threading.Tasks;
> using Apache.NMS;
> using Apache.NMS.ActiveMQ;
> using Apache.NMS.Policies;
> using Apache.NMS.Util;
> using Common.Logging;
> using Common.Logging.Simple;
> using NUnit.Framework;
> namespace IntegrationTests.ApacheNms.Jira
> {
>     [TestFixture]
>     public class BrokerRestart
>     {
>         //AMQNET-471
>         [Test]
>         public void Message_should_be_redilivered_if_broker_is_down_and_try_to_commit()
>         {
>             var session = _connection.CreateSession(AcknowledgementMode.Transactional);
>             var consumer = session.CreateConsumer(SessionUtil.GetDestination(session,
InQueue));
>             SendMessageToQueue();
>             consumer.Receive(TimeSpan.FromSeconds(5));
>             StopService(ActiveMqMaster);
>             var commiter = TryCommit(session);
>             StartService(ActiveMqMaster);
>             commiter.Wait();
>             var message = consumer.Receive(TimeSpan.FromSeconds(5));
>             TryCommit(session).Wait();
>             Assert.That(message, Is.Not.Null, "message was not redilivered");
>             Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(0));
>         }
>         //Commit blocks if the broker is down with the patch for AMQNET-471
>         private Task TryCommit(ISession session)
>         {
>             var task = Task.Factory.StartNew(() =>
>             {
>                 try
>                 {
>                     session.Commit();
>                 }
>                 catch (Exception ex)
>                 {
>                     _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 250));
>                     try
>                     {
>                         session.Rollback();
>                     }
>                     catch (Exception einner)
>                     {
>                         _log.Debug("Rollback transaction");
>                         _log.ErrorFormat("Exception: {0}", einner.ToString().Substring(0,
250));
>                     }
>                 }
>             });
>             //Give it a chance to start.
>             Thread.Sleep(1000);
>             return task;
>         }
>         private int CountMessagesInQueue(string queue)
>         {
>             var count = 0;
>             using (var session = _connection.CreateSession(AcknowledgementMode.Transactional))
>             using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session,
queue)))
>             {
>                 while (true)
>                 {
>                     var msg = consumerIn.Receive(TimeSpan.FromSeconds(2));
>                     if (msg == null)
>                         break;
>                     count++;
>                 }
>             }
>             return count;
>         }
>         private void StartService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Running || service.Status ==
ServiceControllerStatus.StartPending)
>                 service.Start();
>             service.WaitForStatus(ServiceControllerStatus.Running);
>             _log.Debug("Started Broker");
>         }
>         private void StopService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Stopped)
>                 service.Stop();
>             service.WaitForStatus(ServiceControllerStatus.Stopped);
>             _log.Debug("Stopped Broker");
>         }
>         private void SendMessageToQueue()
>         {
>             using (var session = _connection.CreateSession())
>             using (var producer = session.CreateProducer(SessionUtil.GetDestination(session,
InQueue)))
>             {
>                 producer.Send(producer.CreateTextMessage(1.ToString()));
>                 session.Commit();
>             }
>             _log.Debug("Primed Input Queue");
>         }
>         private void DeleteQueue(string queue)
>         {
>             using (var session = _connection.CreateSession())
>             {
>                 SessionUtil.DeleteDestination(session, queue);
>             }
>         }
>         [SetUp]
>         public void TestSetup()
>         {
>             LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true,
true, true, "HH:MM:ss");
>             _log = LogManager.GetLogger(typeof(BrokerRestart).Name);
>             StartService(ActiveMqMaster);
>             _factory = new ConnectionFactory(ActiveMqConnectionString)
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional,
>                 RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0,
MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
>                 AsyncSend = false,
>                 PrefetchPolicy = new PrefetchPolicy { All = 5 }
>             };
>             _connection = _factory.CreateConnection();
>             _log.Debug("Starting connection");
>             _connection.Start();
>             _log.Debug("Connection established");
>             DeleteQueue(InQueue);
>             DeleteQueue(OutQueue);
>             //Tracer.Trace = new CommonLoggingTraceAdapter();
>         }
>         [TearDown]
>         public void TestTearDown()
>         {
>             _connection.Dispose();
>         }
>         protected ServiceController ActiveMqMaster = new ServiceController(ActiveMasterServiceName,
ActiveMqMachineName);
>         protected ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName,
ActiveMqMachineName);
>         private static readonly string ActiveMqMachineName = ConfigurationManager.AppSettings["ActiveMqServiceMachineName"];
>         private static readonly string ActiveMqConnectionString = ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString;
>         private static readonly string ActiveMasterServiceName = ConfigurationManager.AppSettings["ActiveMqMasterName"];
>         private static readonly string ActiveMqSlaveServiceName = ConfigurationManager.AppSettings["ActiveMqSlaveName"];
>         private IConnection _connection;
>         private const string InQueue = "integration-test-q-in";
>         private const string OutQueue = "integration-test-q-out";
>         private ILog _log;
>         private ConnectionFactory _factory;
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Mime
View raw message