Date: Fri, 7 Mar 2014 14:24:44 +0000 (UTC)
From: "Imran (JIRA)"
To: dev@activemq.apache.org
Subject: [jira] [Commented] (AMQNET-471) Synchronous message consumer will lose a message that failed to commit whilst the broker was unavailable


    [ https://issues.apache.org/jira/browse/AMQNET-471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13923902#comment-13923902 ]

Imran commented on AMQNET-471:
------------------------------

I have attached a patch with a temporary workaround. I don't think it fixes the root cause of the issue, but it will stop message loss. I have also added an updated test case that passes with the patch applied.

I think the root cause is that the failover code currently executes the same code as the commit, which it should not.
Changing this would require a large refactor, so I would be curious to know how this works in the Java client, as the NMS and JMS code look equivalent to me.

> Synchronous message consumer will lose a message that failed to commit whilst the broker was unavailable
> --------------------------------------------------------------------------------------------------------
>
>                 Key: AMQNET-471
>                 URL: https://issues.apache.org/jira/browse/AMQNET-471
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>          Components: ActiveMQ
>    Affects Versions: 1.6.2
>            Reporter: Imran
>            Assignee: Jim Gomes
>         Attachments: TransactionContext.cs.patch
>
>
> If the broker is down, the client cannot commit the current message. An exception is thrown by the library. This is the behavior you would expect.
> If you then try to roll back the transaction on the session because of the exception and resume message consumption, the rolled-back message will never be redelivered.
> {code:title=FailingTest|borderStyle=solid}
> [TestFixture, Explicit]
> public class BrokerRestart
> {
>     [Test]
>     public void Message_should_be_redilivered_if_broker_is_down_and_try_commit()
>     {
>         StartService(ActiveMqMaster);
>         DeleteQueue();
>         SendMessageToQueue();
>         var session = _connection.CreateSession(AcknowledgementMode.Transactional);
>         var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQ));
>         var message = consumer.Receive(TimeSpan.FromSeconds(30));
>         _log.Debug("Received message");
>         StopService(ActiveMqMaster);
>         _log.Debug("Committing transaction");
>         try
>         {
>             session.Commit();
>         }
>         catch (Exception ex)
>         {
>             _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 250));
>             try
>             {
>                 session.Rollback();
>             }
>             catch (Exception einner)
>             {
>                 _log.Debug("Rollback transaction");
>                 _log.ErrorFormat("Exception: {0}", einner.ToString().Substring(0, 250));
>             }
>         }
>         StartService(ActiveMqMaster);
>         message = consumer.Receive(TimeSpan.FromSeconds(30));
>         Assert.That(message, Is.Not.Null, "message was not redelivered");
>     }
>
>     private void StartService(ServiceController service)
>     {
>         if (service.Status != ServiceControllerStatus.Running)
>             service.Start();
>         service.WaitForStatus(ServiceControllerStatus.Running);
>         _log.Debug("Started Broker");
>     }
>
>     private void StopService(ServiceController service)
>     {
>         if (service.Status != ServiceControllerStatus.Stopped)
>             service.Stop();
>         service.WaitForStatus(ServiceControllerStatus.Stopped);
>         _log.Debug("Stopped Broker");
>     }
>
>     private void SendMessageToQueue()
>     {
>         using (var session = _connection.CreateSession())
>         using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQ)))
>         {
>             producer.Send(producer.CreateTextMessage(1.ToString()));
>             session.Commit();
>         }
>         _log.Debug("Primed Input Queue");
>     }
>
>     private void DeleteQueue()
>     {
>         using (var session = _connection.CreateSession())
>         {
>             SessionUtil.DeleteDestination(session, InQ);
>         }
>     }
>
>     [SetUp]
>     public void TestSetup()
>     {
>         // "mm" is minutes; the original "MM" would print the month instead.
>         LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:mm:ss");
>         _log = LogManager.GetLogger(typeof (BrokerRestart).Name);
>         var factory = new ConnectionFactory(@"failover:(tcp://localhost:61616)")
>         {
>             AcknowledgementMode = AcknowledgementMode.Transactional,
>             RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
>             AsyncSend = false,
>             PrefetchPolicy = new PrefetchPolicy {All = 5}
>         };
>         _connection = factory.CreateConnection();
>         _connection.Start();
>         //Tracer.Trace = new CommonLoggingTraceAdapter();
>     }
>
>     protected ServiceController ActiveMqMaster = new ServiceController(@"ActiveMQ");
>     //protected ServiceController ActiveMqSlave = new ServiceController(@"ActiveMQSlave");
>     private IConnection _connection;
>     private const string InQ = "integration-test-q";
>     private ILog _log;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)