Return-Path: X-Original-To: apmail-activemq-dev-archive@www.apache.org Delivered-To: apmail-activemq-dev-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BB4B510D66 for ; Thu, 13 Mar 2014 15:06:11 +0000 (UTC) Received: (qmail 3430 invoked by uid 500); 13 Mar 2014 15:04:51 -0000 Delivered-To: apmail-activemq-dev-archive@activemq.apache.org Received: (qmail 3323 invoked by uid 500); 13 Mar 2014 15:04:48 -0000 Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list dev@activemq.apache.org Received: (qmail 3299 invoked by uid 99); 13 Mar 2014 15:04:47 -0000 Received: from arcas.apache.org (HELO arcas.apache.org) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Mar 2014 15:04:47 +0000 Date: Thu, 13 Mar 2014 15:04:47 +0000 (UTC) From: "Imran (JIRA)" To: dev@activemq.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Comment Edited] (AMQNET-472) Synchronous DTC Consumer will experience duplicates on transaction rollback MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/AMQNET-472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13933361#comment-13933361 ] Imran edited comment on AMQNET-472 at 3/13/14 3:02 PM: ------------------------------------------------------- I think I have found the issue here. Suggested patch is attached. It looks like there were some lock conditions missing. Since all dtc operations are completed asynchronously, the main processing thread could possibly have moved on to consuming a new message in a new transaction even before the previous transaction had committed or rolled back in this case. The message consumer needs to check if the next message being consumed is part of the same transaction or a new transaction and dispatch / block respectively. Messages that are part of the same transaction will be rolled back or committed together and trying to consume a message in a new transaction will block until the last transaction has completed. was (Author: imran99): I think I have found the issue here. Suggested patch is attached. It looks like there were some lock conditions missing. Since all dtc operations are completed asynchronously, the main processing thread could possibly have moved on to consuming a new message in a new transaction even before the previous transaction had committed or rolled back in this case. The message consumer needs to check if the next message being consumed is part of the same transaction or a new transaction and dispatch / block respectively. Messages that are part of the same transactions will be rolled back or committed together and trying to consume a message in a new transaction will block until the last transaction has completed. > Synchronous DTC Consumer will experience duplicates on transaction rollback > --------------------------------------------------------------------------- > > Key: AMQNET-472 > URL: https://issues.apache.org/jira/browse/AMQNET-472 > Project: ActiveMQ .Net > Issue Type: Bug > Components: ActiveMQ > Affects Versions: 1.6.2 > Reporter: Imran > Assignee: Jim Gomes > Attachments: NetTxMessageConsumer.cs > > > Rollback when using DTC will result in a duplicate message being received. > {code:title=FailingTest|borderStyle=solid} > using System; > using System.Configuration; > using System.ServiceProcess; > using System.Transactions; > using Apache.NMS; > using Apache.NMS.ActiveMQ; > using Apache.NMS.Policies; > using Apache.NMS.Util; > using Common.Logging; > using Common.Logging.Simple; > using NUnit.Framework; > namespace IntegrationTests.ApacheNms.Jira > { > [TestFixture] > public class Dtc > { > [Test, Explicit("Bug in 1.6.2")] > public void First_message_should_be_redilivered_and_republished_on_rollback_and_second_message_processed_as_normal() > { > SendMessageToQueue("1"); > SendMessageToQueue("2"); > var session = _connection.CreateSession(); > var sessionTwo = _connection.CreateSession(); > var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue)); > var producer = sessionTwo.CreateProducer(SessionUtil.GetDestination(session, OutQueue)); > _log.Debug("Process message one and rollback"); > var transaction = new TransactionScope(TransactionScopeOption.RequiresNew); > var message = consumer.Receive(TimeSpan.FromSeconds(30)); > producer.Send(message); > transaction.Dispose(); > _log.Debug("Processing message two and commit"); > transaction = new TransactionScope(TransactionScopeOption.RequiresNew); > message = consumer.Receive(TimeSpan.FromSeconds(30)); > producer.Send(message); > transaction.Complete(); > transaction.Dispose(); > _log.Debug("Processing message one replay and commit"); > transaction = new TransactionScope(TransactionScopeOption.RequiresNew); > message = consumer.Receive(TimeSpan.FromSeconds(30)); > producer.Send(message); > transaction.Complete(); > transaction.Dispose(); > _log.Debug("Process any repeats, there should be none"); > transaction = new TransactionScope(TransactionScopeOption.RequiresNew); > message = consumer.Receive(TimeSpan.FromSeconds(30)); > if (message != null) > producer.Send(message); > transaction.Complete(); > transaction.Dispose(); > session.Dispose(); > sessionTwo.Dispose(); > Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0)); > Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(2)); > } > public static void TransactionCallback(object s, TransactionEventArgs e) > { > LogManager.GetCurrentClassLogger().DebugFormat("Tranasaction {0}", e.Transaction.TransactionInformation.Status); > } > private int CountMessagesInQueue(string queue) > { > var count = 0; > using (var session = _connection.CreateSession()) > using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session, queue))) > using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew)) > { > while (true) > { > var msg = consumerIn.Receive(TimeSpan.FromSeconds(2)); > if (msg == null) > break; > count++; > } > } > return count; > } > private void StartService(ServiceController service) > { > if (service.Status != ServiceControllerStatus.Running) > service.Start(); > service.WaitForStatus(ServiceControllerStatus.Running); > _log.Debug("Started Broker"); > } > private void StopService(ServiceController service) > { > if (service.Status != ServiceControllerStatus.Stopped) > service.Stop(); > service.WaitForStatus(ServiceControllerStatus.Stopped); > _log.Debug("Stopped Broker Broker"); > } > private void SendMessageToQueue(string message) > { > using (var session = _connection.CreateSession()) > using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue))) > using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew)) > { > producer.Send(producer.CreateTextMessage(message)); > scope.Complete(); > } > _log.Debug("Primed Input Queue"); > } > private void DeleteQueue(string queue) > { > using (var session = _connection.CreateSession()) > { > SessionUtil.DeleteDestination(session, queue); > } > } > [SetUp] > public void TestSetup() > { > LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss"); > _log = LogManager.GetLogger(typeof(Dtc).Name); > StartService(ActiveMqMaster); > StopService(ActiveMqSlave); > _factory = new NetTxConnectionFactory(ActiveMqConnectionString) > { > AcknowledgementMode = AcknowledgementMode.Transactional, > RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false }, > DispatchAsync = true, > AsyncSend = false, > PrefetchPolicy = new PrefetchPolicy { All = 5 }, > }; > _connection = _factory.CreateConnection(); > _log.Debug("Starting connection"); > _connection.Start(); > _log.Debug("Connection established"); > DeleteQueue(InQueue); > DeleteQueue(OutQueue); > //Tracer.Trace = new CommonLoggingTraceAdapter(); > } > [TearDown] > public void TestTearDown() > { > _connection.Dispose(); > } > protected ServiceController ActiveMqMaster = new ServiceController(ActiveMasterServiceName, ActiveMqMachineName); > protected ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName); > private static readonly string ActiveMqMachineName = ConfigurationManager.AppSettings["ActiveMqServiceMachineName"]; > private static readonly string ActiveMqConnectionString = ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString; > private static readonly string ActiveMasterServiceName = ConfigurationManager.AppSettings["ActiveMqMasterName"]; > private static readonly string ActiveMqSlaveServiceName = ConfigurationManager.AppSettings["ActiveMqSlaveName"]; > private IConnection _connection; > private const string InQueue = "integration-test-q-in"; > private const string OutQueue = "integration-test-q-out"; > private ILog _log; > private NetTxConnectionFactory _factory; > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)