activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1621142 [4/4] - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/ main/csharp/Commands/ main/csharp/Util/ test/csharp/Transport/failover/
Date Thu, 28 Aug 2014 14:30:50 GMT
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1621142&r1=1621141&r2=1621142&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Thu Aug 28 14:30:49 2014
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections;
+using System.Collections.Generic;
 using System.Collections.Specialized;
 using System.Threading;
 using Apache.NMS.Util;
@@ -57,6 +58,7 @@ namespace Apache.NMS.ActiveMQ
         protected bool disposed = false;
         protected bool closed = false;
         protected bool closing = false;
+        protected Atomic<bool> clearInProgress = new Atomic<bool>();
         private TimeSpan disposeStopTimeout = TimeSpan.FromMilliseconds(30000);
         private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
         private TimeSpan requestTimeout;
@@ -73,8 +75,8 @@ namespace Apache.NMS.ActiveMQ
             this.requestTimeout = connection.RequestTimeout;
             this.dispatchAsync = connection.DispatchAsync;
             this.transactionContext = CreateTransactionContext();
-			this.exclusive = connection.ExclusiveConsumer;
-			this.retroactive = connection.UseRetroactiveConsumer;
+            this.exclusive = connection.ExclusiveConsumer;
+            this.retroactive = connection.UseRetroactiveConsumer;
 
             Uri brokerUri = connection.BrokerUri;
 
@@ -282,10 +284,26 @@ namespace Apache.NMS.ActiveMQ
             set { this.producerTransformer = value; }
         }
 
-		internal Scheduler Scheduler
-		{
-			get { return this.connection.Scheduler; }
-		}
+        internal Scheduler Scheduler
+        {
+            get { return this.connection.Scheduler; }
+        }
+
+        internal List<MessageConsumer> Consumers
+        {
+            get
+            {
+                List<MessageConsumer> copy = new List<MessageConsumer>();
+                lock(consumers.SyncRoot)
+                {
+                    foreach(MessageConsumer consumer in consumers.Values)
+                    {
+                        copy.Add(consumer);
+                    }
+                }
+                return copy;
+            }
+        }
 
         #endregion
 
@@ -338,13 +356,13 @@ namespace Apache.NMS.ActiveMQ
 
         internal void DoClose()
         {
-			Shutdown();
+            Shutdown();
             RemoveInfo removeInfo = new RemoveInfo();
             removeInfo.ObjectId = this.info.SessionId;
             removeInfo.LastDeliveredSequenceId = this.lastDeliveredSequenceId;
             this.connection.Oneway(removeInfo);
-		}
-		
+        }
+
         internal void Shutdown()
         {
             Tracer.InfoFormat("Executing Shutdown on Session with Id {0}", this.info.SessionId);
@@ -524,18 +542,18 @@ namespace Apache.NMS.ActiveMQ
                 throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
             }
 
