activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1214576 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/State/ConnectionStateTracker.cs test/csharp/Transport/failover/FailoverTransactionTest.cs
Date Thu, 15 Dec 2011 00:22:38 GMT
Author: tabish
Date: Thu Dec 15 00:22:37 2011
New Revision: 1214576

URL: http://svn.apache.org/viewvc?rev=1214576&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-362

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=1214576&r1=1214575&r2=1214576&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
Thu Dec 15 00:22:37 2011
@@ -42,8 +42,8 @@ namespace Apache.NMS.ActiveMQ.State
         private bool _trackMessages = true;
         private int _maxCacheSize = 256;
         private int currentCacheSize;
-        private readonly Dictionary<MessageId, Message> messageCache = new Dictionary<MessageId,
Message>();
-        private readonly Queue<MessageId> messageCacheFIFO = new Queue<MessageId>();
+        private readonly Dictionary<Object, Command> messageCache = new Dictionary<Object,
Command>();
+        private readonly Queue<Object> messageCacheFIFO = new Queue<Object>();
 
         protected void RemoveEldestInCache()
         {
@@ -102,10 +102,17 @@ namespace Apache.NMS.ActiveMQ.State
 
         public void TrackBack(Command command)
         {
-            if(TrackMessages && command != null && command.IsMessage)
+            if (command != null)
             {
-                Message message = (Message) command;
-                if(message.TransactionId == null)
+                if (TrackMessages && command.IsMessage)
+                {
+                    Message message = (Message) command;
+                    if(message.TransactionId == null)
+                    {
+                        currentCacheSize = currentCacheSize + 1;
+                    }
+                }
+                else if (command.IsMessagePull)
                 {
                     currentCacheSize = currentCacheSize + 1;
                 }
@@ -119,6 +126,10 @@ namespace Apache.NMS.ActiveMQ.State
             {
                 ConnectionInfo info = connectionState.Info;
                 info.FailoverReconnect = true;
+                if (Tracer.IsDebugEnabled)
+                {
+                    Tracer.Debug("conn: " + connectionState.Info.ConnectionId);
+                }
                 transport.Oneway(info);
 
                 DoRestoreTempDestinations(transport, connectionState);
@@ -133,19 +144,46 @@ namespace Apache.NMS.ActiveMQ.State
                     DoRestoreTransactions(transport, connectionState);
                 }
             }
-            //now flush messages
-            foreach(Message msg in messageCache.Values)
+
+            // Now flush messages
+            foreach(Command command in messageCache.Values)
             {
-                transport.Oneway(msg);
+                if (Tracer.IsDebugEnabled)
+                {
+                    Tracer.Debug("Replaying command: " + command);
+                }
+
+                transport.Oneway(command);
             }
         }
 
         private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState)
         {
             AtomicCollection<TransactionState> transactionStates = connectionState.TransactionStates;
+            List<TransactionInfo> toRollback = new List<TransactionInfo>();
 
             foreach(TransactionState transactionState in transactionStates)
             {
+                // rollback any completed transactions - no way to know if commit got there
+                // or if reply went missing
+                if (transactionState.Commands.Count != 0)
+                {
+                    Command lastCommand = transactionState.Commands[transactionState.Commands.Count
- 1];
+                    if (lastCommand.IsTransactionInfo)
+                    {
+                        TransactionInfo transactionInfo = lastCommand as TransactionInfo;
+                        if (transactionInfo.Type == TransactionInfo.COMMIT_ONE_PHASE)
+                        {
+                            if (Tracer.IsDebugEnabled)
+                            {
+                                Tracer.Debug("rolling back potentially completed tx: " +
transactionState.getId());
+                            }
+                            toRollback.Add(transactionInfo);
+                            continue;
+                        }
+                    }
+                }
+
                 // replay the add and remove of short lived producers that may have been
                 // involved in the transaction
                 foreach (ProducerState producerState in transactionState.ProducerStates)
@@ -178,6 +216,18 @@ namespace Apache.NMS.ActiveMQ.State
                     transport.Oneway(producerRemove);
                 }
             }
+
+            foreach (TransactionInfo command in toRollback)
+            {
+                // respond to the outstanding commit
+                ExceptionResponse response = new ExceptionResponse();
+                response.Exception = new BrokerError();
+                response.Exception.Message =
+                    "Transaction completion in doubt due to failover. Forcing rollback of
" + command.TransactionId;
+                response.Exception.ExceptionClass = (new TransactionRolledBackException()).GetType().FullName;
+                response.CorrelationId = command.CommandId;
+                transport.Command(transport, response);
+            }
         }
 
         /// <summary>
@@ -189,6 +239,10 @@ namespace Apache.NMS.ActiveMQ.State
             // Restore the connection's sessions
             foreach(SessionState sessionState in connectionState.SessionStates)
             {
+                if (Tracer.IsDebugEnabled)
+                {
+                    Tracer.Debug("Restoring session: " + sessionState.Info.SessionId);
+                }
                 transport.Oneway(sessionState.Info);
 
                 if(RestoreProducers)
@@ -262,6 +316,10 @@ namespace Apache.NMS.ActiveMQ.State
             // Restore the session's producers
             foreach(ProducerState producerState in sessionState.ProducerStates)
             {
+                if (Tracer.IsDebugEnabled)
+                {
+                    Tracer.Debug("Restoring producer: " + producerState.Info.ProducerId);
+                }
                 transport.Oneway(producerState.Info);
             }
         }
