activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Imran (JIRA)" <jira+amq...@apache.org>
Subject [jira] [Created] (AMQNET-474) DTC Consumer is forcibly closed if a transaction is in progress and connection to the broker is interrupted
Date Wed, 19 Mar 2014 12:51:42 GMT
Imran created AMQNET-474:
----------------------------

             Summary: DTC Consumer is forcibly closed if a transaction is in progress and
connection to the broker is interrupted
                 Key: AMQNET-474
                 URL: https://issues.apache.org/jira/browse/AMQNET-474
             Project: ActiveMQ .Net
          Issue Type: Bug
          Components: ActiveMQ
    Affects Versions: 1.6.2
            Reporter: Imran
            Assignee: Jim Gomes


A DTC consumer is forcibly closed if a transaction is in progress and the connection to the
broker is interrupted. This behavior differs from that of non-DTC consumers. It happens even
when a failover connection is specified, which is not the correct behavior: you would expect
the failover feature to re-establish the connection on behalf of the client.

{code}

using System;
using System.ServiceProcess;
using System.Transactions;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Policies;
using Apache.NMS.Util;
using Common.Logging;
using Common.Logging.Simple;
using NUnit.Framework;

namespace IntegrationTests.ApacheNms.Tests.Jira.DistributedTransaction
{
    /// <summary>
    /// Integration test for AMQNET-474: a consumer created from a <see cref="NetTxConnectionFactory"/>
    /// connection is used inside a <see cref="TransactionScope"/> while the broker service is restarted.
    /// The fixture drives the broker through Windows services and talks to it via a failover URI.
    /// </summary>
    [TestFixture]
    public class BrokerRestartAndFailover
    {
        /// <summary>
        /// Sends one message, receives it inside a new transaction scope, restarts the master broker
        /// service while that scope is still open, and then makes two further receive attempts with the
        /// same consumer. The test passes when exactly one of those attempts yields a message and a
        /// subsequent count of the input queue reports zero messages.
        /// </summary>
        [Test, Explicit("After a broker restart the consumer is forcibly closed. This is not " +
                        "desirable as this behaviour is different to non dtc consumers.")]
        public void Should_rediliver_message_after_broker_restart()
        {
            SendMessageToQueue("1");

            // Dispose the session and consumer even when an assertion or the restart fails,
            // so a failing run does not leave them attached to the shared connection.
            using (var session = _connection.CreateSession(AcknowledgementMode.Transactional))
            using (var consumer = session.CreateConsumer(SessionUtil.GetDestination(session, InQueue)))
            {
                // The scope is disposed on every path; it is only completed when the restart succeeded.
                using (var initialScope = new TransactionScope(TransactionScopeOption.RequiresNew))
                {
                    consumer.Receive(TimeSpan.FromSeconds(1));

                    StopService(ActiveMqMaster);
                    StartService(ActiveMqMaster);
                    initialScope.Complete();
                }

                // Try a few times to drain the queue.
                var redeliveredCount = 0;
                for (var attempt = 0; attempt < 2; attempt++)
                {
                    var retryScope = new TransactionScope(TransactionScopeOption.RequiresNew);
                    try
                    {
                        var message = consumer.Receive(TimeSpan.FromSeconds(1));
                        retryScope.Complete();
                        if (message != null)
                        {
                            redeliveredCount++;
                        }
                    }
                    catch (Exception ex)
                    {
                        // A failed attempt is logged and the next attempt proceeds.
                        LogManager.GetCurrentClassLogger().Error(ex);
                    }
                    finally
                    {
                        retryScope.Dispose();
                    }
                }

                Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
                Assert.That(redeliveredCount, Is.EqualTo(1));
            }
        }

        /// <summary>
        /// Counts the messages that can be received from <paramref name="queue"/> over a separate,
        /// short-lived connection. Receiving stops at the first attempt that returns nothing within
        /// one second.
        /// </summary>
        /// <param name="queue">Name of the queue to read from.</param>
        /// <returns>The number of messages received before the first empty receive.</returns>
        /// <remarks>
        /// The factory is configured as transactional and this method never commits the session.
        /// NOTE(review): presumably this leaves the counted messages on the queue - confirm.
        /// </remarks>
        public int CountMessagesInQueue(string queue)
        {
            var factory = new ConnectionFactory(ConnectionString)
            {
                AcknowledgementMode = AcknowledgementMode.Transactional
            };

            var count = 0;
            using (var connection = factory.CreateConnection())
            using (var session = connection.CreateSession())
            using (var consumer = session.CreateConsumer(SessionUtil.GetQueue(session, queue)))
            {
                connection.Start();
                while (consumer.Receive(TimeSpan.FromSeconds(1)) != null)
                {
                    count++;
                }
            }

            return count;
        }

