Return-Path: X-Original-To: apmail-activemq-dev-archive@www.apache.org Delivered-To: apmail-activemq-dev-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 01999CF02 for ; Fri, 7 Mar 2014 16:42:57 +0000 (UTC) Received: (qmail 77175 invoked by uid 500); 7 Mar 2014 16:42:50 -0000 Delivered-To: apmail-activemq-dev-archive@activemq.apache.org Received: (qmail 76421 invoked by uid 500); 7 Mar 2014 16:42:47 -0000 Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list dev@activemq.apache.org Received: (qmail 76402 invoked by uid 99); 7 Mar 2014 16:42:45 -0000 Received: from arcas.apache.org (HELO arcas.apache.org) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Mar 2014 16:42:45 +0000 Date: Fri, 7 Mar 2014 16:42:45 +0000 (UTC) From: "Gary Tully (JIRA)" To: dev@activemq.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (AMQNET-471) Synchronous message consumer will lose a message that failed to commit whilst the broker was unavailable MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/AMQNET-471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13924039#comment-13924039 ] Gary Tully commented on AMQNET-471: ----------------------------------- would it be possible to pull together a java/openwire only test case for comparison? 
> Synchronous message consumer will lose a message that failed to commit whilst the broker was unavailable > -------------------------------------------------------------------------------------------------------- > > Key: AMQNET-471 > URL: https://issues.apache.org/jira/browse/AMQNET-471 > Project: ActiveMQ .Net > Issue Type: Bug > Components: ActiveMQ > Affects Versions: 1.6.2 > Reporter: Imran > Assignee: Jim Gomes > Attachments: TransactionContext.cs.patch > > > If the broker is down then the client cannot commit the current message. An exception is thrown by the library. This is the behavior you would expect. > If you then try to roll back the transaction on the session due to the exception and resume message consumption, the rolled-back message will never be redelivered. > {code:title=Failing Test|borderStyle=solid} > [TestFixture, Explicit] > public class BrokerRestart > { > [Test] > public void Message_should_be_redilivered_if_broker_is_down_and_try_commit() > { > StartService(ActiveMqMaster); > DeleteQueue(); > SendMessageToQueue(); > var session = _connection.CreateSession(AcknowledgementMode.Transactional); > var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQ)); > var message = consumer.Receive(TimeSpan.FromSeconds(30)); > _log.Debug("Received message"); > StopService(ActiveMqMaster); > _log.Debug("Commiting transaction"); > try > { > session.Commit(); > } > catch (Exception ex) > { > _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 250)); > try > { > session.Rollback(); > } > catch (Exception einner) > { > _log.Debug("Rollback transaction"); > _log.ErrorFormat("Exception: {0}", einner.ToString().Substring(0, 250)); > } > } > StartService(ActiveMqMaster); > message = consumer.Receive(TimeSpan.FromSeconds(30)); > Assert.That(message, Is.Not.Null, "message was not redilivered"); > } > private void StartService(ServiceController service) > { > if (service.Status != ServiceControllerStatus.Running) > service.Start(); 
> service.WaitForStatus(ServiceControllerStatus.Running); > _log.Debug("Started Broker"); > } > private void StopService(ServiceController service) > { > if (service.Status != ServiceControllerStatus.Stopped) > service.Stop(); > service.WaitForStatus(ServiceControllerStatus.Stopped); > _log.Debug("Stopped Broker Broker"); > } > private void SendMessageToQueue() > { > using (var session = _connection.CreateSession()) > using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQ))) > { > producer.Send(producer.CreateTextMessage(1.ToString())); > session.Commit(); > } > _log.Debug("Primed Input Queue"); > } > private void DeleteQueue() > { > using (var session = _connection.CreateSession()) > { > SessionUtil.DeleteDestination(session, InQ); > } > } > [SetUp] > public void TestSetup() > { > LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss"); > _log = LogManager.GetLogger(typeof (BrokerRestart).Name); > var factory = new ConnectionFactory(@"failover:(tcp://localhost:61616)") > { > AcknowledgementMode = AcknowledgementMode.Transactional, > RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false }, > AsyncSend = false, > PrefetchPolicy = new PrefetchPolicy {All = 5} > }; > _connection = factory.CreateConnection(); > _connection.Start(); > //Tracer.Trace = new CommonLoggingTraceAdapter(); > } > protected ServiceController ActiveMqMaster = new ServiceController(@"ActiveMQ"); > //protected ServiceController ActiveMqSlave = new ServiceController(@"ActiveMQSlave"); > private IConnection _connection; > private const string InQ = "integration-test-q"; > private ILog _log; > } > {code} > {code:title=Passing Test With Patch|borderStyle=solid} > using System; > using System.Configuration; > using System.ServiceProcess; > using System.Threading; > using System.Threading.Tasks; > using Apache.NMS; > using 
Apache.NMS.ActiveMQ; > using Apache.NMS.Policies; > using Apache.NMS.Util; > using Common.Logging; > using Common.Logging.Simple; > using NUnit.Framework; > namespace IntegrationTests.ApacheNms.Jira > { > [TestFixture] > public class BrokerRestart > { > //AMQNET-471 > [Test] > public void Message_should_be_redilivered_if_broker_is_down_and_try_to_commit() > { > var session = _connection.CreateSession(AcknowledgementMode.Transactional); > var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue)); > SendMessageToQueue(); > consumer.Receive(TimeSpan.FromSeconds(5)); > StopService(ActiveMqMaster); > var commiter = TryCommit(session); > StartService(ActiveMqMaster); > commiter.Wait(); > var message = consumer.Receive(TimeSpan.FromSeconds(5)); > TryCommit(session).Wait(); > Assert.That(message, Is.Not.Null, "message was not redilivered"); > Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(0)); > } > //Commit blocks if the broker is down with the patch for AMQNET-471 > private Task TryCommit(ISession session) > { > var task = Task.Factory.StartNew(() => > { > try > { > session.Commit(); > } > catch (Exception ex) > { > _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 250)); > try > { > session.Rollback(); > } > catch (Exception einner) > { > _log.Debug("Rollback transaction"); > _log.ErrorFormat("Exception: {0}", einner.ToString().Substring(0, 250)); > } > } > }); > //Give it a chance to start. 
> Thread.Sleep(1000); > return task; > } > private int CountMessagesInQueue(string queue) > { > var count = 0; > using (var session = _connection.CreateSession(AcknowledgementMode.Transactional)) > using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session, queue))) > { > while (true) > { > var msg = consumerIn.Receive(TimeSpan.FromSeconds(2)); > if (msg == null) > break; > count++; > } > } > return count; > } > private void StartService(ServiceController service) > { > if (service.Status != ServiceControllerStatus.Running || service.Status == ServiceControllerStatus.StartPending) > service.Start(); > service.WaitForStatus(ServiceControllerStatus.Running); > _log.Debug("Started Broker"); > } > private void StopService(ServiceController service) > { > if (service.Status != ServiceControllerStatus.Stopped) > service.Stop(); > service.WaitForStatus(ServiceControllerStatus.Stopped); > _log.Debug("Stopped Broker"); > } > private void SendMessageToQueue() > { > using (var session = _connection.CreateSession()) > using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue))) > { > producer.Send(producer.CreateTextMessage(1.ToString())); > session.Commit(); > } > _log.Debug("Primed Input Queue"); > } > private void DeleteQueue(string queue) > { > using (var session = _connection.CreateSession()) > { > SessionUtil.DeleteDestination(session, queue); > } > } > [SetUp] > public void TestSetup() > { > LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss"); > _log = LogManager.GetLogger(typeof(BrokerRestart).Name); > StartService(ActiveMqMaster); > _factory = new ConnectionFactory(ActiveMqConnectionString) > { > AcknowledgementMode = AcknowledgementMode.Transactional, > RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false }, > AsyncSend = false, > PrefetchPolicy = new PrefetchPolicy { All 
= 5 } > }; > _connection = _factory.CreateConnection(); > _log.Debug("Starting connection"); > _connection.Start(); > _log.Debug("Connection established"); > DeleteQueue(InQueue); > DeleteQueue(OutQueue); > //Tracer.Trace = new CommonLoggingTraceAdapter(); > } > [TearDown] > public void TestTearDown() > { > _connection.Dispose(); > } > protected ServiceController ActiveMqMaster = new ServiceController(ActiveMasterServiceName, ActiveMqMachineName); > protected ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName); > private static readonly string ActiveMqMachineName = ConfigurationManager.AppSettings["ActiveMqServiceMachineName"]; > private static readonly string ActiveMqConnectionString = ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString; > private static readonly string ActiveMasterServiceName = ConfigurationManager.AppSettings["ActiveMqMasterName"]; > private static readonly string ActiveMqSlaveServiceName = ConfigurationManager.AppSettings["ActiveMqSlaveName"]; > private IConnection _connection; > private const string InQueue = "integration-test-q-in"; > private const string OutQueue = "integration-test-q-out"; > private ILog _log; > private ConnectionFactory _factory; > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)