@@ -662,6 +720,17 @@ namespace Apache.NMS.ActiveMQ.State
             return null;
         }
 
+        public override Response processMessagePull(MessagePull pull)
+        {
+            if (pull != null)
+            {
+                // leave a single instance in the cache
+                String id = pull.Destination + "::" + pull.ConsumerId;
+                messageCache.Add(id, pull);
+            }
+            return null;
+        }
+
         public bool RestoreConsumers
         {
             get { return _restoreConsumers; }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs?rev=1214576&r1=1214575&r2=1214576&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs
Thu Dec 15 00:22:37 2011
@@ -39,8 +39,19 @@ namespace Apache.NMS.ActiveMQ.Test
         private readonly int MSG_COUNT = 2;
         private readonly String destinationName = "FailoverTransactionTestQ";
 
+        [SetUp]
+        public override void SetUp()
+        {
+            base.SetUp();
+
+            this.connection = null;
+            this.interrupted = false;
+            this.resumed = false;
+            this.commitFailed = false;
+        }
+
         [Test]
-        public void FailoverBeforeCommitSentTest()
+        public void FailoverAfterCommitSentTest()
         {
             string uri = "failover:(tcpfaulty://${activemqhost}:61616?transport.useLogging=true)";
             IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
@@ -56,7 +67,7 @@ namespace Apache.NMS.ActiveMQ.Test
                 ITransport transport = (connection as Connection).ITransport;
                 TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport))
as TcpFaultyTransport;
                 Assert.IsNotNull(tcpFaulty);
-                tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook;
+                tcpFaulty.OnewayCommandPostProcessor += this.FailOnCommitTransportHook;
 
                 using(ISession session = connection.CreateSession())
                 {
@@ -69,7 +80,20 @@ namespace Apache.NMS.ActiveMQ.Test
                 using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
                 {
                     IDestination destination = session.GetQueue(destinationName);
-                    PutMsgIntoQueue(session, destination);
+                    PutMsgIntoQueue(session, destination, false);
+
+                    try
+                    {
+                        session.Commit();
+                        Assert.Fail("Should have thrown a TransactionRolledBackException");
+                    }
+                    catch(TransactionRolledBackException)
+                    {
+                    }
+                    catch
+                    {
+                        Assert.Fail("Should have thrown a TransactionRolledBackException");
+                    }
                 }
 
                 Assert.IsTrue(this.interrupted);
@@ -95,7 +119,7 @@ namespace Apache.NMS.ActiveMQ.Test
         }
 
         [Test]
-        public void FailoverWithShortLivedProducerTest()
+        public void FailoverBeforeCommitSentTest()
         {
             string uri = "failover:(tcpfaulty://${activemqhost}:61616?transport.useLogging=true)";
             IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
@@ -125,6 +149,72 @@ namespace Apache.NMS.ActiveMQ.Test
                 {
                     IDestination destination = session.GetQueue(destinationName);
                     PutMsgIntoQueue(session, destination, false);
+
+                    try
+                    {
+                        session.Commit();
+                        Assert.Fail("Should have thrown a TransactionRolledBackException");
+                    }
+                    catch(TransactionRolledBackException)
+                    {
+                    }
+                    catch
+                    {
+                        Assert.Fail("Should have thrown a TransactionRolledBackException");
+                    }
+                }
+
+                Assert.IsTrue(this.interrupted);
+                Assert.IsTrue(this.resumed);
+
+                Tracer.Debug("Test is attempting to read a message from" +
+                             destinationName + " but no messages are expected");
+
+                using(ISession session = connection.CreateSession())
+                {
+                    IDestination destination = session.GetQueue(destinationName);
+                    IMessageConsumer consumer = session.CreateConsumer(destination);
+                    IMessage msg = consumer.Receive(TimeSpan.FromSeconds(5));
+                    Assert.IsNull(msg, "Should not receive a message after commit failed.");
+                }
+            }
+
+            Assert.IsTrue(this.interrupted);
+            Assert.IsTrue(this.resumed);
+        }
+
+        [Test]
+        public void FailoverWithShortLivedProducerTest()
+        {
+            string uri = "failover:(tcpfaulty://${activemqhost}:61616?transport.useLogging=true)";
+            IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
+            using(connection = factory.CreateConnection() as Connection)
+            {
+                connection.ConnectionInterruptedListener +=
+                    new ConnectionInterruptedListener(TransportInterrupted);
+                connection.ConnectionResumedListener +=
+                    new ConnectionResumedListener(TransportResumed);
+
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport))
as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+
+                using(ISession session = connection.CreateSession())
+                {
+                    IDestination destination = session.GetQueue(destinationName);
+                    PurgeQueue(connection, destination);
+                }
+
+                Tracer.Debug("Test is putting " + MSG_COUNT + " messages on the queue: "
+ destinationName);
+
+                using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
+                {
+                    IDestination destination = session.GetQueue(destinationName);
+                    PutMsgIntoQueue(session, destination, false);
+                    tcpFaulty.Close();
+                    PutMsgIntoQueue(session, destination, false);
                     session.Commit();
                 }
 



Mime
View raw message