activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1205157 [2/2] - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/State/ test/csharp/Transport/failover/
Date Tue, 22 Nov 2011 20:44:52 GMT
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs?rev=1205157&r1=1205156&r2=1205157&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs Tue Nov 22 20:44:50 2011
@@ -1,49 +1,46 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using Apache.NMS.ActiveMQ.Commands;
-
-namespace Apache.NMS.ActiveMQ.State
-{
-	public class Tracked : Response
-	{
-		private ThreadSimulator runnable = null;
-
-		public Tracked(ThreadSimulator runnable)
-		{
-			this.runnable = runnable;
-		}
-
-		public void onResponses()
-		{
-			if(runnable != null)
-			{
-				runnable.Run();
-				runnable = null;
-			}
-		}
-
-		virtual public bool WaitingForResponse
-		{
-			get
-			{
-				return runnable != null;
-			}
-		}
-
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	public class Tracked : Response
+	{
+		private ThreadSimulator runnable = null;
+
+		public Tracked(ThreadSimulator runnable)
+		{
+			this.runnable = runnable;
+		}
+
+		public void onResponses()
+		{
+			if (runnable != null)
+			{
+				runnable.Run();
+				runnable = null;
+			}
+		}
+
+		virtual public bool WaitingForResponse
+		{
+			get { return runnable != null; }
+		}
+
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs?rev=1205157&r1=1205156&r2=1205157&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs Tue Nov 22 20:44:50 2011
@@ -1,100 +1,96 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.ActiveMQ.State
-{
-	public class TransactionState
-	{
-
-		private readonly List<Command> commands = new List<Command>();
-		private readonly TransactionId id;
-		private readonly Atomic<bool> _shutdown = new Atomic<bool>(false);
-		private bool prepared;
-		private int preparedResult;
-
-		public TransactionState(TransactionId id)
-		{
-			this.id = id;
-		}
-
-		public override String ToString()
-		{
-			return id.ToString();
-		}
-
-		public void addCommand(Command operation)
-		{
-			checkShutdown();
-			commands.Add(operation);
-		}
-
-		public List<Command> Commands
-		{
-			get
-			{
-				return commands;
-			}
-		}
-
-		private void checkShutdown()
-		{
-			if(_shutdown.Value)
-			{
-				throw new ApplicationException("Disposed");
-			}
-		}
-
-		public void shutdown()
-		{
-			_shutdown.Value = true;
-		}
-
-		public TransactionId getId()
-		{
-			return id;
-		}
-
-		public bool Prepared
-		{
-			get
-			{
-				return prepared;
-			}
-			set
-			{
-				prepared = value;
-			}
-		}
-
-		public int PreparedResult
-		{
-			get
-			{
-				return preparedResult;
-			}
-			set
-			{
-				preparedResult = value;
-			}
-		}
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	public class TransactionState
+	{
+		private readonly List<Command> commands = new List<Command>();
+		private readonly TransactionId id;
+		private readonly Atomic<bool> _shutdown = new Atomic<bool>(false);
+		private bool prepared;
+		private int preparedResult;
+        private readonly AtomicDictionary<ProducerId, ProducerState> producers = new AtomicDictionary<ProducerId, ProducerState>();
+
+		public TransactionState(TransactionId id)
+		{
+			this.id = id;
+		}
+
+		public override String ToString()
+		{
+			return id.ToString();
+		}
+
+		public void addCommand(Command operation)
+		{
+			checkShutdown();
+			commands.Add(operation);
+		}
+
+		public List<Command> Commands
+		{
+			get { return commands; }
+		}
+
+		private void checkShutdown()
+		{
+			if(_shutdown.Value)
+			{
+				throw new ApplicationException("Disposed");
+			}
+		}
+
+		public void shutdown()
+		{
+			_shutdown.Value = true;
+		}
+
+		public TransactionId getId()
+		{
+			return id;
+		}
+
+		public bool Prepared
+		{
+			get { return prepared; }
+			set { prepared = value; }
+		}
+
+		public int PreparedResult
+		{
+			get { return preparedResult; }
+			set { preparedResult = value; }
+		}
+
+        public void AddProducer(ProducerState producer)
+        {
+            this.producers.Add(producer.Info.ProducerId, producer);
+        }
+
+        public AtomicCollection<ProducerState> ProducerStates
+        {
+            get { return producers.Values; }
+        }
+
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs?rev=1205157&r1=1205156&r2=1205157&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs Tue Nov 22 20:44:50 2011
@@ -94,6 +94,62 @@ namespace Apache.NMS.ActiveMQ.Test
             Assert.IsTrue(this.resumed);
         }
 
+        [Test]
+        public void FailoverWithShortLivedProducerTest()
+        {
+            string uri = "failover:(tcpfaulty://${activemqhost}:61616?transport.useLogging=true)";
+            IConnectionFactory factory = new ConnectionFactory(NMSTestSupport.ReplaceEnvVar(uri));
+            using(connection = factory.CreateConnection() as Connection)
+            {
+                connection.ConnectionInterruptedListener +=
+                    new ConnectionInterruptedListener(TransportInterrupted);
+                connection.ConnectionResumedListener +=
+                    new ConnectionResumedListener(TransportResumed);
+
+                connection.Start();
+
+                ITransport transport = (connection as Connection).ITransport;
+                TcpFaultyTransport tcpFaulty = transport.Narrow(typeof(TcpFaultyTransport)) as TcpFaultyTransport;
+                Assert.IsNotNull(tcpFaulty);
+                tcpFaulty.OnewayCommandPreProcessor += this.FailOnCommitTransportHook;
+
+                using(ISession session = connection.CreateSession())
+                {
+                    IDestination destination = session.GetQueue(destinationName);
+                    PurgeQueue(connection, destination);
+                }
+
+                Tracer.Debug("Test is putting " + MSG_COUNT + " messages on the queue: " + destinationName);
+
+                using(ISession session = connection.CreateSession(AcknowledgementMode.Transactional))
+                {
+                    IDestination destination = session.GetQueue(destinationName);
+                    PutMsgIntoQueue(session, destination, false);
+                    session.Commit();
+                }
+
+                Assert.IsTrue(this.interrupted);
+                Assert.IsTrue(this.resumed);
+
+                Tracer.Debug("Test is attempting to read " + MSG_COUNT +
+                             " messages from the queue: " + destinationName);
+
+                using(ISession session = connection.CreateSession())
+                {
+                    IDestination destination = session.GetQueue(destinationName);
+                    IMessageConsumer consumer = session.CreateConsumer(destination);
+                    for (int i = 0; i < MSG_COUNT; ++i)
+                    {
+                        IMessage msg = consumer.Receive(TimeSpan.FromSeconds(5));
+                        Assert.IsNotNull(msg, "Should receive message[" + (i + 1) + "] after commit failed once.");
+                    }
+                }
+            }
+
+            Assert.IsTrue(this.interrupted);
+            Assert.IsTrue(this.resumed);
+        }
+
         public void TransportInterrupted()
         {
             this.interrupted = true;
@@ -106,6 +162,11 @@ namespace Apache.NMS.ActiveMQ.Test
 
         private void PutMsgIntoQueue(ISession session, IDestination destination)
         {
+            PutMsgIntoQueue(session, destination, true);
+        }
+
+        private void PutMsgIntoQueue(ISession session, IDestination destination, bool commit)
+        {
             using(IMessageProducer producer = session.CreateProducer(destination))
             {
                 ITextMessage message = session.CreateTextMessage();
@@ -115,7 +176,7 @@ namespace Apache.NMS.ActiveMQ.Test
                     producer.Send(message);
                 }
 
-                if (session.Transacted)
+                if (session.Transacted && commit)
                 {
                     session.Commit();
                 }



Mime
View raw message