activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1083101 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x: ./ src/test/csharp/
Date Fri, 18 Mar 2011 23:06:33 GMT
Author: tabish
Date: Fri Mar 18 23:06:32 2011
New Revision: 1083101

URL: http://svn.apache.org/viewvc?rev=1083101&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQNET-321

Add DTC Tests to this release.  Only run in manual mode.

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcBasicTransactionsTest.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcConsumerTransactionsTest.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcProducerTransactionsTest.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcTransactionsTestSupport.cs
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/nant.build
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/vs2008-activemq-test.csproj

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/nant.build
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/nant.build?rev=1083101&r1=1083100&r2=1083101&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/nant.build (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/nant.build Fri Mar 18 23:06:32 2011
@@ -86,6 +86,7 @@
             <include name="${current.build.framework.assembly.dir}/mscorlib.dll" />
             <include name="${current.build.framework.assembly.dir}/System.dll" />
             <include name="${current.build.framework.assembly.dir}/System.Xml.dll" />
+            <include name="${current.build.framework.assembly.dir}/System.Data.dll" />
             <include name="${current.build.framework.assembly.dir}/System.Transactions.dll" />
             <include name="${Apache.NMS.dll}" />
             <include name="${Apache.NMS.Test.dll}" />

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcBasicTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcBasicTransactionsTest.cs?rev=1083101&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcBasicTransactionsTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcBasicTransactionsTest.cs Fri Mar 18 23:06:32 2011
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading;
+using Apache.NMS.ActiveMQ.Transactions;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+    [TestFixture]
+    [Category("Manual")]
+    class DtcBasicTransactionsTest : DtcTransactionsTestSupport
+    {
+        [Test]
+        [ExpectedException("Apache.NMS.NMSException")]
+        public void TestSessionCreateFailsWithInvalidLogLocation()
+        {
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                NetTxConnection con = connection as NetTxConnection;
+                NetTxRecoveryPolicy policy = con.RecoveryPolicy;
+                (policy.RecoveryLogger as RecoveryFileLogger).Location = nonExistantPath; // redirect the recovery log to nonExistantPath
+                connection.CreateNetTxSession(); // the test passes only if an NMSException escapes (see ExpectedException); presumably thrown here - confirm
+            }
+        }
+
+        [Test]
+        public void TestTransactedDBReadAndProduce()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                Thread.Sleep(2000); // NOTE(review): presumably allows the asynchronous DTC completion to finish before the connection is disposed - confirm
+            }
+
+            // verify sql server has committed the transaction
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages are present in the queue
+            VerifyBrokerQueueCount();
+        }
+
+        [Test]
+        public void TestTransacteDequeueAndDbWrite()
+        {
+            // Test initialize - Clears the DB and fills the queue with data to consume.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+
+                Thread.Sleep(2000); // NOTE(review): presumably allows the asynchronous DTC completion to finish before the connection is disposed - confirm
+            }
+
+            // verify sql server has committed the transaction and stored all messages
+            VerifyDatabaseTableIsFull();
+
+            // check no messages are present in the queue after commit.
+            VerifyNoMessagesInQueueNoRecovery();
+        }
+    }
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcConsumerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1083101&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcConsumerTransactionsTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcConsumerTransactionsTest.cs Fri Mar 18 23:06:32 2011
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Data.SqlClient;
+using System.IO;
+using System.Threading;
+using System.Transactions;
+using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
+using Apache.NMS.Util;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+    [TestFixture]
+    [Category("Manual")]
+    class DtcConsumerTransactionsTest : DtcTransactionsTestSupport
+    {
+        [Test]
+        public void TestRecoveryAfterCommitFailsBeforeSent()
+        {
+            // Test initialize - Fills in queue with data to send and clears the DB.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook; // fault injection; presumably fires before the commit command is written - confirm
+
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+
+                Thread.Sleep(1000);
+            }
+
+            // transaction should not have been committed
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // verify sql server has committed the transaction
+            VerifyDatabaseTableIsFull();
+
+            // check messages are not present in the queue
+            VerifyNoMessagesInQueue();
+        }
+
+        [Test]
+        public void TestRecoveryAfterCommitFailsAfterSent()
+        {
+            // Test initialize - Fills in queue with data to send and clears the DB.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPostProcessor += this.FailOnCommitTransportHook; // fault injection; presumably fires after the commit command is written - confirm
+
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+
+                Thread.Sleep(1000);
+            }
+
+            // transaction should have been committed
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // verify sql server has committed the transaction
+            VerifyDatabaseTableIsFull();
+
+            // check messages are not present in the queue
+            VerifyNoMessagesInQueue();
+        }
+
+        [Test]
+        public void TestIterativeTransactedConsume()
+        {
+            // Test initialize - Clears the DB and fills the queue with 5 * MSG_COUNT messages.
+            PurgeDatabase();
+            PurgeAndFillQueue(5 * MSG_COUNT);
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ReadFromQueueAndInsertIntoDbWithCommit(connection); // five consecutive calls on the same connection
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // verify sql server has committed all five transactions
+            VerifyDatabaseTableIsFull(5 * MSG_COUNT);
+
+            // check messages are NOT present in the queue
+            VerifyNoMessagesInQueueNoRecovery();
+        }
+
+        [Test]
+        public void TestConsumeWithDBInsertLogLocation()
+        {
+            const string logLocation = @".\RecoveryDir";
+            const string newConnectionUri =
+                connectionURI + "?nms.RecoveryPolicy.RecoveryLogger.Location=" + logLocation;
+
+            // Test initialize - Fills in queue with data to send and clears the DB.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            if (Directory.Exists(logLocation))
+            {
+                Directory.Delete(logLocation, true); // start from an empty log directory so the file counts below are exact
+            }
+
+            Directory.CreateDirectory(logLocation);
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(newConnectionUri));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook;
+
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            Assert.AreEqual(1, Directory.GetFiles(logLocation).Length); // exactly one file is expected in the configured log directory at this point
+
+            // verify sql server has committed the transaction
+            VerifyDatabaseTableIsFull();
+
+            // check messages are NOT present in the queue
+            VerifyBrokerQueueCount(0, newConnectionUri);
+
+            Assert.AreEqual(0, Directory.GetFiles(logLocation).Length); // the log directory must be empty again; presumably the check above ran recovery - confirm
+        }
+
+        [Test]
+        public void TestRecoverAfterTransactionScopeAborted()
+        {
+            // Test initialize - Fills in queue with data to send and clears the DB.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ReadFromQueueAndInsertIntoDbWithScopeAborted(connection);
+
+                Thread.Sleep(2000); // NOTE(review): presumably allows the asynchronous DTC rollback to finish before the connection is disposed - confirm
+            }
+
+            // verify sql server has NOT committed the transaction
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages are present in the queue
+            VerifyBrokerQueueCount();
+        }
+
+        [Test]
+        public void TestRecoverAfterRollbackFailWhenScopeAborted()
+        {
+            // Test initialize - Fills in queue with data to send and clears the DB.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnRollbackTransportHook; // fault injection; presumably fires before the rollback command is written - confirm
+
+                ReadFromQueueAndInsertIntoDbWithScopeAborted(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // verify sql server has NOT committed the transaction
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages are recovered and present in the queue
+            VerifyBrokerQueueCount();
+        }
+
+        [Test]
+        public void TestRecoverAfterFailOnTransactionBeforePrepareSent()
+        {
+            // Test initialize - Fills in queue with data to send and clears the DB.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnPrepareTransportHook; // note: the hook is attached before Start() in this test
+
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // Messages are visible since no prepare sent
+            VerifyBrokerQueueCountNoRecovery();
+
+            // verify sql server has NOT committed the transaction
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages are present in the queue
+            VerifyBrokerQueueCount();
+        }
+
+        [Test]
+        public void TestRecoverAfterFailOnTransactionAfterPrepareSent()
+        {
+            // Test initialize - Fills in queue with data to send and clears the DB.
+            PurgeDatabase();
+            PurgeAndFillQueue();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPostProcessor += this.FailOnPrepareTransportHook; // note: the hook is attached before Start() in this test
+
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ReadFromQueueAndInsertIntoDbWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // not visible yet because it must be rolled back
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // verify sql server has NOT committed the transaction
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages are present in the queue
+            VerifyBrokerQueueCount();
+        }
+
+        #region Asynchronous Consumer Inside of a Transaction Test / Example
+
+        private const int BATCH_COUNT = 5; // number of transactions run by TestTransactedAsyncConsumption; the queue is filled with MSG_COUNT * BATCH_COUNT messages
+        private int batchSequence; // messages handled by the listener in the current batch; reset to 0 when it reaches MSG_COUNT
+        private DependentTransaction batchTxControl; // dependent clone of the current batch transaction; set by the test thread, completed by the listener
+        private readonly ManualResetEvent awaitBatchProcessingStart = new ManualResetEvent(false); // set once a batch transaction is available; the listener waits on it and resets it at batch end
+
+        [Test]
+        public void TestTransactedAsyncConsumption()
+        {
+            PurgeDatabase();
+            PurgeAndFillQueue(MSG_COUNT * BATCH_COUNT);
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            using (NetTxSession session = connection.CreateNetTxSession() as NetTxSession)
+            {
+                IQueue queue = session.GetQueue(testQueueName);
+                IMessageConsumer consumer = session.CreateConsumer(queue);
+                consumer.Listener += AsyncTxAwareOnMessage;
+
+                // Be careful: messages are dispatched once this is done, so you could receive
+                // a Message outside a TX.  We use the awaitBatchProcessingStart event here to
+                // gate the OnMessage callback; once that method returns the Message is ack'd and
+                // no longer has a chance to participate in a TX.
+                connection.Start();
+
+                for (int i = 0; i < BATCH_COUNT; ++i)
+                {
+                    using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
+                    {
+                        session.Enlist(Transaction.Current);
+
+                        batchTxControl = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete); // listener completes this clone after MSG_COUNT messages
+                        awaitBatchProcessingStart.Set(); // release the listener now that a batch transaction exists
+                        scoped.Complete();
+                    }
+
+                    // Re-enlisting too fast seems to annoy the DTC.  Also since DTC operations are
+                    // async we need to allow a little time for lag so that the last TX actually
+                    // completes before we start a new one.
+                    Thread.Sleep(250);
+                }
+            }
+
+            // verify sql server has committed the transaction
+            VerifyDatabaseTableIsFull(MSG_COUNT * BATCH_COUNT);
+
+            // check messages are NOT present in the queue
+            VerifyNoMessagesInQueue();
+        }
+
+        // Message listener: waits for the current batch transaction, then inserts the message's integer text into the test table under it.
+        private void AsyncTxAwareOnMessage(IMessage message)
+        {
+            awaitBatchProcessingStart.WaitOne(); // block until the test thread has published batchTxControl
+
+            try
+            {
+                using (TransactionScope scoped = new TransactionScope(batchTxControl))
+                using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+                using (SqlCommand sqlInsertCommand = new SqlCommand())
+                {
+                    sqlConnection.Open();
+                    sqlInsertCommand.Connection = sqlConnection;
+
+                    ITextMessage textMessage = message as ITextMessage;
+                    Assert.IsNotNull(textMessage, "missing message"); // check the cast result: a null or non-text message must fail here, not as a NullReferenceException below
+                    sqlInsertCommand.CommandText = string.Format("INSERT INTO {0} VALUES ({1})", testTable, Convert.ToInt32(textMessage.Text));
+                    sqlInsertCommand.ExecuteNonQuery();
+                    scoped.Complete();
+                }
+
+                if(++batchSequence == MSG_COUNT) // last message of this batch
+                {
+                    batchSequence = 0;
+                    awaitBatchProcessingStart.Reset(); // gate further deliveries until the next batch transaction is published
+                    batchTxControl.Complete(); // signal that the dependent work for this batch is done
+                }
+            }
+            catch (Exception e)
+            {
+                // Failures (including the assertion above) are only traced here, not rethrown.
+                Tracer.Debug("TX;Error from TransactionScope: " + e.Message);
+                Tracer.Debug(e.ToString());
+            }
+        }
+
+        #endregion
+    }
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcProducerTransactionsTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcProducerTransactionsTest.cs?rev=1083101&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcProducerTransactionsTest.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcProducerTransactionsTest.cs Fri Mar 18 23:06:32 2011
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.IO;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Transactions;
+using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+    [TestFixture]
+    [Category("Manual")]
+    class DtcProducerTransactionsTest : DtcTransactionsTestSupport
+    {
+        [Test]
+        public void TestRecoverAfterFailOnTransactionCommit()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook; // fault injection; presumably fires before the commit command is written - confirm
+
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                Thread.Sleep(1000);
+            }
+
+            // transaction should not have been committed
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // verify sql server has committed the transaction
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages are present in the queue
+            VerifyBrokerQueueCount();
+        }
+
+        [Test]
+        public void TestRecoverAfterFailOnTransactionPostCommitSend()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPostProcessor += this.FailOnCommitTransportHook; // fault injection; presumably fires after the commit command is written - confirm
+
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                Thread.Sleep(1000);
+            }
+
+            // transaction should have been committed
+            VerifyBrokerQueueCountNoRecovery();
+
+            // verify sql server has committed the transaction
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages are present in the queue
+            VerifyBrokerQueueCount();
+        }
+
+        [Test]
+        public void TestNoRecoverAfterFailOnTransactionWhenLogDeleted()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+            NetTxConnectionFactory netTxFactory = factory as NetTxConnectionFactory;
+            RecoveryFileLogger logger = netTxFactory.RecoveryPolicy.RecoveryLogger as RecoveryFileLogger;
+            string logDirectory = logger.Location; // remembered so the recovery files can be deleted after the failed commit
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook;
+
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // transaction should not have been committed
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // delete all recovery files
+            foreach (string file in Directory.GetFiles(logDirectory, "*.bin"))
+            {
+                File.Delete(file);
+            }
+
+            // verify sql server has committed the transaction
+            VerifyDatabaseTableIsEmpty();
+
+            // check messages are NOT present in the queue because the recovery file has been deleted
+            VerifyNoMessagesInQueue();
+        }
+
+        [Test]
+        public void TestNoRecoverAfterFailOnTransactionWhenLogWriteFails()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            const string newConnectionUri = 
+                connectionURI + "?nms.RecoveryPolicy.RecoveryLoggerType=harness";
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(newConnectionUri));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                IRecoveryLogger logger = (connection as NetTxConnection).RecoveryPolicy.RecoveryLogger;
+                Assert.IsNotNull(logger);
+                RecoveryLoggerHarness harness = logger as RecoveryLoggerHarness; // the URI option above must have selected the harness logger
+                Assert.IsNotNull(harness);
+
+                harness.PreLogRecoveryInfoEvent += FailOnPreLogRecoveryHook; // fault injection; presumably fails the recovery-log write - confirm
+
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // verify sql server has not committed the transaction
+            VerifyDatabaseTableIsFull();
+
+            // check messages are not present in the queue
+            VerifyNoMessagesInQueue();
+        }
+
+        [Test]
+        public void TestRecoverAfterFailOnTransactionBeforePrepareSent()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnPrepareTransportHook; // fault injection; presumably fires before the prepare command is written - confirm
+
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // verify sql server has not committed the transaction
+            VerifyDatabaseTableIsFull();
+
+            // check messages are not present in the queue
+            VerifyNoMessagesInQueue();
+        }
+
+        [Test]
+        public void TestRecoverAfterFailOnTransactionDuringPrepareSend()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPostProcessor += this.FailOnPrepareTransportHook; // fault injection; presumably fires after the prepare command is written - confirm
+
+                ReadFromDbAndProduceToQueueWithCommit(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // verify sql server has NOT committed the transaction (the table must still be full)
+            VerifyDatabaseTableIsFull();
+
+            // check messages are NOT present in the queue
+            VerifyNoMessagesInQueue();
+        }
+
+        // Runs the DB-to-queue transaction without completing its TransactionScope and checks
+        // that both the database and the queue are left untouched.
+        [Test]
+        public void TestRecoverAfterTransactionScopeAborted()
+        {
+            // Seed the test table with the rows the transaction will try to send.
+            PrepareDatabase();
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += this.OnException;
+                txConnection.Start();
+
+                ReadFromDbAndProduceToQueueWithScopeAborted(txConnection);
+
+                Thread.Sleep(2000);
+            }
+
+            // The database must still hold every row: nothing was committed.
+            VerifyDatabaseTableIsFull();
+
+            // The queue must be empty: no message was committed to the broker.
+            VerifyNoMessagesInQueue();
+        }
+
+        // Aborts the TransactionScope while FailOnRollbackTransportHook is registered as the
+        // faulty transport's one-way command pre-processor, then checks that the table still
+        // holds all of its rows and that the queue is empty through both verification paths.
+        [Test]
+        public void TestRecoverAfterRollbackFailWhenScopeAborted()
+        {
+            // Test initialize - Fills in DB with data to send.
+            PrepareDatabase();
+
+            INetTxConnectionFactory factory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection connection = factory.CreateNetTxConnection())
+            {
+                connection.ExceptionListener += this.OnException;
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnRollbackTransportHook;
+
+                ReadFromDbAndProduceToQueueWithScopeAborted(connection);
+
+                Thread.Sleep(2000);
+            }
+
+            // verify sql server has NOT committed the transaction (all rows are still present)
+            VerifyDatabaseTableIsFull();
+
+            // before recovering, messages should NOT be present in the queue
+            VerifyNoMessagesInQueueNoRecovery();
+
+            // check messages are not present in the queue after recover
+            VerifyNoMessagesInQueue();
+        }
+
+        // Repeats the "fill table, then move its rows to the queue inside a committed
+        // transaction" cycle several times on one connection, then checks the final state.
+        [Test]
+        public void TestIterativeTransactedProduceWithDBDelete()
+        {
+            const int rounds = 5;
+
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            {
+                txConnection.ExceptionListener += this.OnException;
+                txConnection.Start();
+
+                for (int round = 0; round < rounds; ++round)
+                {
+                    PrepareDatabase();
+                    ReadFromDbAndProduceToQueueWithCommit(txConnection);
+                }
+            }
+
+            // Every round deleted its rows, so the table must be empty.
+            VerifyDatabaseTableIsEmpty();
+
+            // Every round sent MSG_COUNT messages, so the queue must hold all of them.
+            VerifyBrokerQueueCount(MSG_COUNT * rounds);
+        }
+
+    }
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcTransactionsTestSupport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcTransactionsTestSupport.cs?rev=1083101&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcTransactionsTestSupport.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/DtcTransactionsTestSupport.cs Fri Mar 18 23:06:32 2011
@@ -0,0 +1,664 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.IO;
+using System.Transactions;
+using System.Threading;
+
+using NUnit.Framework;
+using Apache.NMS.Test;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.Transport;
+using System.Data.SqlClient;
+using System.Collections;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+    // PREREQUISITES to run these tests:
+    // - A local instance of SQL Server 2008 running, with a db (e.g. TestDB) that
+    //   has a table (e.g. TestTable) with a single column (e.g. TestID) of type INT.
+    //   The tests default to using an SQL connection string with a user id of
+    //   'user' and the password 'password'.
+    // - AMQ Server 5.4.2+
+    // - NMS 1.5+
+    //
+    // IMPORTANT
+    // Because recovery cannot be performed more than once in a process, you cannot
+    // run these tests sequentially in the NUnit GUI or NUnit Console runner. You
+    // must run them one at a time from the console or using a tool like the ReSharper
+    // plugin for Visual Studio.
+    //
+
+    public class DtcTransactionsTestSupport : NMSTestSupport
+    {
+        protected const int MSG_COUNT = 5;
+        protected string nonExistantPath;
+        
+        private ITrace oldTracer;
+
+        protected const string sqlConnectionString =
+            "Data Source=localhost;Initial Catalog=TestDB;User ID=user;Password=password";
+        protected const string testTable = "TestTable";
+        protected const string testColumn = "TestID";
+        protected const string testQueueName = "TestQueue";
+        protected const string connectionURI = "tcpfaulty://${activemqhost}:61616";
+
+        // Per-test setup: remembers the current tracer so TearDown can put it back, builds a
+        // path under the current directory from a fresh GUID, runs the base class setup and
+        // finally drains any messages left on the test queue.
+        [SetUp]
+        public override void SetUp()
+        {
+            this.oldTracer = Tracer.Trace;
+            this.nonExistantPath = Directory.GetCurrentDirectory() + Path.DirectorySeparatorChar + Guid.NewGuid();
+
+            base.SetUp();
+
+            PurgeDestination();
+        }
+
+        // Per-test cleanup: deletes the test queue, runs the base class teardown and restores
+        // the tracer that SetUp saved.
+        [TearDown]
+        public override void TearDown()
+        {
+            try
+            {
+                DeleteDestination();
+
+                base.TearDown();
+            }
+            finally
+            {
+                // Restore the tracer even when the cleanup above throws, so a failing
+                // teardown cannot leave a test-specific tracer installed for later tests.
+                Tracer.Trace = this.oldTracer;
+            }
+        }
+
+        // Connection exception listener used by the tests: it only writes the exception
+        // message to the debug trace and does not fail the test.
+        protected void OnException(Exception ex)
+        {
+            Tracer.Debug("Test Driver received Error Notification: " + ex.Message);
+        }
+
+        #region Database Utility Methods
+
+        // Resets the test table and fills it with MSG_COUNT rows holding the values
+        // 0 .. MSG_COUNT - 1.
+        protected static void PrepareDatabase()
+        {
+            using (SqlConnection db = new SqlConnection(sqlConnectionString))
+            {
+                db.Open();
+
+                // Start from an empty table.
+                string truncateSql = string.Format("TRUNCATE TABLE {0}", testTable);
+                using (SqlCommand truncate = new SqlCommand(truncateSql, db))
+                {
+                    truncate.ExecuteNonQuery();
+                }
+
+                // Insert one row per message that the tests will later send.
+                int row = 0;
+                while (row < MSG_COUNT)
+                {
+                    string insertSql = string.Format(
+                        "INSERT INTO {0} ({1}) values ({2})", testTable, testColumn, row);
+
+                    using (SqlCommand insert = new SqlCommand(insertSql, db))
+                    {
+                        insert.ExecuteNonQuery();
+                    }
+
+                    ++row;
+                }
+
+                db.Close();
+            }
+        }
+
+        // Removes every row from the test table.
+        protected static void PurgeDatabase()
+        {
+            using (SqlConnection db = new SqlConnection(sqlConnectionString))
+            {
+                db.Open();
+
+                string truncateSql = string.Format("TRUNCATE TABLE {0}", testTable);
+                using (SqlCommand truncate = new SqlCommand(truncateSql, db))
+                {
+                    truncate.ExecuteNonQuery();
+                }
+
+                db.Close();
+            }
+        }
+
+        // Reads the test column of every row in the test table and returns one message
+        // body per row, in the form "Hello World <value>".
+        protected static IList ExtractDataSet()
+        {
+            IList bodies = new ArrayList();
+
+            using (SqlConnection db = new SqlConnection(sqlConnectionString))
+            {
+                db.Open();
+
+                string selectSql = string.Format("SELECT {0} FROM {1}", testColumn, testTable);
+                using (SqlCommand query = new SqlCommand(selectSql, db))
+                {
+                    using (SqlDataReader rows = query.ExecuteReader())
+                    {
+                        while (rows.Read())
+                        {
+                            int id = (int)rows[0];
+                            bodies.Add("Hello World " + id);
+                        }
+                    }
+                }
+            }
+
+            return bodies;
+        }
+
+        // Asserts that the test table contains no rows.
+        protected static void VerifyDatabaseTableIsEmpty()
+        {
+            using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+            {
+                sqlConnection.Open();
+
+                // SqlCommand is disposable; release it deterministically like the other
+                // database helpers in this class do.
+                using (SqlCommand sqlCommand = new SqlCommand(
+                    string.Format("SELECT COUNT(*) FROM {0}", testTable),
+                    sqlConnection))
+                {
+                    int count = (int)sqlCommand.ExecuteScalar();
+                    Assert.AreEqual(0, count, "wrong number of rows in DB");
+                }
+            }
+        }
+
+        // Asserts that the test table holds exactly MSG_COUNT rows.
+        protected static void VerifyDatabaseTableIsFull()
+        {
+            VerifyDatabaseTableIsFull(MSG_COUNT);
+        }
+
+        // Asserts that the test table holds exactly the expected number of rows.
+        protected static void VerifyDatabaseTableIsFull(int expected)
+        {
+            using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+            {
+                sqlConnection.Open();
+
+                // SqlCommand is disposable; release it deterministically like the other
+                // database helpers in this class do.
+                using (SqlCommand sqlCommand = new SqlCommand(
+                    string.Format("SELECT COUNT(*) FROM {0}", testTable),
+                    sqlConnection))
+                {
+                    int count = (int)sqlCommand.ExecuteScalar();
+                    Assert.AreEqual(expected, count, "wrong number of rows in DB");
+                }
+            }
+        }
+
+        #endregion
+
+        #region Destination Utility Methods
+
+        // Best-effort removal of the test queue from the broker. A failure to delete the
+        // destination is logged to the debug trace but never fails the calling test.
+        protected static void DeleteDestination()
+        {
+            IConnectionFactory factory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (Connection connection = factory.CreateConnection() as Connection)
+            {
+                using (ISession session = connection.CreateSession())
+                {
+                    IQueue queue = session.GetQueue(testQueueName);
+                    try
+                    {
+                        connection.DeleteDestination(queue);
+                    }
+                    catch (Exception ex)
+                    {
+                        // Still deliberately non-fatal, but leave a trace of what went wrong
+                        // instead of discarding the error silently.
+                        Tracer.Debug("Failed to delete destination " + testQueueName + ": " + ex.Message);
+                    }
+                }
+            }
+        }
+
+        // Drains the test queue by consuming messages until a receive with a 3000 ms
+        // timeout returns nothing. Each drained message is written to the debug trace.
+        protected static void PurgeDestination()
+        {
+            IConnectionFactory plainFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (IConnection plainConnection = plainFactory.CreateConnection())
+            {
+                plainConnection.Start();
+
+                using (ISession drainSession = plainConnection.CreateSession())
+                {
+                    using (IMessageConsumer drain = drainSession.CreateConsumer(drainSession.GetQueue(testQueueName)))
+                    {
+                        while (true)
+                        {
+                            IMessage drained = drain.Receive(TimeSpan.FromMilliseconds(3000));
+                            if (drained == null)
+                            {
+                                break;
+                            }
+
+                            Tracer.Debug("Setup Purged Message: " + drained);
+                        }
+                    }
+                }
+            }
+        }
+
+        // Empties the test queue and then enqueues MSG_COUNT messages.
+        protected static void PurgeAndFillQueue()
+        {
+            PurgeAndFillQueue(MSG_COUNT);
+        }
+
+        // Empties the test queue and then enqueues msgCount persistent text messages whose
+        // bodies are the numbers 0 .. msgCount - 1.
+        protected static void PurgeAndFillQueue(int msgCount)
+        {
+            IConnectionFactory plainFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (IConnection plainConnection = plainFactory.CreateConnection())
+            {
+                plainConnection.Start();
+
+                using (ISession workSession = plainConnection.CreateSession())
+                {
+                    IQueue target = workSession.GetQueue(testQueueName);
+
+                    // Drain whatever is currently on the queue.
+                    using (IMessageConsumer drain = workSession.CreateConsumer(target))
+                    {
+                        IMessage leftover;
+                        do
+                        {
+                            leftover = drain.Receive(TimeSpan.FromMilliseconds(2000));
+                        }
+                        while (leftover != null);
+                    }
+
+                    // Refill it with the requested number of messages.
+                    using (IMessageProducer filler = workSession.CreateProducer(target))
+                    {
+                        filler.DeliveryMode = MsgDeliveryMode.Persistent;
+
+                        int sent = 0;
+                        while (sent < msgCount)
+                        {
+                            filler.Send(workSession.CreateTextMessage(sent.ToString()));
+                            sent++;
+                        }
+                    }
+                }
+            }
+        }
+
+        #endregion
+
+        #region Broker Queue State Validation Routines
+
+        // Asserts, through a plain (non-NetTx) connection, that the test queue holds
+        // exactly MSG_COUNT messages.
+        protected static void VerifyBrokerQueueCountNoRecovery()
+        {
+            VerifyBrokerQueueCountNoRecovery(MSG_COUNT);
+        }
+
+        // Browses the test queue through a plain ConnectionFactory connection and asserts
+        // that it holds exactly expectedNumberOfMessages messages.
+        protected static void VerifyBrokerQueueCountNoRecovery(int expectedNumberOfMessages)
+        {
+            IConnectionFactory plainFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (IConnection plainConnection = plainFactory.CreateConnection())
+            using (ISession browseSession = plainConnection.CreateSession())
+            {
+                IQueue target = browseSession.GetQueue(testQueueName);
+
+                using (IQueueBrowser queueBrowser = browseSession.CreateBrowser(target))
+                {
+                    plainConnection.Start();
+
+                    // Walk the browser by hand and count every message it yields.
+                    int found = 0;
+                    for (IEnumerator cursor = queueBrowser.GetEnumerator(); cursor.MoveNext(); found++)
+                    {
+                        IMessage browsed = cursor.Current as IMessage;
+                        Assert.IsNotNull(browsed, "message is not in the queue !");
+                    }
+
+                    Assert.AreEqual(expectedNumberOfMessages, found);
+                }
+            }
+        }
+
+        // Asserts that the test queue on the default broker URI holds MSG_COUNT messages.
+        protected static void VerifyBrokerQueueCount()
+        {
+            VerifyBrokerQueueCount(MSG_COUNT, connectionURI);
+        }
+
+        // Asserts that the test queue on the default broker URI holds expectedCount messages.
+        protected static void VerifyBrokerQueueCount(int expectedCount)
+        {
+            VerifyBrokerQueueCount(expectedCount, connectionURI);
+        }
+
+        // Asserts that the test queue on the given broker URI holds MSG_COUNT messages.
+        protected static void VerifyBrokerQueueCount(string connectionUri)
+        {
+            VerifyBrokerQueueCount(MSG_COUNT, connectionUri);
+        }
+
+        // Browses the test queue through a NetTx connection to the given broker URI and
+        // asserts that it holds exactly expectedCount messages.
+        protected static void VerifyBrokerQueueCount(int expectedCount, string connectionUri)
+        {
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionUri));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            using (INetTxSession txSession = txConnection.CreateNetTxSession())
+            {
+                IQueue target = txSession.GetQueue(testQueueName);
+
+                using (IQueueBrowser queueBrowser = txSession.CreateBrowser(target))
+                {
+                    txConnection.Start();
+
+                    // Walk the browser by hand and count every message it yields.
+                    int found = 0;
+                    for (IEnumerator cursor = queueBrowser.GetEnumerator(); cursor.MoveNext(); found++)
+                    {
+                        IMessage browsed = cursor.Current as IMessage;
+                        Assert.IsNotNull(browsed, "message is not in the queue !");
+                    }
+
+                    Assert.AreEqual(expectedCount, found);
+                }
+            }
+        }
+
+        // Asserts, through a plain (non-NetTx) connection, that the test queue is empty.
+        protected static void VerifyNoMessagesInQueueNoRecovery()
+        {
+            VerifyBrokerQueueCountNoRecovery(0);
+        }
+
+        // Asserts, through a NetTx connection to the default broker URI, that the test
+        // queue is empty.
+        protected static void VerifyNoMessagesInQueue()
+        {
+            VerifyBrokerQueueCount(0);
+        }
+
+        // Consumes from the test queue through a plain connection and asserts that exactly
+        // expectedNumberOfMessages messages can be received, each within 2000 ms, and that
+        // no further message arrives afterwards.
+        protected static void VerifyBrokerStateNoRecover(int expectedNumberOfMessages)
+        {
+            IConnectionFactory plainFactory = new ConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (IConnection plainConnection = plainFactory.CreateConnection())
+            {
+                using (ISession readSession = plainConnection.CreateSession())
+                {
+                    IDestination target = readSession.GetQueue(testQueueName);
+
+                    using (IMessageConsumer reader = readSession.CreateConsumer(target))
+                    {
+                        plainConnection.Start();
+
+                        // Every expected message must be there ...
+                        int remaining = expectedNumberOfMessages;
+                        while (remaining > 0)
+                        {
+                            IMessage expected = reader.Receive(TimeSpan.FromMilliseconds(2000));
+                            Assert.IsNotNull(expected, "message is not in the queue !");
+                            remaining--;
+                        }
+
+                        // ... and nothing beyond them.
+                        IMessage extra = reader.Receive(TimeSpan.FromMilliseconds(2000));
+                        Assert.IsNull(extra, "message found but not expected !");
+                        reader.Close();
+                    }
+                }
+
+                plainConnection.Close();
+            }
+        }
+
+        // Consumes from the test queue through a NetTx connection to the given broker URI
+        // and asserts that MSG_COUNT messages can be received, each within 2000 ms.
+        protected static void VerifyBrokerHasMessagesInQueue(string connectionURI)
+        {
+            INetTxConnectionFactory txFactory = new NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
+
+            using (INetTxConnection txConnection = txFactory.CreateNetTxConnection())
+            using (INetTxSession txSession = txConnection.CreateNetTxSession())
+            {
+                IDestination target = txSession.GetQueue(testQueueName);
+
+                using (IMessageConsumer reader = txSession.CreateConsumer(target))
+                {
+                    txConnection.Start();
+
+                    int remaining = MSG_COUNT;
+                    while (remaining > 0)
+                    {
+                        IMessage received = reader.Receive(TimeSpan.FromMilliseconds(2000));
+                        Assert.IsNotNull(received, "message is not in the queue !");
+                        remaining--;
+                    }
+
+                    reader.Close();
+                }
+            }
+        }
+
+
+        #endregion
+
+        #region Transport Hooks for controlling failure point.
+
+        // Transport hook: when the command is a TransactionInfo of type Prepare, waits one
+        // second, logs, and throws. Every other command passes through untouched.
+        public void FailOnPrepareTransportHook(ITransport transport, Command command)
+        {
+            TransactionInfo txInfo = command as TransactionInfo;
+            if (txInfo == null)
+            {
+                return;
+            }
+
+            if (txInfo.Type != (byte)TransactionType.Prepare)
+            {
+                return;
+            }
+
+            Thread.Sleep(1000);
+            Tracer.Debug("Throwing Error on Prepare.");
+            throw new Exception("Error writing Prepare command");
+        }
+
+        // Transport hook: when the command is a TransactionInfo of type Rollback, logs and
+        // throws. Every other command passes through untouched.
+        public void FailOnRollbackTransportHook(ITransport transport, Command command)
+        {
+            TransactionInfo txInfo = command as TransactionInfo;
+            if (txInfo == null)
+            {
+                return;
+            }
+
+            if (txInfo.Type != (byte)TransactionType.Rollback)
+            {
+                return;
+            }
+
+            Tracer.Debug("Throwing Error on Rollback.");
+            throw new Exception("Error writing Rollback command");
+        }
+
+        // Transport hook: when the command is a TransactionInfo of type CommitTwoPhase, logs
+        // and throws. Every other command passes through untouched.
+        public void FailOnCommitTransportHook(ITransport transport, Command command)
+        {
+            TransactionInfo txInfo = command as TransactionInfo;
+            if (txInfo == null)
+            {
+                return;
+            }
+
+            if (txInfo.Type != (byte)TransactionType.CommitTwoPhase)
+            {
+                return;
+            }
+
+            Tracer.Debug("Throwing Error on Commit.");
+            throw new Exception("Error writing Commit command");
+        }
+
+        #endregion
+
+        #region Recovery Harness Hooks for controlling failure conditions
+
+        // Recovery harness hook that always fails: it logs and throws unconditionally,
+        // ignoring both arguments. Tests attach it to the harness's PreLogRecoveryInfoEvent.
+        public void FailOnPreLogRecoveryHook(XATransactionId xid, byte[] recoveryInformatio)
+        {
+            Tracer.Debug("Throwing Error before the Recovery Information is Logged.");
+            throw new Exception("Intentional Error Logging Recovery Information");
+        }
+
+        #endregion
+
+        #region Produce Messages use cases
+
+        // Reads the rows of the test table, then inside a new TransactionScope sends one
+        // persistent text message per row to the test queue, deletes all rows from the table
+        // and completes the scope. Exceptions raised by the transaction are logged and
+        // swallowed so the calling test can inspect the resulting state; failed assertions
+        // are rethrown so they fail the test.
+        protected static void ReadFromDbAndProduceToQueueWithCommit(INetTxConnection connection)
+        {
+            IList entries = ExtractDataSet();
+
+            using (INetTxSession session = connection.CreateNetTxSession())
+            {
+                IQueue queue = session.GetQueue(testQueueName);
+
+                // enqueue several messages read from DB
+                try
+                {
+                    using (IMessageProducer producer = session.CreateProducer(queue))
+                    {
+                        producer.DeliveryMode = MsgDeliveryMode.Persistent;
+
+                        using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
+                        using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+                        {
+                            sqlConnection.Open();
+
+                            Assert.IsNotNull(Transaction.Current);
+
+                            Tracer.DebugFormat("Sending {0} messages to Broker in this TX", entries.Count);
+                            foreach (string textBody in entries)
+                            {
+                                producer.Send(session.CreateTextMessage(textBody));
+                            }
+
+                            using (SqlCommand sqlDeleteCommand = new SqlCommand(
+                                string.Format("DELETE FROM {0}", testTable), sqlConnection))
+                            {
+                                int count = sqlDeleteCommand.ExecuteNonQuery();
+                                Assert.AreEqual(entries.Count, count, "wrong number of rows deleted");
+                            }
+
+                            scoped.Complete();
+                        }
+                    }
+                }
+                catch (AssertionException)
+                {
+                    // A failed assertion is a genuine test failure, not a transaction error:
+                    // let it reach NUnit instead of being logged and swallowed below.
+                    throw;
+                }
+                catch (Exception e) // exception thrown in TransactionContext.Commit(Enlistment enlistment)
+                {
+                    Tracer.Debug("TX;Error from TransactionScope: " + e.Message);
+                    Tracer.Debug(e.ToString());
+                }
+            }
+        }
+
+        // Same work as the "WithCommit" variant - send one persistent text message per table
+        // row and delete the rows inside a new TransactionScope - except that the scope is
+        // never completed, so it is disposed without Complete(). Exceptions raised by the
+        // transaction are logged and swallowed; failed assertions are rethrown so they fail
+        // the test.
+        protected static void ReadFromDbAndProduceToQueueWithScopeAborted(INetTxConnection connection)
+        {
+            IList entries = ExtractDataSet();
+
+            using (INetTxSession session = connection.CreateNetTxSession())
+            {
+                IQueue queue = session.GetQueue(testQueueName);
+
+                // enqueue several messages read from DB
+                try
+                {
+                    using (IMessageProducer producer = session.CreateProducer(queue))
+                    {
+                        producer.DeliveryMode = MsgDeliveryMode.Persistent;
+
+                        using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
+                        using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+                        {
+                            sqlConnection.Open();
+
+                            Assert.IsNotNull(Transaction.Current);
+
+                            Tracer.DebugFormat("Sending {0} messages to Broker in this TX", entries.Count);
+                            foreach (string textBody in entries)
+                            {
+                                producer.Send(session.CreateTextMessage(textBody));
+                            }
+
+                            using (SqlCommand sqlDeleteCommand = new SqlCommand(
+                                string.Format("DELETE FROM {0}", testTable), sqlConnection))
+                            {
+                                int count = sqlDeleteCommand.ExecuteNonQuery();
+                                Assert.AreEqual(entries.Count, count, "wrong number of rows deleted");
+                            }
+
+                            // Intentionally no scoped.Complete() here.
+                        }
+                    }
+                }
+                catch (AssertionException)
+                {
+                    // A failed assertion is a genuine test failure, not a transaction error:
+                    // let it reach NUnit instead of being logged and swallowed below.
+                    throw;
+                }
+                catch (Exception e)
+                {
+                    Tracer.Debug("TX;Error from TransactionScope: " + e.Message);
+                    Tracer.Debug(e.ToString());
+                }
+            }
+        }
+
+        #endregion
+
+        #region Consume Messages Use Cases
+
+        // Inside a new TransactionScope, receives MSG_COUNT text messages from the test queue,
+        // inserts each message body (parsed as an integer) into the test table and completes
+        // the scope. Exceptions raised by the transaction are logged and swallowed so the
+        // calling test can inspect the resulting state; failed assertions are rethrown so
+        // they fail the test.
+        protected static void ReadFromQueueAndInsertIntoDbWithCommit(INetTxConnection connection)
+        {
+            using (INetTxSession session = connection.CreateNetTxSession())
+            {
+                IQueue queue = session.GetQueue(testQueueName);
+
+                // read message from queue and insert into db table
+                try
+                {
+                    using (IMessageConsumer consumer = session.CreateConsumer(queue))
+                    {
+                        using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
+                        using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+                        using (SqlCommand sqlInsertCommand = new SqlCommand())
+                        {
+                            sqlConnection.Open();
+                            sqlInsertCommand.Connection = sqlConnection;
+
+                            Assert.IsNotNull(Transaction.Current);
+
+                            for (int i = 0; i < MSG_COUNT; i++)
+                            {
+                                // NOTE(review): Receive() is called without a timeout here, unlike
+                                // the other helpers in this class - confirm it cannot hang when the
+                                // queue holds fewer than MSG_COUNT messages.
+                                ITextMessage message = consumer.Receive() as ITextMessage;
+                                Assert.IsNotNull(message, "missing message");
+                                sqlInsertCommand.CommandText =
+                                    string.Format("INSERT INTO {0} VALUES ({1})", testTable, Convert.ToInt32(message.Text));
+                                sqlInsertCommand.ExecuteNonQuery();
+                            }
+
+                            scoped.Complete();
+                        }
+                    }
+                }
+                catch (AssertionException)
+                {
+                    // A failed assertion is a genuine test failure, not a transaction error:
+                    // let it reach NUnit instead of being logged and swallowed below.
+                    throw;
+                }
+                catch (Exception e)
+                {
+                    Tracer.Debug("TX;Error from TransactionScope: " + e.Message);
+                    Tracer.Debug(e.ToString());
+                }
+            }
+        }
+
+        // Same work as the "WithCommit" variant - receive MSG_COUNT text messages and insert
+        // each body (parsed as an integer) into the test table inside a new TransactionScope -
+        // except that the scope is never completed, so it is disposed without Complete().
+        // Exceptions raised by the transaction are logged and swallowed; failed assertions
+        // are rethrown so they fail the test.
+        protected static void ReadFromQueueAndInsertIntoDbWithScopeAborted(INetTxConnection connection)
+        {
+            using (INetTxSession session = connection.CreateNetTxSession())
+            {
+                IQueue queue = session.GetQueue(testQueueName);
+
+                // read message from queue and insert into db table
+                try
+                {
+                    using (IMessageConsumer consumer = session.CreateConsumer(queue))
+                    {
+                        using (TransactionScope scoped = new TransactionScope(TransactionScopeOption.RequiresNew))
+                        using (SqlConnection sqlConnection = new SqlConnection(sqlConnectionString))
+                        using (SqlCommand sqlInsertCommand = new SqlCommand())
+                        {
+                            sqlConnection.Open();
+                            sqlInsertCommand.Connection = sqlConnection;
+                            Assert.IsNotNull(Transaction.Current);
+
+                            for (int i = 0; i < MSG_COUNT; i++)
+                            {
+                                // NOTE(review): Receive() is called without a timeout here, unlike
+                                // the other helpers in this class - confirm it cannot hang when the
+                                // queue holds fewer than MSG_COUNT messages.
+                                ITextMessage message = consumer.Receive() as ITextMessage;
+                                Assert.IsNotNull(message, "missing message");
+
+                                sqlInsertCommand.CommandText =
+                                    string.Format("INSERT INTO {0} VALUES ({1})", testTable, Convert.ToInt32(message.Text));
+                                sqlInsertCommand.ExecuteNonQuery();
+                            }
+
+                            // Intentionally no scoped.Complete() here.
+                        }
+                    }
+                }
+                catch (AssertionException)
+                {
+                    // A failed assertion is a genuine test failure, not a transaction error:
+                    // let it reach NUnit instead of being logged and swallowed below.
+                    throw;
+                }
+                catch (Exception e)
+                {
+                    Tracer.Debug("TX;Error from TransactionScope: " + e.Message);
+                    Tracer.Debug(e.ToString());
+                }
+            }
+        }
+
+        #endregion
+    }
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/vs2008-activemq-test.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/vs2008-activemq-test.csproj?rev=1083101&r1=1083100&r2=1083101&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/vs2008-activemq-test.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/vs2008-activemq-test.csproj Fri Mar 18 23:06:32 2011
@@ -2,7 +2,7 @@
   <PropertyGroup>
     <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
     <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
