activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1073790 - /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
Date Wed, 23 Feb 2011 15:42:36 GMT
Author: tabish
Date: Wed Feb 23 15:42:36 2011
New Revision: 1073790

URL: http://svn.apache.org/viewvc?rev=1073790&view=rev
Log:
Adds a somewhat contrived test case for using async consumers with .NET transactions.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1073790&r1=1073789&r2=1073790&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
Wed Feb 23 15:42:36 2011
@@ -15,10 +15,14 @@
  * limitations under the License.
  */
 
+using System;
+using System.Data.SqlClient;
 using System.IO;
 using System.Threading;
+using System.Transactions;
 using Apache.NMS.ActiveMQ.Transport;
 using Apache.NMS.ActiveMQ.Transport.Tcp;
+using Apache.NMS.Util;
 using NUnit.Framework;
 
 namespace Apache.NMS.ActiveMQ.Test.src.test.csharp
@@ -296,5 +300,83 @@ namespace Apache.NMS.ActiveMQ.Test.src.t
             VerifyBrokerQueueCount();
         }
 
+        #region Asynchronous Consumer Inside of a Transaction Test / Example
+
+        private const int BATCH_COUNT = 5;
+        private int batchSequence;
+        private DependentTransaction batchTxControl;
+        private readonly ManualResetEvent awaitBatchProcessingStart = new ManualResetEvent(false);
+
+        [Test]
+        public void TestTransactedAsyncConsumption()
+        {
+            PurgeDatabase();
+            PurgeAndFillQueue(MSG_COUNT * BATCH_COUNT);
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            using (INetTxSession session = connection.CreateNetTxSession())
+            {
+                IQueue queue = session.GetQueue(testQueueName);
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                consumer.Listener += AsyncTxAwareOnMessage;
+
+                connection.Start();
+
+                for (int i = 0; i < BATCH_COUNT; ++i)
+                {
+                    using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
+                    {
+                        batchTxControl = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
+                        awaitBatchProcessingStart.Set();
+                        scoped.Complete();
+                    }                    
+                }
+            }
+
+            // verify sql server has commited the transaction                    
+            VerifyDatabaseTableIsFull(MSG_COUNT * BATCH_COUNT);
+
+            // check messages are NOT present in the queue
+            VerifyNoMessagesInQueue();
+        }
+
+        private void AsyncTxAwareOnMessage(IMessage message)
+        {
+            awaitBatchProcessingStart.WaitOne();
+
+            try
+            {
+                using (TransactionScope scoped = new TransactionScope(batchTxControl))
+                using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+                using (SqlCommand sqlInsertCommand = new SqlCommand())
+                {
+                    sqlConnection.Open();
+                    sqlInsertCommand.Connection = sqlConnection;
+
+                    ITextMessage textMessage = message as ITextMessage;
+                    Assert.IsNotNull(message, "missing message");
+                    sqlInsertCommand.CommandText =
+                        string.Format("INSERT INTO {0} VALUES ({1})", testTable, Convert.ToInt32(textMessage.Text));
+                    sqlInsertCommand.ExecuteNonQuery();
+                    scoped.Complete();
+                }
+
+                if(++batchSequence == MSG_COUNT)
+                {
+                    batchSequence = 0;
+                    awaitBatchProcessingStart.Reset();
+                    batchTxControl.Complete();
+                }
+            }
+            catch (Exception e)
+            {
+                Tracer.Debug("TX;Error from TransactionScope: " + e.Message);
+                Tracer.Debug(e.ToString());
+            }
+        }
+
+        #endregion
     }
 }



Mime
View raw message