Return-Path: X-Original-To: apmail-activemq-dev-archive@www.apache.org Delivered-To: apmail-activemq-dev-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D0EE010786 for ; Tue, 4 Mar 2014 16:33:43 +0000 (UTC) Received: (qmail 37303 invoked by uid 500); 4 Mar 2014 16:33:36 -0000 Delivered-To: apmail-activemq-dev-archive@activemq.apache.org Received: (qmail 36373 invoked by uid 500); 4 Mar 2014 16:33:33 -0000 Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list dev@activemq.apache.org Received: (qmail 35914 invoked by uid 99); 4 Mar 2014 16:33:27 -0000 Received: from arcas.apache.org (HELO arcas.apache.org) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Mar 2014 16:33:27 +0000 Date: Tue, 4 Mar 2014 16:33:27 +0000 (UTC) From: "Gary Tully (JIRA)" To: dev@activemq.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (AMQNET-471) Synchronous message consumer will lose a message that failed to commit whilst the broker was unavailable MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/AMQNET-471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13919575#comment-13919575 ] Gary Tully commented on AMQNET-471: ----------------------------------- The idea of RollbackOnFailedRecoveryRedelivery is that a consumer in a transaction will track messages that it has received and will only allow a pending commit to complete after a failover, if it received the same messages again. Failover clients side retains state and replays it, broker side it a failover reconnect looks like a new consumer. So it is possible, with multiple consumers that messages get dispatched somewhere else after a failover reconnect. If this happens, client side notice missing messages and we force a rollback. That is the what '1 previously delivered...' not replayed means. Hope this helps your understanding. > Synchronous message consumer will lose a message that failed to commit whilst the broker was unavailable > -------------------------------------------------------------------------------------------------------- > > Key: AMQNET-471 > URL: https://issues.apache.org/jira/browse/AMQNET-471 > Project: ActiveMQ .Net > Issue Type: Bug > Components: ActiveMQ > Affects Versions: 1.6.2 > Reporter: Imran > Assignee: Jim Gomes > > If the broker is down then the client can not commit the current message. An exception is thrown by the library. This is the behavior you would expect. > If you then try and rollback the transaction on the session due to the exception and resume message consumption, the rolled back message will never be redelivered. > {code:title=FailingTest|borderStyle=solid} > [TestFixture, Explicit] > public class BrokerRestart > { > [Test] > public void Message_should_be_redilivered_if_broker_is_down_and_try_commit() > { > StartService(ActiveMqMaster); > DeleteQueue(); > SendMessageToQueue(); > var session = _connection.CreateSession(AcknowledgementMode.Transactional); > var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQ)); > var message = consumer.Receive(TimeSpan.FromSeconds(30)); > _log.Debug("Received message"); > StopService(ActiveMqMaster); > _log.Debug("Commiting transaction"); > try > { > session.Commit(); > } > catch (Exception ex) > { > _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 250)); > try > { > session.Rollback(); > } > catch (Exception einner) > { > _log.Debug("Rollback transaction"); > _log.ErrorFormat("Exception: {0}", einner.ToString().Substring(0, 250)); > } > } > StartService(ActiveMqMaster); > message = consumer.Receive(TimeSpan.FromSeconds(30)); > Assert.That(message, Is.Not.Null, "message was not redilivered"); > } > private void StartService(ServiceController service) > { > if (service.Status != ServiceControllerStatus.Running) > service.Start(); > service.WaitForStatus(ServiceControllerStatus.Running); > _log.Debug("Started Broker"); > } > private void StopService(ServiceController service) > { > if (service.Status != ServiceControllerStatus.Stopped) > service.Stop(); > service.WaitForStatus(ServiceControllerStatus.Stopped); > _log.Debug("Stopped Broker Broker"); > } > private void SendMessageToQueue() > { > using (var session = _connection.CreateSession()) > using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQ))) > { > producer.Send(producer.CreateTextMessage(1.ToString())); > session.Commit(); > } > _log.Debug("Primed Input Queue"); > } > private void DeleteQueue() > { > using (var session = _connection.CreateSession()) > { > SessionUtil.DeleteDestination(session, InQ); > } > } > [SetUp] > public void TestSetup() > { > LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss"); > _log = LogManager.GetLogger(typeof (BrokerRestart).Name); > var factory = new ConnectionFactory(@"failover:(tcp://localhost:61616)") > { > AcknowledgementMode = AcknowledgementMode.Transactional, > RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false }, > AsyncSend = false, > }; > _connection = factory.CreateConnection(); > _connection.Start(); > //Tracer.Trace = new CommonLoggingTraceAdapter(); > } > protected ServiceController ActiveMqMaster = new ServiceController(@"ActiveMQ"); > //protected ServiceController ActiveMqSlave = new ServiceController(@"ActiveMQSlave"); > private IConnection _connection; > private const string InQ = "integration-test-q"; > private ILog _log; > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)