Return-Path: X-Original-To: apmail-activemq-dev-archive@www.apache.org Delivered-To: apmail-activemq-dev-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 16BC7107A1 for ; Mon, 3 Mar 2014 10:58:30 +0000 (UTC) Received: (qmail 20584 invoked by uid 500); 3 Mar 2014 10:58:28 -0000 Delivered-To: apmail-activemq-dev-archive@activemq.apache.org Received: (qmail 20124 invoked by uid 500); 3 Mar 2014 10:58:24 -0000 Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list dev@activemq.apache.org Received: (qmail 19953 invoked by uid 99); 3 Mar 2014 10:58:23 -0000 Received: from arcas.apache.org (HELO arcas.apache.org) (140.211.11.28) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Mar 2014 10:58:23 +0000 Date: Mon, 3 Mar 2014 10:58:22 +0000 (UTC) From: "Imran (JIRA)" To: dev@activemq.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Updated] (AMQNET-472) Synchronous DTC Consumer will experience duplicates on transaction rollback MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/AMQNET-472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Imran updated AMQNET-472: ------------------------- Description: Rollback when using DTC will result in a duplicate message being received. {code:title=FailingTest|borderStyle=solid} using System; using System.Configuration; using System.ServiceProcess; using System.Transactions; using Apache.NMS; using Apache.NMS.ActiveMQ; using Apache.NMS.Policies; using Apache.NMS.Util; using Common.Logging; using Common.Logging.Simple; using NUnit.Framework; namespace IntegrationTests.ApacheNms.Jira { [TestFixture] public class Dtc { [Test, Explicit("Bug in 1.6.2")] public void First_message_should_be_redilivered_and_republished_on_rollback_and_second_message_processed_as_normal() { SendMessageToQueue("1"); SendMessageToQueue("2"); var session = _connection.CreateSession(); var sessionTwo = _connection.CreateSession(); var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue)); var producer = sessionTwo.CreateProducer(SessionUtil.GetDestination(session, OutQueue)); _log.Debug("Process message one and rollback"); var transaction = new TransactionScope(TransactionScopeOption.RequiresNew); var message = consumer.Receive(TimeSpan.FromSeconds(30)); producer.Send(message); transaction.Dispose(); _log.Debug("Processing message two and commit"); transaction = new TransactionScope(TransactionScopeOption.RequiresNew); message = consumer.Receive(TimeSpan.FromSeconds(30)); producer.Send(message); transaction.Complete(); transaction.Dispose(); _log.Debug("Processing message one replay and commit"); transaction = new TransactionScope(TransactionScopeOption.RequiresNew); message = consumer.Receive(TimeSpan.FromSeconds(30)); producer.Send(message); transaction.Complete(); transaction.Dispose(); _log.Debug("Process any repeats, there should be none"); transaction = new TransactionScope(TransactionScopeOption.RequiresNew); message = consumer.Receive(TimeSpan.FromSeconds(30)); if (message != null) producer.Send(message); transaction.Complete(); transaction.Dispose(); session.Dispose(); sessionTwo.Dispose(); Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0)); Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(2)); } public static void TransactionCallback(object s, TransactionEventArgs e) { LogManager.GetCurrentClassLogger().DebugFormat("Tranasaction {0}", e.Transaction.TransactionInformation.Status); } private int CountMessagesInQueue(string queue) { var count = 0; using (var session = _connection.CreateSession()) using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session, queue))) using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew)) { while (true) { var msg = consumerIn.Receive(TimeSpan.FromSeconds(2)); if (msg == null) break; count++; } } return count; } private void StartService(ServiceController service) { if (service.Status != ServiceControllerStatus.Running) service.Start(); service.WaitForStatus(ServiceControllerStatus.Running); _log.Debug("Started Broker"); } private void StopService(ServiceController service) { if (service.Status != ServiceControllerStatus.Stopped) service.Stop(); service.WaitForStatus(ServiceControllerStatus.Stopped); _log.Debug("Stopped Broker Broker"); } private void SendMessageToQueue(string message) { using (var session = _connection.CreateSession()) using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue))) using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew)) { producer.Send(producer.CreateTextMessage(message)); scope.Complete(); } _log.Debug("Primed Input Queue"); } private void DeleteQueue(string queue) { using (var session = _connection.CreateSession()) { SessionUtil.DeleteDestination(session, queue); } } [SetUp] public void TestSetup() { LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss"); _log = LogManager.GetLogger(typeof(Dtc).Name); StartService(ActiveMqMaster); StopService(ActiveMqSlave); _factory = new NetTxConnectionFactory(ActiveMqConnectionString) { AcknowledgementMode = AcknowledgementMode.Transactional, RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false }, DispatchAsync = true, AsyncSend = false, PrefetchPolicy = new PrefetchPolicy { All = 5 }, }; _connection = _factory.CreateConnection(); _log.Debug("Starting connection"); _connection.Start(); _log.Debug("Connection established"); DeleteQueue(InQueue); DeleteQueue(OutQueue); //Tracer.Trace = new CommonLoggingTraceAdapter(); } [TearDown] public void TestTearDown() { _connection.Dispose(); } protected ServiceController ActiveMqMaster = new ServiceController(ActiveMasterServiceName, ActiveMqMachineName); protected ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName); private static readonly string ActiveMqMachineName = ConfigurationManager.AppSettings["ActiveMqServiceMachineName"]; private static readonly string ActiveMqConnectionString = ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString; private static readonly string ActiveMasterServiceName = ConfigurationManager.AppSettings["ActiveMqMasterName"]; private static readonly string ActiveMqSlaveServiceName = ConfigurationManager.AppSettings["ActiveMqSlaveName"]; private IConnection _connection; private const string InQueue = "integration-test-q-in"; private const string OutQueue = "integration-test-q-out"; private ILog _log; private NetTxConnectionFactory _factory; } } {code} was: Rollback when using DTC will result in a duplicate message being received. {code:title=FailingTest|borderStyle=solid} using System; using System.Configuration; using System.ServiceProcess; using System.Transactions; using Apache.NMS; using Apache.NMS.ActiveMQ; using Apache.NMS.Policies; using Apache.NMS.Util; using Common.Logging; using Common.Logging.Simple; using NUnit.Framework; namespace IntegrationTests.ApacheNms.Jira { [TestFixture] public class Dtc { [Test, Explicit("Bug in 1.6.2")] public void First_message_should_be_redilivered_and_republished_on_rollback_and_second_message_processed_as_normal() { SendMessageToQueue("1"); SendMessageToQueue("2"); var session = _connection.CreateSession(); var sessionTwo = _connection.CreateSession(); var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue)); var producer = sessionTwo.CreateProducer(SessionUtil.GetDestination(session, OutQueue)); _log.Debug("Process message one and rollback"); var transaction = new TransactionScope(TransactionScopeOption.RequiresNew); var message = consumer.Receive(TimeSpan.FromSeconds(30)); producer.Send(message); transaction.Dispose(); _log.Debug("Processing message two and commit"); transaction = new TransactionScope(TransactionScopeOption.RequiresNew); message = consumer.Receive(TimeSpan.FromSeconds(30)); producer.Send(message); transaction.Complete(); transaction.Dispose(); _log.Debug("Processing message one replay and commit"); transaction = new TransactionScope(TransactionScopeOption.RequiresNew); message = consumer.Receive(TimeSpan.FromSeconds(30)); producer.Send(message); transaction.Complete(); transaction.Dispose(); _log.Debug("Process any repeats, there should be none"); transaction = new TransactionScope(TransactionScopeOption.RequiresNew); message = consumer.Receive(TimeSpan.FromSeconds(30)); if (message != null) producer.Send(message); transaction.Complete(); transaction.Dispose(); session.Dispose(); sessionTwo.Dispose(); Assert.That(message, Is.Not.Null, "message was not redilivered"); Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0)); Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(2)); } public static void TransactionCallback(object s, TransactionEventArgs e) { LogManager.GetCurrentClassLogger().DebugFormat("Tranasaction {0}", e.Transaction.TransactionInformation.Status); } private int CountMessagesInQueue(string queue) { var count = 0; using (var session = _connection.CreateSession()) using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session, queue))) using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew)) { while (true) { var msg = consumerIn.Receive(TimeSpan.FromSeconds(2)); if (msg == null) break; count++; } } return count; } private void StartService(ServiceController service) { if (service.Status != ServiceControllerStatus.Running) service.Start(); service.WaitForStatus(ServiceControllerStatus.Running); _log.Debug("Started Broker"); } private void StopService(ServiceController service) { if (service.Status != ServiceControllerStatus.Stopped) service.Stop(); service.WaitForStatus(ServiceControllerStatus.Stopped); _log.Debug("Stopped Broker Broker"); } private void SendMessageToQueue(string message) { using (var session = _connection.CreateSession()) using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue))) using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew)) { producer.Send(producer.CreateTextMessage(message)); scope.Complete(); } _log.Debug("Primed Input Queue"); } private void DeleteQueue(string queue) { using (var session = _connection.CreateSession()) { SessionUtil.DeleteDestination(session, queue); } } [SetUp] public void TestSetup() { LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss"); _log = LogManager.GetLogger(typeof(Dtc).Name); StartService(ActiveMqMaster); StopService(ActiveMqSlave); _factory = new NetTxConnectionFactory(ActiveMqConnectionString) { AcknowledgementMode = AcknowledgementMode.Transactional, RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false }, DispatchAsync = true, AsyncSend = false, PrefetchPolicy = new PrefetchPolicy { All = 5 }, }; _connection = _factory.CreateConnection(); _log.Debug("Starting connection"); _connection.Start(); _log.Debug("Connection established"); DeleteQueue(InQueue); DeleteQueue(OutQueue); //Tracer.Trace = new CommonLoggingTraceAdapter(); } [TearDown] public void TestTearDown() { _connection.Dispose(); } protected ServiceController ActiveMqMaster = new ServiceController(ActiveMasterServiceName, ActiveMqMachineName); protected ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName); private static readonly string ActiveMqMachineName = ConfigurationManager.AppSettings["ActiveMqServiceMachineName"]; private static readonly string ActiveMqConnectionString = ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString; private static readonly string ActiveMasterServiceName = ConfigurationManager.AppSettings["ActiveMqMasterName"]; private static readonly string ActiveMqSlaveServiceName = ConfigurationManager.AppSettings["ActiveMqSlaveName"]; private IConnection _connection; private const string InQueue = "integration-test-q-in"; private const string OutQueue = "integration-test-q-out"; private ILog _log; private NetTxConnectionFactory _factory; } } {code} > Synchronous DTC Consumer will experience duplicates on transaction rollback > --------------------------------------------------------------------------- > > Key: AMQNET-472 > URL: https://issues.apache.org/jira/browse/AMQNET-472 > Project: ActiveMQ .Net > Issue Type: Bug > Components: ActiveMQ > Affects Versions: 1.6.2 > Reporter: Imran > Assignee: Jim Gomes > > Rollback when using DTC will result in a duplicate message being received. > {code:title=FailingTest|borderStyle=solid} > using System; > using System.Configuration; > using System.ServiceProcess; > using System.Transactions; > using Apache.NMS; > using Apache.NMS.ActiveMQ; > using Apache.NMS.Policies; > using Apache.NMS.Util; > using Common.Logging; > using Common.Logging.Simple; > using NUnit.Framework; > namespace IntegrationTests.ApacheNms.Jira > { > [TestFixture] > public class Dtc > { > [Test, Explicit("Bug in 1.6.2")] > public void First_message_should_be_redilivered_and_republished_on_rollback_and_second_message_processed_as_normal() > { > SendMessageToQueue("1"); > SendMessageToQueue("2"); > var session = _connection.CreateSession(); > var sessionTwo = _connection.CreateSession(); > var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue)); > var producer = sessionTwo.CreateProducer(SessionUtil.GetDestination(session, OutQueue)); > _log.Debug("Process message one and rollback"); > var transaction = new TransactionScope(TransactionScopeOption.RequiresNew); > var message = consumer.Receive(TimeSpan.FromSeconds(30)); > producer.Send(message); > transaction.Dispose(); > _log.Debug("Processing message two and commit"); > transaction = new TransactionScope(TransactionScopeOption.RequiresNew); > message = consumer.Receive(TimeSpan.FromSeconds(30)); > producer.Send(message); > transaction.Complete(); > transaction.Dispose(); > _log.Debug("Processing message one replay and commit"); > transaction = new TransactionScope(TransactionScopeOption.RequiresNew); > message = consumer.Receive(TimeSpan.FromSeconds(30)); > producer.Send(message); > transaction.Complete(); > transaction.Dispose(); > _log.Debug("Process any repeats, there should be none"); > transaction = new TransactionScope(TransactionScopeOption.RequiresNew); > message = consumer.Receive(TimeSpan.FromSeconds(30)); > if (message != null) > producer.Send(message); > transaction.Complete(); > transaction.Dispose(); > session.Dispose(); > sessionTwo.Dispose(); > Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0)); > Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(2)); > } > public static void TransactionCallback(object s, TransactionEventArgs e) > { > LogManager.GetCurrentClassLogger().DebugFormat("Tranasaction {0}", e.Transaction.TransactionInformation.Status); > } > private int CountMessagesInQueue(string queue) > { > var count = 0; > using (var session = _connection.CreateSession()) > using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session, queue))) > using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew)) > { > while (true) > { > var msg = consumerIn.Receive(TimeSpan.FromSeconds(2)); > if (msg == null) > break; > count++; > } > } > return count; > } > private void StartService(ServiceController service) > { > if (service.Status != ServiceControllerStatus.Running) > service.Start(); > service.WaitForStatus(ServiceControllerStatus.Running); > _log.Debug("Started Broker"); > } > private void StopService(ServiceController service) > { > if (service.Status != ServiceControllerStatus.Stopped) > service.Stop(); > service.WaitForStatus(ServiceControllerStatus.Stopped); > _log.Debug("Stopped Broker Broker"); > } > private void SendMessageToQueue(string message) > { > using (var session = _connection.CreateSession()) > using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue))) > using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew)) > { > producer.Send(producer.CreateTextMessage(message)); > scope.Complete(); > } > _log.Debug("Primed Input Queue"); > } > private void DeleteQueue(string queue) > { > using (var session = _connection.CreateSession()) > { > SessionUtil.DeleteDestination(session, queue); > } > } > [SetUp] > public void TestSetup() > { > LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss"); > _log = LogManager.GetLogger(typeof(Dtc).Name); > StartService(ActiveMqMaster); > StopService(ActiveMqSlave); > _factory = new NetTxConnectionFactory(ActiveMqConnectionString) > { > AcknowledgementMode = AcknowledgementMode.Transactional, > RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false }, > DispatchAsync = true, > AsyncSend = false, > PrefetchPolicy = new PrefetchPolicy { All = 5 }, > }; > _connection = _factory.CreateConnection(); > _log.Debug("Starting connection"); > _connection.Start(); > _log.Debug("Connection established"); > DeleteQueue(InQueue); > DeleteQueue(OutQueue); > //Tracer.Trace = new CommonLoggingTraceAdapter(); > } > [TearDown] > public void TestTearDown() > { > _connection.Dispose(); > } > protected ServiceController ActiveMqMaster = new ServiceController(ActiveMasterServiceName, ActiveMqMachineName); > protected ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName, ActiveMqMachineName); > private static readonly string ActiveMqMachineName = ConfigurationManager.AppSettings["ActiveMqServiceMachineName"]; > private static readonly string ActiveMqConnectionString = ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString; > private static readonly string ActiveMasterServiceName = ConfigurationManager.AppSettings["ActiveMqMasterName"]; > private static readonly string ActiveMqSlaveServiceName = ConfigurationManager.AppSettings["ActiveMqSlaveName"]; > private IConnection _connection; > private const string InQueue = "integration-test-q-in"; > private const string OutQueue = "integration-test-q-out"; > private ILog _log; > private NetTxConnectionFactory _factory; > } > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252)