activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r381643 [2/2] - in /incubator/activemq/trunk/openwire-dotnet: src/OpenWire.Client/ src/OpenWire.Client/Core/ src/OpenWire.Client/IO/ tests/OpenWire.Client/
Date Tue, 28 Feb 2006 12:36:29 GMT
Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs?rev=381643&r1=381642&r2=381643&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageConsumer.cs Tue Feb
28 04:36:22 2006
@@ -22,10 +22,11 @@
 
 namespace OpenWire.Client
 {
-    public enum AckType {
+    public enum AckType
+    {
         DeliveredAck = 0, // Message delivered but not consumed
-        ConsumedAck = 1, // Message consumed, discard
-        PoisonAck = 2 // Message could not be processed due to poison pill but discard anyway
+        PoisonAck = 1,    // Message could not be processed due to poison pill but discard
anyway
+        ConsumedAck = 2   // Message consumed, discard
     }
     
     
@@ -40,9 +41,12 @@
         private AcknowledgementMode acknowledgementMode;
         private bool closed;
         private Dispatcher dispatcher = new Dispatcher();
+        private int maximumRedeliveryCount = 10;
+        private int redeliveryTimeout = 500;
         
         public event MessageListener Listener;
         
+        
         public MessageConsumer(Session session, ConsumerInfo info, AcknowledgementMode acknowledgementMode)
         {
             this.session = session;
@@ -50,12 +54,29 @@
             this.acknowledgementMode = acknowledgementMode;
         }
         
-        public ConsumerId ConsumerId {
+        public ConsumerId ConsumerId
+        {
             get {
                 return info.ConsumerId;
             }
         }
-
+        
+        public int MaximumRedeliveryCount
+        {
+            get { return maximumRedeliveryCount; }
+            set { maximumRedeliveryCount = value; }
+        }
+        
+        public int RedeliveryTimeout
+        {
+            get { return redeliveryTimeout; }
+            set { redeliveryTimeout = value; }
+        }
+        
+        public void RedeliverRolledBackMessages()
+        {
+            dispatcher.RedeliverRolledBackMessages();
+        }
         
         /// <summary>
         /// Method Dispatch
@@ -65,7 +86,8 @@
         {
             dispatcher.Enqueue(message);
             
-            if (Listener != null) {
+            if (Listener != null)
+            {
                 // lets dispatch to the thread pool for this connection for messages to be
processed
                 ThreadPool.QueueUserWorkItem(new WaitCallback(session.DispatchAsyncMessages));
             }
@@ -77,7 +99,7 @@
             return AutoAcknowledge(dispatcher.Dequeue());
         }
         
-        public IMessage Receive(long timeout)
+        public IMessage Receive(int timeout)
         {
             CheckClosed();
             return AutoAcknowledge(dispatcher.Dequeue(timeout));
@@ -102,12 +124,15 @@
         /// </summary>
         public void DispatchAsyncMessages()
         {
-            while (Listener != null) {
+            while (Listener != null)
+            {
                 IMessage message = dispatcher.DequeueNoWait();
-                if (message != null) {
+                if (message != null)
+                {
                     Listener(message);
                 }
-                else {
+                else
+                {
                     break;
                 }
             }
@@ -145,7 +170,7 @@
                 DoAcknowledge(message);
             }
         }
-
+        
         protected void DoAcknowledge(Message message)
         {
             MessageAck ack = CreateMessageAck(message);
@@ -163,10 +188,70 @@
             ack.FirstMessageId = message.MessageId;
             ack.LastMessageId = message.MessageId;
             ack.MessageCount = 1;
-            ack.TransactionId = message.TransactionId;
+            
+            if (session.Transacted)
+            {
+                session.DoStartTransaction();
+                ack.TransactionId = session.TransactionContext.TransactionId;
+                session.TransactionContext.AddSynchronization(new MessageConsumerSynchronization(this,
message));
+            }
             return ack;
         }
         
+        public void AfterRollback(ActiveMQMessage message)
+        {
+            // lets redeliver the message again
+            message.RedeliveryCounter += 1;
+            if (message.RedeliveryCounter > MaximumRedeliveryCount)
+            {
+                // lets send back a poisoned pill
+                MessageAck ack = new MessageAck();
+                ack.AckType = (int) AckType.PoisonAck;
+                ack.ConsumerId = info.ConsumerId;
+                ack.Destination = message.Destination;
+                ack.FirstMessageId = message.MessageId;
+                ack.LastMessageId = message.MessageId;
+                ack.MessageCount = 1;
+                session.Connection.OneWay(ack);
+            }
+            else
+            {
+                dispatcher.Redeliver(message);
+                
+                if (Listener != null)
+                {
+                    // lets re-dispatch the message at some point in the future
+                    Thread.Sleep(RedeliveryTimeout);
+                    ThreadPool.QueueUserWorkItem(new WaitCallback(session.DispatchAsyncMessages));
+                }
+            }
+        }
+    }
+    
+    // TODO maybe there's a cleaner way of creating stateful delegates to make this code
neater
+    class MessageConsumerSynchronization : ISynchronization
+    {
+        private MessageConsumer consumer;
+        private Message message;
+        
+        public MessageConsumerSynchronization(MessageConsumer consumer, Message message)
+        {
+            this.message = message;
+            this.consumer = consumer;
+        }
+        
+        public void BeforeCommit()
+        {
+        }
+        
+        public void AfterCommit()
+        {
+        }
+        
+        public void AfterRollback()
+        {
+            consumer.AfterRollback((ActiveMQMessage) message);
+        }
         
     }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs?rev=381643&r1=381642&r2=381643&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/MessageProducer.cs Tue Feb
28 04:36:22 2006
@@ -29,11 +29,11 @@
         private ProducerInfo info;
         private long messageCounter;
         
-		bool persistent;
-		long timeToLive;
-		int priority;
-		bool disableMessageID;
-		bool disableMessageTimestamp;
+        bool persistent;
+        long timeToLive;
+        int priority;
+        bool disableMessageID;
+        bool disableMessageTimestamp;
         
         public MessageProducer(Session session, ProducerInfo info)
         {
@@ -59,42 +59,48 @@
             activeMessage.ProducerId = info.ProducerId;
             activeMessage.Destination = ActiveMQDestination.Transform(destination);
             
+            if (session.Transacted)
+            {
+                session.DoStartTransaction();
+                activeMessage.TransactionId = session.TransactionContext.TransactionId;
+            }
+            
             session.DoSend(destination, message);
         }
         
         public void Dispose()
         {
-            session.DisposeOf(info.ProducerId);
+            session.Connection.DisposeOf(info.ProducerId);
+        }
+        
+        public bool Persistent
+        {
+            get { return persistent; }
+            set { this.persistent = value; }
+        }
+        
+        public long TimeToLive
+        {
+            get { return timeToLive; }
+            set { this.timeToLive = value; }
         }
-
-		public bool Persistent
-		{
-			get { return persistent; }
-			set { this.persistent = value; }
-    }
-
-		public long TimeToLive
-		{
-			get { return timeToLive; }
-			set { this.timeToLive = value; }
-}
-		public int Priority
-		{
-			get { return priority; }
-			set { this.priority = value; }
-		}
-
-		public bool DisableMessageID
-		{
-			get { return disableMessageID; }
-			set { this.disableMessageID = value; }
-		}
-		
-		public bool DisableMessageTimestamp
-		{
-			get { return disableMessageTimestamp; }
-			set { this.disableMessageTimestamp = value; }
-		}
-
+        public int Priority
+        {
+            get { return priority; }
+            set { this.priority = value; }
+        }
+        
+        public bool DisableMessageID
+        {
+            get { return disableMessageID; }
+            set { this.disableMessageID = value; }
+        }
+        
+        public bool DisableMessageTimestamp
+        {
+            get { return disableMessageTimestamp; }
+            set { this.disableMessageTimestamp = value; }
+        }
+        
     }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Session.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Session.cs?rev=381643&r1=381642&r2=381643&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Session.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/src/OpenWire.Client/Session.cs Tue Feb 28 04:36:22
2006
@@ -33,17 +33,20 @@
         private long producerCounter;
         private int prefetchSize = 1000;
         private IDictionary consumers = new Hashtable();
+        private TransactionContext transactionContext;
         
         public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode)
         {
             this.connection = connection;
             this.info = info;
             this.acknowledgementMode = acknowledgementMode;
+            transactionContext = new TransactionContext(this);
         }
         
+        
         public void Dispose()
         {
-            DisposeOf(info.SessionId);
+            connection.DisposeOf(info.SessionId);
         }
         
         public IMessageProducer CreateProducer()
@@ -174,13 +177,52 @@
             return answer;
         }
         
+        public void Commit()
+        {
+            if (! Transacted)
+            {
+                throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted
session. Acknowlegement mode is: " + acknowledgementMode);
+            }
+            transactionContext.Commit();
+        }
+        
+        public void Rollback()
+        {
+            if (! Transacted)
+            {
+                throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted
session. Acknowlegement mode is: " + acknowledgementMode);
+            }
+            transactionContext.Rollback();
+            
+            // lets ensure all the consumers redeliver any rolled back messages
+            foreach (MessageConsumer consumer in consumers.Values)
+            {
+                consumer.RedeliverRolledBackMessages();
+            }
+        }
+        
+        
         
         // Properties
+        
         public Connection Connection
         {
-            get {
-                return connection;
-            }
+            get { return connection; }
+        }
+        
+        public SessionId SessionId
+        {
+            get { return info.SessionId; }
+        }
+        
+        public bool Transacted
+        {
+            get { return acknowledgementMode == AcknowledgementMode.Transactional; }
+        }
+        
+        public TransactionContext TransactionContext
+        {
+            get { return transactionContext; }
         }
         
         // Implementation methods
@@ -191,21 +233,22 @@
             connection.SyncRequest(command);
         }
         
-        public void DisposeOf(DataStructure objectId)
+        /// <summary>
+        /// Ensures that a transaction is started
+        /// </summary>
+        public void DoStartTransaction()
         {
-            // TODO dispose of all the session first?
-            RemoveInfo command = new RemoveInfo();
-            command.ObjectId = objectId;
-            connection.SyncRequest(command);
+            if (Transacted)
+            {
+                transactionContext.Begin();
+            }
         }
         
         public void DisposeOf(ConsumerId objectId)
         {
             consumers.Remove(objectId);
             connection.RemoveConsumer(objectId);
-            RemoveInfo command = new RemoveInfo();
-            command.ObjectId = objectId;
-            connection.SyncRequest(command);
+            connection.DisposeOf(objectId);
         }
         
         public void DispatchAsyncMessages(object state)

Modified: incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/ConsumerTest.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/ConsumerTest.cs?rev=381643&r1=381642&r2=381643&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/ConsumerTest.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/ConsumerTest.cs Tue Feb
28 04:36:22 2006
@@ -27,65 +27,65 @@
     [TestFixture]
     public class ConsumerTest : TestSupport
     {
-		IConnectionFactory factory;
-		IConnection connection;
-		IDestination destination;
-
-		[SetUp]
-		protected void SetUp() 
-		{
-			factory = new ConnectionFactory("localhost", 61616);
-			connection = factory.CreateConnection();
-		}
-
-		[TearDown]
-		protected void TearDown() 
-		{
-			connection.Dispose();
-		}
+        IConnectionFactory factory;
+        IConnection connection;
+        IDestination destination;
+
+        [SetUp]
+        protected void SetUp()
+        {
+            factory = new ConnectionFactory("localhost", 61616);
+            connection = factory.CreateConnection();
+        }
+
+        [TearDown]
+        protected void TearDown()
+        {
+            connection.Dispose();
+        }
 
         [Test]
-		[Ignore("Not fully implemented yet.")]
-		public void testDurableConsumerSelectorChange()  
-		{
-
-			// Receive a message with the JMS API
-			connection.ClientId="test";
-			connection.Start();
-			
-			ISession session = connection.CreateSession(false, AcknowledgementMode.AutoAcknowledge);
-			destination = session.GetTopic("foo");
-			IMessageProducer producer = session.CreateProducer(destination);
-			producer.Persistent = true;
-			IMessageConsumer consumer = session.CreateDurableConsumer((ITopic)destination, "test",
"color='red'", false);
-
-			// Send the messages
-			ITextMessage message = session.CreateTextMessage("1st");
-			//message.SetStringProperty("color", "red");
-			producer.Send(message);
-	        
-			IMessage m = consumer.Receive(1000);
-			Assert.IsNotNull(m);
-			Assert.AreEqual("1st", ((ITextMessage)m).Text );
-
-			// Change the subscription.
-			consumer.Dispose();
-			consumer = session.CreateDurableConsumer((ITopic)destination, "test", "color='blue'",
false);
-	        
-			message = session.CreateTextMessage("2nd");
-			// message.setStringProperty("color", "red");
-			producer.Send(message);
-			message = session.CreateTextMessage("3rd");
-			 // message.setStringProperty("color", "blue");
-			producer.Send(message);
-
-			// Selector should skip the 2nd message.
-			m = consumer.Receive(1000);
-			Assert.IsNotNull(m);
-			Assert.AreEqual("3rd", ((ITextMessage)m).Text);
-	        
-			Assert.IsNull(consumer.ReceiveNoWait());
-		}
+        [Ignore("Not fully implemented yet.")]
+        public void testDurableConsumerSelectorChange()
+        {
+
+            // Receive a message with the JMS API
+            connection.ClientId="test";
+            connection.Start();
+            
+            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+            destination = session.GetTopic("foo");
+            IMessageProducer producer = session.CreateProducer(destination);
+            producer.Persistent = true;
+            IMessageConsumer consumer = session.CreateDurableConsumer((ITopic)destination,
"test", "color='red'", false);
+
+            // Send the messages
+            ITextMessage message = session.CreateTextMessage("1st");
+            //message.SetStringProperty("color", "red");
+            producer.Send(message);
+            
+            IMessage m = consumer.Receive(1000);
+            Assert.IsNotNull(m);
+            Assert.AreEqual("1st", ((ITextMessage)m).Text );
+
+            // Change the subscription.
+            consumer.Dispose();
+            consumer = session.CreateDurableConsumer((ITopic)destination, "test", "color='blue'",
false);
+            
+            message = session.CreateTextMessage("2nd");
+            // message.setStringProperty("color", "red");
+            producer.Send(message);
+            message = session.CreateTextMessage("3rd");
+             // message.setStringProperty("color", "blue");
+            producer.Send(message);
+
+            // Selector should skip the 2nd message.
+            m = consumer.Receive(1000);
+            Assert.IsNotNull(m);
+            Assert.AreEqual("3rd", ((ITextMessage)m).Text);
+            
+            Assert.IsNull(consumer.ReceiveNoWait());
+        }
 
     }
 }

Modified: incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs?rev=381643&r1=381642&r2=381643&view=diff
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs (original)
+++ incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/TestSupport.cs Tue Feb
28 04:36:22 2006
@@ -73,11 +73,17 @@
         
         protected virtual IDestination CreateDestination(ISession session)
         {
-            string name = "Test.DotNet." + GetType().Name;
+            string name = CreateDestinationName();
             IDestination destination = session.GetQueue(name);
             
             Console.WriteLine("Using queue: " + destination);
             return destination;
+        }
+
+        protected virtual string CreateDestinationName()
+        {
+            string name = "Test.DotNet." + GetType().Name;
+            return name;
         }
         
         protected virtual IMessage CreateMessage(ISession session)

Added: incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/TransactionTest.cs
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/TransactionTest.cs?rev=381643&view=auto
==============================================================================
--- incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/TransactionTest.cs (added)
+++ incubator/activemq/trunk/openwire-dotnet/tests/OpenWire.Client/TransactionTest.cs Tue
Feb 28 04:36:22 2006
@@ -0,0 +1,290 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.IO;
+
+using NUnit.Framework;
+
+using OpenWire.Client;
+using OpenWire.Client.Core;
+using System.Collections;
+
+namespace OpenWire.Client
+{
+    [TestFixture]
+    public class TransactionTest : TestSupport
+    {
+        private static int destinationCounter;
+        
+        IDestination destination;
+        IConnection connection;
+        ISession session;
+        IMessageProducer producer;
+        IMessageConsumer consumer;
+        
+        [SetUp]
+        public void SetUp()
+        {
+            Connect();
+            
+            // lets consume any outstanding messages from previous test runs
+            while (consumer.Receive(1000) != null)
+            {
+            }
+            session.Commit();
+        }
+        
+        
+        
+        [TearDown]
+        public void TearDown()
+        {
+            Disconnect();
+        }
+        
+        [Test]
+        public void TestSendRollback()
+        {
+            IMessage[] outbound = new IMessage[]{
+                session.CreateTextMessage("First Message"),
+                session.CreateTextMessage("Second Message")
+            };
+            
+            //sends a message
+            producer.Send(outbound[0]);
+            session.Commit();
+            
+            //sends a message that gets rollbacked
+            producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
+            session.Rollback();
+            
+            //sends a message
+            producer.Send(outbound[1]);
+            session.Commit();
+            
+            //receives the first message
+            ArrayList messages = new ArrayList();
+            Console.WriteLine("About to consume message 1");
+            IMessage message = consumer.Receive(1000);
+            messages.Add(message);
+            Console.WriteLine("Received: " + message);
+            
+            //receives the second message
+            Console.WriteLine("About to consume message 2");
+            message = consumer.Receive(4000);
+            messages.Add(message);
+            Console.WriteLine("Received: " + message);
+            
+            //validates that the rollbacked was not consumed
+            session.Commit();
+            IMessage[] inbound = new IMessage[messages.Count];
+            messages.CopyTo(inbound);
+            AssertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+        }
+        
+        [Test]
+        public void TestSendSessionClose()
+        {
+            IMessage[] outbound = new IMessage[]{
+                session.CreateTextMessage("First Message"),
+                session.CreateTextMessage("Second Message")
+            };
+            
+            //sends a message
+            producer.Send(outbound[0]);
+            session.Commit();
+            
+            //sends a message that gets rollbacked
+            producer.Send(session.CreateTextMessage("I'm going to get rolled back."));
+            consumer.Dispose();
+            session.Dispose();
+            
+            Reconnect();
+            
+            //sends a message
+            producer.Send(outbound[1]);
+            session.Commit();
+            
+            //receives the first message
+            ArrayList messages = new ArrayList();
+            Console.WriteLine("About to consume message 1");
+            IMessage message = consumer.Receive(1000);
+            messages.Add(message);
+            Console.WriteLine("Received: " + message);
+            
+            //receives the second message
+            Console.WriteLine("About to consume message 2");
+            message = consumer.Receive(4000);
+            messages.Add(message);
+            Console.WriteLine("Received: " + message);
+            
+            //validates that the rollbacked was not consumed
+            session.Commit();
+            IMessage[] inbound = new IMessage[messages.Count];
+            messages.CopyTo(inbound);
+            AssertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+        }
+        
+        [Test]
+        public void TestReceiveRollback()
+        {
+            IMessage[] outbound = new IMessage[]{
+                session.CreateTextMessage("First Message"),
+                session.CreateTextMessage("Second Message")
+            };
+            
+            //sent both messages
+            producer.Send(outbound[0]);
+            producer.Send(outbound[1]);
+            session.Commit();
+            
+            Console.WriteLine("Sent 0: " + outbound[0]);
+            Console.WriteLine("Sent 1: " + outbound[1]);
+            
+            ArrayList messages = new ArrayList();
+            IMessage message = consumer.Receive(1000);
+            messages.Add(message);
+            Assert.AreEqual(outbound[0], message);
+            session.Commit();
+            
+            // rollback so we can get that last message again.
+            message = consumer.Receive(1000);
+            Assert.IsNotNull(message);
+            Assert.AreEqual(outbound[1], message);
+            session.Rollback();
+            
+            // Consume again.. the previous message should
+            // get redelivered.
+            message = consumer.Receive(5000);
+            Assert.IsNotNull(message, "Should have re-received the message again!");
+            messages.Add(message);
+            session.Commit();
+            
+            IMessage[] inbound = new IMessage[messages.Count];
+            messages.CopyTo(inbound);
+            AssertTextMessagesEqual("Rollback did not work", outbound, inbound);
+        }
+        
+        
+        [Test]
+        public void TestReceiveTwoThenRollback()
+        {
+            IMessage[] outbound = new IMessage[]{
+                session.CreateTextMessage("First Message"),
+                session.CreateTextMessage("Second Message")
+            };
+            
+            producer.Send(outbound[0]);
+            producer.Send(outbound[1]);
+            session.Commit();
+            
+            Console.WriteLine("Sent 0: " + outbound[0]);
+            Console.WriteLine("Sent 1: " + outbound[1]);
+            
+            ArrayList messages = new ArrayList();
+            IMessage message = consumer.Receive(1000);
+            AssertTextMessageEqual("first mesage received before rollback", outbound[0],
message);
+            
+            message = consumer.Receive(1000);
+            Assert.IsNotNull(message);
+            AssertTextMessageEqual("second message received before rollback", outbound[1],
message);
+            session.Rollback();
+            
+            // Consume again.. the previous message should
+            // get redelivered.
+            message = consumer.Receive(5000);
+            Assert.IsNotNull(message, "Should have re-received the first message again!");
+            messages.Add(message);
+            AssertTextMessageEqual("first message received after rollback", outbound[0],
message);
+            
+            message = consumer.Receive(5000);
+            Assert.IsNotNull(message, "Should have re-received the second message again!");
+            messages.Add(message);
+            AssertTextMessageEqual("second message received after rollback", outbound[1],
message);
+            
+            Assert.IsNull(consumer.ReceiveNoWait());
+            session.Commit();
+            
+            IMessage[] inbound = new IMessage[messages.Count];
+            messages.CopyTo(inbound);
+            AssertTextMessagesEqual("Rollback did not work", outbound, inbound);
+        }
+        
+        protected override string CreateDestinationName()
+        {
+            // TODO - how can we get the test name?
+            return base.CreateDestinationName() + (++destinationCounter);
+        }
+        
+        protected void AssertTextMessagesEqual(String message, IMessage[] expected, IMessage[]
actual)
+        {
+            Assert.AreEqual(expected.Length, actual.Length, "Incorrect number of messages
received");
+            
+            for (int i = 0; i < expected.Length; i++)
+            {
+                AssertTextMessageEqual(message + ". Index: " + i, expected[i], actual[i]);
+            }
+        }
+        
+        protected void AssertTextMessageEqual(String message, IMessage expected, IMessage
actual)
+        {
+            Assert.IsTrue(expected is ITextMessage, "expected object not a text message");
+            Assert.IsTrue(actual is ITextMessage, "actual object not a text message");
+            
+            String expectedText = ((ITextMessage) expected).Text;
+            String actualText = ((ITextMessage) actual).Text;
+            
+            Assert.AreEqual(expectedText, actualText, message);
+        }
+        
+        protected void Connect()
+        {
+            IConnectionFactory factory = new ConnectionFactory("localhost", 61616);
+            
+            connection = factory.CreateConnection();
+            
+            session = connection.CreateSession(AcknowledgementMode.Transactional);
+            
+            // reuse the same destination if we reconnect
+            if (destination == null)
+            {
+                destination = CreateDestination(session);
+            }
+            
+            consumer = session.CreateConsumer(destination);
+            
+            producer = session.CreateProducer(destination);
+        }
+        
+        
+        protected void Disconnect()
+        {
+            if (connection != null)
+            {
+                connection.Dispose();
+                connection = null;
+            }
+        }
+        
+        protected void Reconnect()
+        {
+            Disconnect();
+            Connect();
+        }
+        
+    }
+}



Mime
View raw message