activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Imran (JIRA)" <jira+amq...@apache.org>
Subject [jira] [Commented] (AMQNET-472) Synchronous DTC Consumer will experience duplicates on transaction rollback
Date Thu, 13 Mar 2014 15:02:44 GMT

    [ https://issues.apache.org/jira/browse/AMQNET-472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13933361#comment-13933361
] 

Imran commented on AMQNET-472:
------------------------------

I think I have found the issue here; a suggested patch is attached. It looks like some lock
conditions were missing. Because all DTC operations complete asynchronously, the main
processing thread may already have moved on to consuming a new message in a new transaction
before the previous transaction has committed or rolled back. The message
consumer needs to check whether the next message being consumed is part of the same transaction
or a new one, and dispatch or block respectively. Messages that are part of the same
transaction will be rolled back or committed together, and an attempt to consume a message in
a new transaction will block until the previous transaction has completed.

> Synchronous DTC Consumer will experience duplicates on transaction rollback
> ---------------------------------------------------------------------------
>
>                 Key: AMQNET-472
>                 URL: https://issues.apache.org/jira/browse/AMQNET-472
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>          Components: ActiveMQ
>    Affects Versions: 1.6.2
>            Reporter: Imran
>            Assignee: Jim Gomes
>         Attachments: NetTxMessageConsumer.cs
>
>
> Rollback when using DTC will result in a duplicate message being received.
> {code:title=FailingTest|borderStyle=solid} 
> using System;
> using System.Configuration;
> using System.ServiceProcess;
> using System.Transactions;
> using Apache.NMS;
> using Apache.NMS.ActiveMQ;
> using Apache.NMS.Policies;
> using Apache.NMS.Util;
> using Common.Logging;
> using Common.Logging.Simple;
> using NUnit.Framework;
> namespace IntegrationTests.ApacheNms.Jira
> {
>     [TestFixture]
>     public class Dtc
>     {
>         [Test, Explicit("Bug in 1.6.2")]
>         public void First_message_should_be_redilivered_and_republished_on_rollback_and_second_message_processed_as_normal()
>         {
>             SendMessageToQueue("1");
>             SendMessageToQueue("2");
>             var session = _connection.CreateSession();
>             var sessionTwo = _connection.CreateSession();
>             var consumer = session.CreateConsumer(SessionUtil.GetDestination(session,
InQueue));
>             var producer = sessionTwo.CreateProducer(SessionUtil.GetDestination(session,
OutQueue));
>             _log.Debug("Process message one and rollback");
>             var transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
>             var message = consumer.Receive(TimeSpan.FromSeconds(30));
>             producer.Send(message);
>             transaction.Dispose();
>             _log.Debug("Processing message two and commit");
>             transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
>             message = consumer.Receive(TimeSpan.FromSeconds(30));
>             producer.Send(message);
>             transaction.Complete();
>             transaction.Dispose();
>             _log.Debug("Processing message one replay and commit");
>             transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
>             message = consumer.Receive(TimeSpan.FromSeconds(30));
>             producer.Send(message);
>             transaction.Complete();
>             transaction.Dispose();
>             _log.Debug("Process any repeats, there should be none");
>             transaction = new TransactionScope(TransactionScopeOption.RequiresNew);
>             message = consumer.Receive(TimeSpan.FromSeconds(30));
>             if (message != null)
>                 producer.Send(message);
>             transaction.Complete();
>             transaction.Dispose();
>             session.Dispose();
>             sessionTwo.Dispose();
>             Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
>             Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(2));
>         }
>         public static void TransactionCallback(object s, TransactionEventArgs e)
>         {
>             LogManager.GetCurrentClassLogger().DebugFormat("Tranasaction  {0}", e.Transaction.TransactionInformation.Status);
>         }
>         private int CountMessagesInQueue(string queue)
>         {
>             var count = 0;
>             using (var session = _connection.CreateSession())
>             using (var consumerIn = session.CreateConsumer(SessionUtil.GetDestination(session,
queue)))
>             using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
>             {
>                 while (true)
>                 {
>                     var msg = consumerIn.Receive(TimeSpan.FromSeconds(2));
>                     if (msg == null)
>                         break;
>                     count++;
>                 }
>             }
>             return count;
>         }
>         private void StartService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Running)
>                 service.Start();
>             service.WaitForStatus(ServiceControllerStatus.Running);
>             _log.Debug("Started Broker");
>         }
>         private void StopService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Stopped)
>                 service.Stop();
>             service.WaitForStatus(ServiceControllerStatus.Stopped);
>             _log.Debug("Stopped Broker Broker");
>         }
>         private void SendMessageToQueue(string message)
>         {
>             using (var session = _connection.CreateSession())
>             using (var producer = session.CreateProducer(SessionUtil.GetDestination(session,
InQueue)))
>             using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
>             {
>                 producer.Send(producer.CreateTextMessage(message));
>                 scope.Complete();
>             }
>             _log.Debug("Primed Input Queue");
>         }
>         private void DeleteQueue(string queue)
>         {
>             using (var session = _connection.CreateSession())
>             {
>                 SessionUtil.DeleteDestination(session, queue);
>             }
>         }
>         [SetUp]
>         public void TestSetup()
>         {
>             LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true,
true, true, "HH:MM:ss");
>             _log = LogManager.GetLogger(typeof(Dtc).Name);
>             StartService(ActiveMqMaster);
>             StopService(ActiveMqSlave);
>             _factory = new NetTxConnectionFactory(ActiveMqConnectionString)
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional,
>                 RedeliveryPolicy = new RedeliveryPolicy { InitialRedeliveryDelay = 0,
MaximumRedeliveries = 3, BackOffMultiplier = 0, UseExponentialBackOff = false },
>                 DispatchAsync = true,
>                 AsyncSend = false,
>                 PrefetchPolicy = new PrefetchPolicy { All = 5 },
>             };
>             _connection = _factory.CreateConnection();
>             _log.Debug("Starting connection");
>             _connection.Start();
>             _log.Debug("Connection established");
>             DeleteQueue(InQueue);
>             DeleteQueue(OutQueue);
>             //Tracer.Trace = new CommonLoggingTraceAdapter();
>         }
>         [TearDown]
>         public void TestTearDown()
>         {
>             _connection.Dispose();
>         }
>         protected ServiceController ActiveMqMaster = new ServiceController(ActiveMasterServiceName,
ActiveMqMachineName);
>         protected ServiceController ActiveMqSlave = new ServiceController(ActiveMqSlaveServiceName,
ActiveMqMachineName);
>         private static readonly string ActiveMqMachineName = ConfigurationManager.AppSettings["ActiveMqServiceMachineName"];
>         private static readonly string ActiveMqConnectionString = ConfigurationManager.ConnectionStrings["ActiveMqServer"].ConnectionString;
>         private static readonly string ActiveMasterServiceName = ConfigurationManager.AppSettings["ActiveMqMasterName"];
>         private static readonly string ActiveMqSlaveServiceName = ConfigurationManager.AppSettings["ActiveMqSlaveName"];
>         private IConnection _connection;
>         private const string InQueue = "integration-test-q-in";
>         private const string OutQueue = "integration-test-q-out";
>         private ILog _log;
>         private NetTxConnectionFactory _factory;
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Mime
View raw message