-			if (IsIndividualAcknowledge)
-			{
-				throw new NMSException("Cannot create a durable consumer for a session that is using " +
-				                       "Individual Acknowledgement mode.");
-			}
+            if (IsIndividualAcknowledge)
+            {
+                throw new NMSException("Cannot create a durable consumer for a session that is using " +
+                                       "Individual Acknowledgement mode.");
+            }
 
             ActiveMQDestination dest = ActiveMQDestination.Transform(destination);
             MessageConsumer consumer = null;
 
             try
             {
-                consumer = DoCreateMessageConsumer(GetNextConsumerId(), dest, name, selector,

+                consumer = DoCreateMessageConsumer(GetNextConsumerId(), dest, name, selector,
                                                    this.connection.PrefetchPolicy.DurableTopicPrefetch,
                                                    this.connection.PrefetchPolicy.MaximumPendingMessageLimit,
                                                    noLocal);
@@ -564,7 +582,7 @@ namespace Apache.NMS.ActiveMQ
         }
 
         internal virtual MessageConsumer DoCreateMessageConsumer(
-            ConsumerId id, ActiveMQDestination destination, string name, string selector,

+            ConsumerId id, ActiveMQDestination destination, string name, string selector,
             int prefetch, int maxPending, bool noLocal)
         {
             return new MessageConsumer(this, id, destination, name, selector, prefetch,
@@ -825,7 +843,7 @@ namespace Apache.NMS.ActiveMQ
             {
                 consumers.Remove(consumer.ConsumerId);
             }
-			connection.RemoveDispatcher(consumer);
+            connection.RemoveDispatcher(consumer);
         }
 
         public void AddProducer(MessageProducer producer)
@@ -878,13 +896,13 @@ namespace Apache.NMS.ActiveMQ
 
         public void Start()
         {
-			lock(this.consumers.SyncRoot)
-			{
-				foreach(MessageConsumer consumer in this.consumers.Values)
-				{
-					consumer.Start();
-				}
-			}
+            lock(this.consumers.SyncRoot)
+            {
+                foreach(MessageConsumer consumer in this.consumers.Values)
+                {
+                    consumer.Start();
+                }
+            }
 
             if(this.executor != null)
             {
@@ -900,9 +918,13 @@ namespace Apache.NMS.ActiveMQ
             }
         }
 
-        internal void Redispatch(MessageDispatchChannel channel)
+        internal void Redispatch(IDispatcher dispatcher, MessageDispatchChannel channel)
         {
             MessageDispatch[] messages = channel.RemoveAll();
+            foreach (MessageDispatch dispatch in messages)
+            {
+                this.connection.RollbackDuplicate(dispatcher, dispatch.Message);
+            }
             System.Array.Reverse(messages);
 
             foreach(MessageDispatch message in messages)
@@ -926,20 +948,31 @@ namespace Apache.NMS.ActiveMQ
                 this.executor.ClearMessagesInProgress();
             }
 
+            if (this.consumers.Count == 0)
+            {
+                return;
+            }
+
             // Because we are called from inside the Transport Reconnection logic
             // we spawn the Consumer clear to another Thread so that we can avoid
             // any lock contention that might exist between the consumer and the
-            // connection that is reconnecting.  Use the Connection Scheduler so 
-			// that the clear calls are done one at a time to avoid further 
-			// contention on the Connection and Session resources.
-            lock(this.consumers.SyncRoot)
+            // connection that is reconnecting.  Use the Connection Scheduler so
+            // that the clear calls are done one at a time to avoid further
+            // contention on the Connection and Session resources.
+            if (clearInProgress.CompareAndSet(false, true))
             {
-                foreach(MessageConsumer consumer in this.consumers.Values)
+                lock(this.consumers.SyncRoot)
                 {
-                    consumer.InProgressClearRequired();
-					Interlocked.Increment(ref transportInterruptionProcessingComplete);
-					Scheduler.ExecuteAfterDelay(ClearMessages, consumer, 0);
+                    foreach(MessageConsumer consumer in this.consumers.Values)
+                    {
+                        consumer.InProgressClearRequired();
+                        Interlocked.Increment(ref transportInterruptionProcessingComplete);
+                        Scheduler.ExecuteAfterDelay(ClearMessages, consumer, 0);
+                    }
                 }
+
+                // Clear after all consumer have had their ClearMessagesInProgress method called.
+                Scheduler.ExecuteAfterDelay(ResetClearInProgressFlag, clearInProgress, 0);
             }
         }
 
@@ -955,6 +988,12 @@ namespace Apache.NMS.ActiveMQ
             consumer.ClearMessagesInProgress();
         }
 