        /// <summary>
        /// Deletes the destination named <paramref name="queue"/> using a temporary session on the
        /// fixture's connection.
        /// </summary>
        /// <param name="queue">Name of the destination to delete.</param>
        private void DeleteQueue(string queue)
        {
            using (var session = _connection.CreateSession())
            {
                SessionUtil.DeleteDestination(session, queue);
            }
        }

        /// <summary>
        /// Sends a single text message to the input queue inside its own completed transaction scope.
        /// </summary>
        /// <param name="message">Text body of the message to send.</param>
        private void SendMessageToQueue(string message)
        {
            using (var session = _connection.CreateSession())
            using (var producer = session.CreateProducer(SessionUtil.GetDestination(session, InQueue)))
            using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew))
            {
                producer.Send(producer.CreateTextMessage(message));
                scope.Complete();
            }
            Log.Debug("Primed Input Queue");
        }

        /// <summary>
        /// Starts <paramref name="service"/> unless it already reports Running, then blocks until it
        /// reports Running.
        /// </summary>
        /// <param name="service">Controller of the Windows service to start.</param>
        private void StartService(ServiceController service)
        {
            // ServiceController caches its property values; refresh so the decision below is
            // based on the service's current state rather than a previously observed one.
            service.Refresh();
            if (service.Status != ServiceControllerStatus.Running)
            {
                service.Start();
            }
            service.WaitForStatus(ServiceControllerStatus.Running);
        }

        /// <summary>
        /// Stops <paramref name="service"/> unless it already reports Stopped, then blocks until it
        /// reports Stopped.
        /// </summary>
        /// <param name="service">Controller of the Windows service to stop.</param>
        private void StopService(ServiceController service)
        {
            // See StartService: refresh the cached status before acting on it.
            service.Refresh();
            if (service.Status != ServiceControllerStatus.Stopped)
            {
                service.Stop();
            }
            service.WaitForStatus(ServiceControllerStatus.Stopped);
        }

        /// <summary>
        /// Prepares each test: configures console logging, makes sure the master broker service is
        /// running and the slave is stopped, opens and starts the transactional connection, and
        /// deletes both test queues.
        /// </summary>
        [SetUp]
        public void TestSetup()
        {
            // "mm" is minutes; the previous "MM" specifier printed the month in the minutes position.
            LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:mm:ss");
            StartService(ActiveMqMaster);
            StopService(ActiveMqSlave);

            _connectionFactory = new NetTxConnectionFactory(ConnectionString)
            {
                AcknowledgementMode = AcknowledgementMode.Transactional,
                RedeliveryPolicy = new RedeliveryPolicy
                {
                    InitialRedeliveryDelay = 10,
                    MaximumRedeliveries = 3,
                    BackOffMultiplier = 0,
                    UseExponentialBackOff = false
                },
                DispatchAsync = true,
                AsyncSend = false,
                PrefetchPolicy = new PrefetchPolicy { All = 10 },
            };

            _connection = _connectionFactory.CreateConnection();
            _connection.ConnectionInterruptedListener +=
                () => LogManager.GetCurrentClassLogger().Debug("Connection interrupted");
            _connection.ConnectionResumedListener +=
                () => LogManager.GetCurrentClassLogger().Debug("Connection resumed");
            _connection.ExceptionListener +=
                ex => LogManager.GetCurrentClassLogger().ErrorFormat("Connection exception: '{0}'", ex.ToString());
            _connection.Start();

            DeleteQueue(InQueue);
            DeleteQueue(OutQueue);
        }

        /// <summary>
        /// Restores the broker services to the state the fixture expects (master running, slave
        /// stopped) and releases the connection opened by <see cref="TestSetup"/>.
        /// </summary>
        [TearDown]
        public void TestTeardown()
        {
            StartService(ActiveMqMaster);
            StopService(ActiveMqSlave);

            // TestSetup opens a new connection for every test; dispose it here so connections do
            // not accumulate across tests. The null check covers a setup that failed before the
            // connection was created.
            if (_connection != null)
            {
                _connection.Dispose();
                _connection = null;
            }
        }

        // Broker URI used by every connection in this fixture (failover transport, local broker).
        private const string ConnectionString = @"failover:(tcp://localhost:61616)";

        // Controllers for the Windows services named "ActiveMQ" and "ActiveMQSlave".
        protected ServiceController ActiveMqMaster = new ServiceController(@"ActiveMQ");
        protected ServiceController ActiveMqSlave = new ServiceController(@"ActiveMQSlave");

        // Connection shared by the test and its helpers; created in TestSetup.
        private IConnection _connection;

        // Queue the test sends to and consumes from.
        private const string InQueue = "in-q";

        // Only deleted during setup; not otherwise used in this fixture.
        private const string OutQueue = "out-q";

        private static readonly ILog Log = LogManager.GetLogger(typeof(BrokerRestartAndFailover).Name);

        // Factory that produced _connection; assigned in TestSetup.
        private NetTxConnectionFactory _connectionFactory;
    }
}

{code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Mime
View raw message