-    <ProductVersion>9.0.21022</ProductVersion>
+    <ProductVersion>9.0.30729</ProductVersion>
     <SchemaVersion>2.0</SchemaVersion>
     <ProjectGuid>{EB943C69-2C9B-45E7-B95B-FB916E7057ED}</ProjectGuid>
     <OutputType>Library</OutputType>
@@ -56,6 +56,7 @@
       <HintPath>lib\NUnit\net-2.0\nunit.framework.dll</HintPath>
     </Reference>
     <Reference Include="System" />
+    <Reference Include="System.Data" />
     <Reference Include="System.Transactions" />
   </ItemGroup>
   <ItemGroup>
@@ -75,6 +76,10 @@
     <Compile Include="src\test\csharp\CommonAssemblyInfo.cs" />
     <Compile Include="src\test\csharp\ConnectionFactoryTest.cs" />
     <Compile Include="src\test\csharp\ConnectionMetaDataTest.cs" />
+    <Compile Include="src\test\csharp\DtcBasicTransactionsTest.cs" />
+    <Compile Include="src\test\csharp\DtcConsumerTransactionsTest.cs" />
+    <Compile Include="src\test\csharp\DtcProducerTransactionsTest.cs" />
+    <Compile Include="src\test\csharp\DtcTransactionsTestSupport.cs" />
     <Compile Include="src\test\csharp\MessageConsumerTest.cs" />
     <Compile Include="src\test\csharp\ExclusiveConsumerTest.cs" />
     <Compile Include="src\test\csharp\IndividualAckTest.cs" />
@@ -96,6 +101,8 @@
     <Compile Include="src\test\csharp\Threads\DedicatedTaskRunnerTest.cs" />
     <Compile Include="src\test\csharp\Threads\ThreadPoolExecutorTest.cs" />
     <Compile Include="src\test\csharp\Transactions\RecoveryFileLoggerTest.cs" />
+    <Compile Include="src\test\csharp\Transactions\RecoveryLoggerHarness.cs" />
+    <Compile Include="src\test\csharp\Transactions\RecoveryLoggerHarnessFactory.cs" />
     <Compile Include="src\test\csharp\Transport\failover\FailoverTransportTest.cs" />
     <Compile Include="src\test\csharp\Transport\Inactivity\InactivityMonitorTest.cs" />
     <Compile Include="src\test\csharp\Transport\Mock\MockTransportFactoryTest.cs" />



Mime
View raw message