+        private static void ResetClearInProgressFlag(object value)
+        {
+            Atomic<bool> clearInProgress = value as Atomic<bool>;
+            clearInProgress.Value = false;
+        }
+
         internal void Acknowledge()
         {
             lock(this.consumers.SyncRoot)
@@ -986,10 +1025,10 @@ namespace Apache.NMS.ActiveMQ
 
         internal void SendAck(MessageAck ack, bool lazy)
         {
-			if(Tracer.IsDebugEnabled)
-			{
-				Tracer.Debug("Session sending Ack: " + ack);
-			}
+            if(Tracer.IsDebugEnabled)
+            {
+                Tracer.Debug("Session sending Ack: " + ack);
+            }
 
             if(lazy || connection.SendAcksAsync || this.IsTransacted )
             {
@@ -1001,10 +1040,10 @@ namespace Apache.NMS.ActiveMQ
             }
         }
 
-		protected virtual TransactionContext CreateTransactionContext()
-		{
-			return new TransactionContext(this);
-		}
+        protected virtual TransactionContext CreateTransactionContext()
+        {
+            return new TransactionContext(this);
+        }
 
         private void CheckClosed()
         {
@@ -1055,16 +1094,16 @@ namespace Apache.NMS.ActiveMQ
 
         internal bool IsInUse(ActiveMQTempDestination dest)
         {
-			lock(this.consumers.SyncRoot)
-			{
-				foreach(MessageConsumer consumer in this.consumers.Values)
-				{
-					if(consumer.IsInUse(dest))
-					{
-						return true;
-					}
-				}
-			}
+            lock(this.consumers.SyncRoot)
+            {
+                foreach(MessageConsumer consumer in this.consumers.Values)
+                {
+                    if(consumer.IsInUse(dest))
+                    {
+                        return true;
+                    }
+                }
+            }
 
             return false;
         }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=1621142&r1=1621141&r2=1621142&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs Thu Aug 28 14:30:49 2014
@@ -15,12 +15,13 @@
  * limitations under the License.
  */
 
+using System;
 using System.Collections;
 using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ
 {
-	public enum TransactionType
+    public enum TransactionType
     {
         Begin = 0, Prepare = 1, CommitOnePhase = 2, CommitTwoPhase = 3, Rollback = 4, Recover=5, Forget = 6, End = 7
     }
@@ -28,14 +29,14 @@ namespace Apache.NMS.ActiveMQ
 
 namespace Apache.NMS.ActiveMQ
 {
-	public class TransactionContext
+    public class TransactionContext
     {
         protected readonly Session session;
         protected readonly Connection connection;
         protected readonly ArrayList synchronizations = ArrayList.Synchronized(new ArrayList());
 
         public TransactionContext(Session session)
-		{
+        {
             this.session = session;
             this.connection = session.Connection;
         }
@@ -50,33 +51,33 @@ namespace Apache.NMS.ActiveMQ
             get{ return this.TransactionId != null; }
         }
 
-		public TransactionId TransactionId
-		{
-			get;
-			protected set;
-		}
-        
+        public TransactionId TransactionId
+        {
+            get;
+            protected set;
+        }
+
         public void AddSynchronization(ISynchronization synchronization)
         {
             synchronizations.Add(synchronization);
         }
-        
+
         public void RemoveSynchronization(ISynchronization synchronization)
         {
             synchronizations.Remove(synchronization);
         }
-        
+
         public virtual void Begin()
         {
             if(!InTransaction)
             {
                 this.TransactionId = this.session.Connection.CreateLocalTransactionId();
-                
+
                 TransactionInfo info = new TransactionInfo();
                 info.ConnectionId = this.session.Connection.ConnectionId;
                 info.TransactionId = this.TransactionId;
                 info.Type = (int) TransactionType.Begin;
-                
+
                 this.session.Connection.Oneway(info);
 
                 SignalTransactionStarted();
@@ -87,54 +88,78 @@ namespace Apache.NMS.ActiveMQ
                 }
             }
         }
-        
+
         public virtual void Rollback()
         {
             if(InTransaction)
             {
-                this.BeforeEnd();
-    
+                try
+                {
+                    this.BeforeEnd();
+                }
+                catch (TransactionRolledBackException canOccurOnFailover)
+                {
+                    Tracer.WarnFormat("Rollback processing error {0}", canOccurOnFailover.Message);
+                }
+
                 if(Tracer.IsDebugEnabled)
                 {
                     Tracer.Debug("Rollback: "  + this.TransactionId +
                                  " syncCount: " +
                                  (synchronizations != null ? synchronizations.Count : 0));
                 }
-    
+
                 TransactionInfo info = new TransactionInfo();
                 info.ConnectionId = this.session.Connection.ConnectionId;
                 info.TransactionId = this.TransactionId;
                 info.Type = (int) TransactionType.Rollback;
-                
+
                 this.TransactionId = null;
                 this.session.Connection.SyncRequest(info);
-    
+
                 this.AfterRollback();
             }
         }
-        
+
         public virtual void Commit()
         {
             if(InTransaction)
             {
-                this.BeforeEnd();
-    
+                try
+                {
+                    this.BeforeEnd();
+                }
+                catch
+                {
+                    Rollback();
+                    throw;
+                }
+
                 if(Tracer.IsDebugEnabled)
                 {
                     Tracer.Debug("Commit: "  + this.TransactionId +
                                  " syncCount: " +
                                  (synchronizations != null ? synchronizations.Count : 0));
                 }
-    
+
                 TransactionInfo info = new TransactionInfo();
                 info.ConnectionId = this.session.Connection.ConnectionId;
                 info.TransactionId = this.TransactionId;
                 info.Type = (int) TransactionType.CommitOnePhase;
-                
-                this.TransactionId = null;
-                this.session.Connection.SyncRequest(info);
-                
-                this.AfterCommit();
+
+                try
+                {
+                    this.TransactionId = null;
+                    this.session.Connection.SyncRequest(info);
+                    this.AfterCommit();
+                }
+                catch (Exception e)
+                {
+                    Tracer.InfoFormat("Commit failed for transaction {0} - {1}",
+                                      info.TransactionId, e.Message);
+                    AfterRollback();
+                    throw;
+                }
             }
         }
 
@@ -219,7 +244,7 @@ namespace Apache.NMS.ActiveMQ
             }
         }
 
-	    #endregion
+        #endregion
 
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs?rev=1621142&r1=1621141&r2=1621142&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs Thu Aug 28 14:30:49 2014
@@ -38,18 +38,18 @@ namespace Apache.NMS.ActiveMQ.Util
         {
             get{ return this.mutex; }
         }
-        
+
         public bool Closed
         {
-            get 
+            get
             {
                 lock(this.mutex)
                 {
-                    return this.closed; 
+                    return this.closed;
                 }
             }
-            
-            set 
+
+            set
             {
                 lock(this.mutex)
                 {
@@ -67,7 +67,7 @@ namespace Apache.NMS.ActiveMQ.Util
                     return this.running;
                 }
             }
-            
+
             set
             {
                 lock(this.mutex)
@@ -130,12 +130,12 @@ namespace Apache.NMS.ActiveMQ.Util
                 {
                     this.running = false;
                     this.closed = true;
-                }          
+                }
 
                 Monitor.PulseAll(this.mutex);
-            }            
+            }
         }
-        
+
         public void Enqueue(MessageDispatch dispatch)
         {
             lock(this.mutex)
@@ -163,27 +163,27 @@ namespace Apache.NMS.ActiveMQ.Util
                 {
                     Monitor.Wait(this.mutex, timeout);
                 }
-        
-                if( Closed || !Running || Empty ) 
+
+                if( Closed || !Running || Empty )
                 {
                     return null;
                 }
-        
-                return DequeueNoWait();                      
+
+                return DequeueNoWait();
             }
         }
 
         public MessageDispatch DequeueNoWait()
         {
             MessageDispatch result = null;
-            
+
             lock(this.mutex)
             {
-                if( Closed || !Running || Empty ) 
+                if( Closed || !Running || Empty )
                 {
                     return null;
                 }
-                
+
                 result = channel.First.Value;
                 this.channel.RemoveFirst();
             }
@@ -195,11 +195,11 @@ namespace Apache.NMS.ActiveMQ.Util
         {
             lock(this.mutex)
             {
-                if( Closed || !Running || Empty ) 
+                if( Closed || !Running || Empty )
                 {
                     return null;
                 }
-                
+
                 return channel.First.Value;
             }
         }
@@ -215,7 +215,7 @@ namespace Apache.NMS.ActiveMQ.Util
         public MessageDispatch[] RemoveAll()
         {
             MessageDispatch[] result;
-            
+
             lock(mutex)
             {
                 result = new MessageDispatch[this.Count];
@@ -225,6 +225,14 @@ namespace Apache.NMS.ActiveMQ.Util
 
             return result;
         }
+
+        public void Signal()
+        {
+            lock(mutex)
+            {
+                Monitor.PulseAll(this.mutex);
+            }
+        }
     }
 }
 

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs?rev=1621142&r1=1621141&r2=1621142&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs Thu Aug 28 14:30:49 2014
@@ -33,7 +33,7 @@ namespace Apache.NMS.ActiveMQ.Util
         {
             get;
         }
-        
+
         bool Closed
         {
             get;
@@ -61,7 +61,7 @@ namespace Apache.NMS.ActiveMQ.Util
         void Stop();
 
         void Close();
-        
+
         void Enqueue(MessageDispatch dispatch);
 
         void EnqueueFirst(MessageDispatch dispatch);
@@ -75,5 +75,7 @@ namespace Apache.NMS.ActiveMQ.Util
         void Clear();
 
         MessageDispatch[] RemoveAll();
+
+        void Signal();
     }
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs?rev=1621142&r1=1621141&r2=1621142&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs Thu Aug 28 14:30:49 2014
@@ -46,15 +46,15 @@ namespace Apache.NMS.ActiveMQ.Util
         {
             get{ return this.mutex; }
         }
-        
+
         public bool Closed
         {
-            get 
+            get
             {
-                return this.closed; 
+                return this.closed;
             }
-            
-            set 
+
+            set
             {
                 lock(this.mutex)
                 {
@@ -69,7 +69,7 @@ namespace Apache.NMS.ActiveMQ.Util
             {
                 return this.running;
             }
-            
+
             set
             {
                 lock(this.mutex)
@@ -126,12 +126,12 @@ namespace Apache.NMS.ActiveMQ.Util
                 {
                     this.running = false;
                     this.closed = true;
-                }          
+                }
 
                 Monitor.PulseAll(this.mutex);
-            }            
+            }
         }
-        
+
         public void Enqueue(MessageDispatch dispatch)
         {
             lock(this.mutex)
@@ -161,12 +161,12 @@ namespace Apache.NMS.ActiveMQ.Util
                 {
                     Monitor.Wait(this.mutex, timeout);
                 }
-        
-                if( Closed || !Running || Empty ) 
+
+                if( Closed || !Running || Empty )
                 {
                     return null;
                 }
-        
+
                 return RemoveFirst();
             }
         }
@@ -174,14 +174,14 @@ namespace Apache.NMS.ActiveMQ.Util
         public MessageDispatch DequeueNoWait()
         {
             MessageDispatch result = null;
-            
+
             lock(this.mutex)
             {
-                if( Closed || !Running || Empty ) 
+                if( Closed || !Running || Empty )
                 {
                     return null;
                 }
-                
+
                 result = RemoveFirst();
             }
 
@@ -192,11 +192,11 @@ namespace Apache.NMS.ActiveMQ.Util
         {
             lock(this.mutex)
             {
-                if( Closed || !Running || Empty ) 
+                if( Closed || !Running || Empty )
                 {
                     return null;
                 }
-                
+
                 return GetFirst();
             }
         }
@@ -215,7 +215,7 @@ namespace Apache.NMS.ActiveMQ.Util
         public MessageDispatch[] RemoveAll()
         {
             MessageDispatch[] result;
-            
+
             lock(mutex)
             {
                 result = new MessageDispatch[this.size];
@@ -234,6 +234,14 @@ namespace Apache.NMS.ActiveMQ.Util
             return result;
         }
 
+        public void Signal()
+        {
+            lock(mutex)
+            {
+                Monitor.PulseAll(this.mutex);
+            }
+        }
+
         protected int GetPriority(MessageDispatch message)
         {
             int priority = (int) MsgPriority.Lowest;

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs?rev=1621142&r1=1621141&r2=1621142&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs Thu Aug 28 14:30:49 2014
@@ -72,7 +72,7 @@ namespace Apache.NMS.ActiveMQ.Test
                 using(ISession session = connection.CreateSession())
                 {
                     IDestination destination = session.GetQueue(destinationName);
-                    PurgeQueue(connection, destination);
+                    DeleteQueue(connection, destination);
                 }
 
                 Tracer.Debug("Test is putting " + MSG_COUNT + " messages on the queue: " + destinationName);
@@ -89,10 +89,12 @@ namespace Apache.NMS.ActiveMQ.Test
                     }
                     catch(TransactionRolledBackException)
                     {
+                        Tracer.Info("TEST: Caught expected TransactionRolledBackException");
                     }
-                    catch
+                    catch(Exception ex)
                     {
-                        Assert.Fail("Should have thrown a TransactionRolledBackException");
+                        Assert.Fail("Should have thrown a TransactionRolledBackException, but was: " +
+                                    ex.GetType().Name);
                     }
                 }
 
@@ -240,6 +242,50 @@ namespace Apache.NMS.ActiveMQ.Test
             Assert.IsTrue(this.resumed);
         }
 
+        [Test]
+        public void TestMessageDeliveredAfterCommitFailsAndRollback()
+        {
+            string uri = "failover:(tcpfaulty://${activemqhost}:61616?transport.useLogging=true)";
+            IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
+            using(connection = factory.CreateConnection() as Connection)
+            {
+                using(ISession session = connection.CreateSession())
+                {
+                    IDestination destination = session.GetQueue(destinationName);
+                    DeleteQueue(connection, destination);
+                    PutOneMsgIntoQueue(session, destination);
+                }
+
+                using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
+                {
+                    connection.Start();
+
+                    ITransport transport = (connection as Connection).ITransport;
+                    TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                    Assert.IsNotNull(tcpFaulty);
+                    tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook;
+
+                    IMessageConsumer consumer = session.CreateConsumer(session.GetQueue(destinationName));
+                    IMessage message = consumer.Receive(TimeSpan.FromSeconds(30));
+                    Assert.IsNotNull(message, "Message was not delivered");
+                    Tracer.Debug("Commiting transaction");
+
+                    try
+                    {
+                        Tracer.Info("Now attempting to commit the transaction");
+                        session.Commit();
+                    }
+                    catch (Exception ex)
+                    {
+                        Tracer.InfoFormat("Commit failed as expected. {0}", ex.Message);
+                    }
+
+                    message = consumer.Receive(TimeSpan.FromSeconds(30));
+                    Assert.IsNotNull(message, "message was not redilivered");
+                }
+            }
+        }
+
         public void TransportInterrupted()
         {
             this.interrupted = true;
@@ -252,15 +298,25 @@ namespace Apache.NMS.ActiveMQ.Test
 
         private void PutMsgIntoQueue(ISession session, IDestination destination)
         {
-            PutMsgIntoQueue(session, destination, true);
+            PutMsgIntoQueue(session, destination, true, MSG_COUNT);
+        }
+
+        private void PutOneMsgIntoQueue(ISession session, IDestination destination)
+        {
+            PutMsgIntoQueue(session, destination, true, 1);
         }
 
         private void PutMsgIntoQueue(ISession session, IDestination destination, bool commit)
         {
+            PutMsgIntoQueue(session, destination, commit, MSG_COUNT);
+        }
+
+        private void PutMsgIntoQueue(ISession session, IDestination destination, bool commit, int count)
+        {
             using(IMessageProducer producer = session.CreateProducer(destination))
             {
                 ITextMessage message = session.CreateTextMessage();
-                for(int i = 0; i < MSG_COUNT; ++i)
+                for(int i = 0; i < count; ++i)
                 {
                     message.Text = "Test message " + (i + 1);
                     producer.Send(message);
@@ -284,6 +340,14 @@ namespace Apache.NMS.ActiveMQ.Test
             }
         }
 
+        private void DeleteQueue(IConnection connection, IDestination queue)
+        {
+            using (ISession session = connection.CreateSession())
+            {
+                session.DeleteDestination(queue);
+            }
+        }
+
         private void BreakConnection()
         {
             TcpTransport transport = this.connection.ITransport.Narrow(typeof(TcpTransport)) as TcpTransport;
@@ -303,13 +367,13 @@ namespace Apache.NMS.ActiveMQ.Test
                 TransactionInfo txInfo = command as TransactionInfo;
                 if (txInfo.Type == (byte)TransactionType.CommitOnePhase)
                 {
-                    Tracer.Debug("Closing the TcpTransport to simulate an connection drop.");
+                    Tracer.Debug("Exception from the Commit to simulate an connection drop.");
                     commitFailed = true;
-                    (transport as TcpTransport).Close();
+                    TcpTransport tcpTransport = transport as TcpTransport;
+                    tcpTransport.Close();
                 }
             }
         }
-
     }
 }
 



Mime
View raw message