activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1205157 [1/2] - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/State/ test/csharp/Transport/failover/
Date Tue, 22 Nov 2011 20:44:52 GMT
Author: tabish
Date: Tue Nov 22 20:44:50 2011
New Revision: 1205157

URL: http://svn.apache.org/viewvc?rev=1205157&view=rev
Log:
Fix for: https://issues.apache.org/jira/browse/AMQNET-353 

Adds additional test case for this.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/Transport/failover/FailoverTransactionTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs?rev=1205157&r1=1205156&r2=1205157&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs Tue Nov 22 20:44:50 2011
@@ -1,204 +1,200 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.ActiveMQ.State
-{
-	public class ConnectionState
-	{
-
-		ConnectionInfo info;
-		private readonly AtomicDictionary<TransactionId, TransactionState> transactions = new AtomicDictionary<TransactionId, TransactionState>();
-		private readonly AtomicDictionary<SessionId, SessionState> sessions = new AtomicDictionary<SessionId, SessionState>();
-		private readonly AtomicCollection<DestinationInfo> tempDestinations = new AtomicCollection<DestinationInfo>();
-		private readonly Atomic<bool> _shutdown = new Atomic<bool>(false);
-	    private bool connectionInterruptProcessingComplete = true;
-		private readonly Dictionary<ConsumerId, ConsumerInfo> recoveringPullConsumers = new Dictionary<ConsumerId, ConsumerInfo>();
-
-		public ConnectionState(ConnectionInfo info)
-		{
-			this.info = info;
-			// Add the default session id.
-			addSession(new SessionInfo(info, -1));
-		}
-
-		public override String ToString()
-		{
-			return info.ToString();
-		}
-
-		public void reset(ConnectionInfo info)
-		{
-			this.info = info;
-			transactions.Clear();
-			sessions.Clear();
-			tempDestinations.Clear();
-			_shutdown.Value = false;
-		}
-
-		public void addTempDestination(DestinationInfo info)
-		{
-			checkShutdown();
-			tempDestinations.Add(info);
-		}
-
-		public void removeTempDestination(IDestination destination)
-		{
-			for(int i = tempDestinations.Count - 1; i >= 0; i--)
-			{
-				DestinationInfo di = tempDestinations[i];
-				if(di.Destination.Equals(destination))
-				{
-					tempDestinations.RemoveAt(i);
-				}
-			}
-		}
-
-		public void addTransactionState(TransactionId id)
-		{
-			checkShutdown();
-			transactions.Add(id, new TransactionState(id));
-		}
-
-		public TransactionState this[TransactionId id]
-		{
-			get
-			{
-				TransactionState state;
-
-				if(transactions.TryGetValue(id, out state))
-				{
-					return state;
-				}
-
-				return null;
-			}
-		}
-
-		public AtomicCollection<TransactionState> TransactionStates
-		{
-			get
-			{
-				return transactions.Values;
-			}
-		}
-
-		public SessionState this[SessionId id]
-		{
-			get
-			{
-				SessionState sessionState;
-
-				if(sessions.TryGetValue(id, out sessionState))
-				{
-					return sessionState;
-				}
-
-#if DEBUG
-				// Useful for dignosing missing session ids
-				string sessionList = string.Empty;
-				foreach(SessionId sessionId in sessions.Keys)
-				{
-					sessionList += sessionId.ToString() + "\n";
-				}
-
-				System.Diagnostics.Debug.Assert(false,
-					string.Format("Session '{0}' did not exist in the sessions collection.\n\nSessions:-\n{1}", id, sessionList));
-#endif
-				return null;
-			}
-		}
-
-		public TransactionState removeTransactionState(TransactionId id)
-		{
-			TransactionState ret = null;
-
-			transactions.TryGetValue(id, out ret);
-			transactions.Remove(id);
-			return ret;
-		}
-
-		public void addSession(SessionInfo info)
-		{
-			checkShutdown();
-			sessions.Add(info.SessionId, new SessionState(info));
-		}
-
-		public SessionState removeSession(SessionId id)
-		{
-			SessionState ret = null;
-
-			sessions.TryGetValue(id, out ret);
-			sessions.Remove(id);
-			return ret;
-		}
-
-		public ConnectionInfo Info
-		{
-			get { return info; }
-		}
-
-		public AtomicCollection<SessionId> SessionIds
-		{
-			get { return sessions.Keys; }
-		}
-
-		public AtomicCollection<DestinationInfo> TempDestinations
-		{
-			get { return tempDestinations; }
-		}
-
-		public AtomicCollection<SessionState> SessionStates
-		{
-			get { return sessions.Values; }
-		}
-
-		private void checkShutdown()
-		{
-			if(_shutdown.Value)
-			{
-				throw new ApplicationException("Disposed");
-			}
-		}
-
-		public Dictionary<ConsumerId, ConsumerInfo> RecoveringPullConsumers
-		{
-			get { return this.recoveringPullConsumers; }
-		}
-		
-		public bool ConnectionInterruptProcessingComplete
-		{
-			get { return this.connectionInterruptProcessingComplete; }
-			set { this.connectionInterruptProcessingComplete = value; }
-		}
-
-		public void shutdown()
-		{
-			if(_shutdown.CompareAndSet(false, true))
-			{
-				foreach(SessionState ss in sessions.Values)
-				{
-					ss.shutdown();
-				}
-			}
-		}
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	public class ConnectionState
+	{
+		private ConnectionInfo info;
+		private readonly AtomicDictionary<TransactionId, TransactionState> transactions = new AtomicDictionary<TransactionId, TransactionState>();
+		private readonly AtomicDictionary<SessionId, SessionState> sessions = new AtomicDictionary<SessionId, SessionState>();
+		private readonly AtomicCollection<DestinationInfo> tempDestinations = new AtomicCollection<DestinationInfo>();
+		private readonly Atomic<bool> _shutdown = new Atomic<bool>(false);
+	    private bool connectionInterruptProcessingComplete = true;
+		private readonly Dictionary<ConsumerId, ConsumerInfo> recoveringPullConsumers = new Dictionary<ConsumerId, ConsumerInfo>();
+
+		public ConnectionState(ConnectionInfo info)
+		{
+			this.info = info;
+			// Add the default session id.
+			addSession(new SessionInfo(info, -1));
+		}
+
+		public override String ToString()
+		{
+			return info.ToString();
+		}
+
+		public void reset(ConnectionInfo info)
+		{
+			this.info = info;
+			transactions.Clear();
+			sessions.Clear();
+			tempDestinations.Clear();
+			_shutdown.Value = false;
+		}
+
+		public void addTempDestination(DestinationInfo info)
+		{
+			checkShutdown();
+			tempDestinations.Add(info);
+		}
+
+		public void removeTempDestination(IDestination destination)
+		{
+			for(int i = tempDestinations.Count - 1; i >= 0; i--)
+			{
+				DestinationInfo di = tempDestinations[i];
+				if(di.Destination.Equals(destination))
+				{
+					tempDestinations.RemoveAt(i);
+				}
+			}
+		}
+
+		public void addTransactionState(TransactionId id)
+		{
+			checkShutdown();
+			transactions.Add(id, new TransactionState(id));
+		}
+
+		public TransactionState this[TransactionId id]
+		{
+			get
+			{
+				TransactionState state;
+
+				if(transactions.TryGetValue(id, out state))
+				{
+					return state;
+				}
+
+				return null;
+			}
+		}
+
+		public AtomicCollection<TransactionState> TransactionStates
+		{
+			get { return transactions.Values; }
+		}
+
+		public SessionState this[SessionId id]
+		{
+			get
+			{
+				SessionState sessionState;
+
+				if(sessions.TryGetValue(id, out sessionState))
+				{
+					return sessionState;
+				}
+
+#if DEBUG
+				// Useful for dignosing missing session ids
+				string sessionList = string.Empty;
+				foreach(SessionId sessionId in sessions.Keys)
+				{
+					sessionList += sessionId.ToString() + "\n";
+				}
+
+				System.Diagnostics.Debug.Assert(false,
+					string.Format("Session '{0}' did not exist in the sessions collection.\n\nSessions:-\n{1}", id, sessionList));
+#endif
+				return null;
+			}
+		}
+
+		public TransactionState removeTransactionState(TransactionId id)
+		{
+			TransactionState ret = null;
+
+			transactions.TryGetValue(id, out ret);
+			transactions.Remove(id);
+			return ret;
+		}
+
+		public void addSession(SessionInfo info)
+		{
+			checkShutdown();
+			sessions.Add(info.SessionId, new SessionState(info));
+		}
+
+		public SessionState removeSession(SessionId id)
+		{
+			SessionState ret = null;
+
+			sessions.TryGetValue(id, out ret);
+			sessions.Remove(id);
+			return ret;
+		}
+
+		public ConnectionInfo Info
+		{
+			get { return info; }
+		}
+
+		public AtomicCollection<SessionId> SessionIds
+		{
+			get { return sessions.Keys; }
+		}
+
+		public AtomicCollection<DestinationInfo> TempDestinations
+		{
+			get { return tempDestinations; }
+		}
+
+		public AtomicCollection<SessionState> SessionStates
+		{
+			get { return sessions.Values; }
+		}
+
+		private void checkShutdown()
+		{
+			if(_shutdown.Value)
+			{
+				throw new ApplicationException("Disposed");
+			}
+		}
+
+		public Dictionary<ConsumerId, ConsumerInfo> RecoveringPullConsumers
+		{
+			get { return this.recoveringPullConsumers; }
+		}
+		
+		public bool ConnectionInterruptProcessingComplete
+		{
+			get { return this.connectionInterruptProcessingComplete; }
+			set { this.connectionInterruptProcessingComplete = value; }
+		}
+
+		public void shutdown()
+		{
+			if(_shutdown.CompareAndSet(false, true))
+			{
+				foreach(SessionState ss in sessions.Values)
+				{
+					ss.shutdown();
+				}
+			}
+		}
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=1205157&r1=1205156&r2=1205157&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Tue Nov 22 20:44:50 2011
@@ -1,761 +1,762 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Collections.Generic;
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.Transport;
-using System.Collections;
-
-namespace Apache.NMS.ActiveMQ.State
-{
-    /// <summary>
-    /// Tracks the state of a connection so a newly established transport can be
-    /// re-initialized to the state that was tracked.
-    /// </summary>
-    public class ConnectionStateTracker : CommandVisitorAdapter
-    {
-        private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
-
-        protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>();
-
-        private bool _trackTransactions;
-        private bool _restoreSessions = true;
-        private bool _restoreConsumers = true;
-        private bool _restoreProducers = true;
-        private bool _restoreTransaction = true;
-        private bool _trackMessages = true;
-        private int _maxCacheSize = 256;
-        private int currentCacheSize;
-        private readonly Dictionary<MessageId, Message> messageCache = new Dictionary<MessageId, Message>();
-        private readonly Queue<MessageId> messageCacheFIFO = new Queue<MessageId>();
-
-        protected void RemoveEldestInCache()
-        {
-            System.Collections.ICollection ic = messageCacheFIFO;
-            lock(ic.SyncRoot)
-            {
-                while(messageCacheFIFO.Count > MaxCacheSize)
-                {
-                    messageCache.Remove(messageCacheFIFO.Dequeue());
-                    currentCacheSize = currentCacheSize - 1;
-                }
-            }
-        }
-
-        private class RemoveTransactionAction : ThreadSimulator
-        {
-            private readonly TransactionInfo info;
-            private readonly ConnectionStateTracker cst;
-
-            public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst)
-            {
-                this.info = info;
-                this.cst = aCst;
-            }
-
-            public override void Run()
-            {
-                ConnectionState cs;
-
-				if(cst.connectionStates.TryGetValue(info.ConnectionId, out cs))
-				{
-					cs.removeTransactionState(info.TransactionId);
-				}
-            }
-        }
-
-        /// <summary>
-        /// </summary>
-        /// <param name="command"></param>
-        /// <returns>null if the command is not state tracked.</returns>
-        public Tracked Track(Command command)
-        {
-            try
-            {
-                return (Tracked) command.visit(this);
-            }
-            catch(IOException e)
-            {
-                throw e;
-            }
-            catch(Exception e)
-            {
-                throw new IOException(e.Message);
-            }
-        }
-
-        public void TrackBack(Command command)
-        {
-            if(TrackMessages && command != null && command.IsMessage)
-            {
-                Message message = (Message) command;
-                if(message.TransactionId == null)
-                {
-                    currentCacheSize = currentCacheSize + 1;
-                }
-            }
-        }
-
-        public void DoRestore(ITransport transport)
-        {
-            // Restore the connections.
-            foreach(ConnectionState connectionState in connectionStates.Values)
-            {
-                ConnectionInfo info = connectionState.Info;
-                info.FailoverReconnect = true;
-                transport.Oneway(info);
-
-                DoRestoreTempDestinations(transport, connectionState);
-
-                if(RestoreSessions)
-                {
-                    DoRestoreSessions(transport, connectionState);
-                }
-
-                if(RestoreTransaction)
-                {
-                    DoRestoreTransactions(transport, connectionState);
-                }
-            }
-            //now flush messages
-            foreach(Message msg in messageCache.Values)
-            {
-                transport.Oneway(msg);
-            }
-        }
-
-        private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState)
-        {
-            AtomicCollection<TransactionState> transactionStates = connectionState.TransactionStates;
-            foreach(TransactionState transactionState in transactionStates)
-            {
-                foreach(Command command in transactionState.Commands)
-                {
-                    transport.Oneway(command);
-                }
-            }
-        }
-
-        /// <summary>
-        /// </summary>
-        /// <param name="transport"></param>
-        /// <param name="connectionState"></param>
-        protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState)
-        {
-            // Restore the connection's sessions
-            foreach(SessionState sessionState in connectionState.SessionStates)
-            {
-                transport.Oneway(sessionState.Info);
-
-                if(RestoreProducers)
-                {
-                    DoRestoreProducers(transport, sessionState);
-                }
-
-                if(RestoreConsumers)
-                {
-                    DoRestoreConsumers(transport, sessionState);
-                }
-            }
-        }
-
-        /// <summary>
-        /// </summary>
-        /// <param name="transport"></param>
-        /// <param name="sessionState"></param>
-        protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
-        {
-            // Restore the session's consumers but possibly in pull only (prefetch 0 state) till
-            // recovery completes.
-
-			ConnectionState connectionState = null;
-			bool connectionInterruptionProcessingComplete = false;
-
-			if(connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out connectionState))
-			{
-				connectionInterruptionProcessingComplete = connectionState.ConnectionInterruptProcessingComplete;
-			}
-			
-			// Restore the session's consumers
-            foreach(ConsumerState consumerState in sessionState.ConsumerStates)
-            {
-                ConsumerInfo infoToSend = consumerState.Info;
-
-                if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
-                {
-                    infoToSend = consumerState.Info.Clone() as ConsumerInfo;
-					lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
-					{
-						if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
-						{
-							connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
-						}
-					}
-                    infoToSend.PrefetchSize = 0;
-                    if(Tracer.IsDebugEnabled)
-                    {
-                        Tracer.Debug("restore consumer: " + infoToSend.ConsumerId +
-                                     " in pull mode pending recovery, overriding prefetch: " +
-                                     consumerState.Info.PrefetchSize);
-                    }
-                }
-
-                if(Tracer.IsDebugEnabled)
-                {
-                    Tracer.Debug("restore consumer: " + infoToSend.ConsumerId);
-                }
-
-                transport.Oneway(infoToSend);
-            }
-        }
-
-        /// <summary>
-        /// </summary>
-        /// <param name="transport"></param>
-        /// <param name="sessionState"></param>
-        protected void DoRestoreProducers(ITransport transport, SessionState sessionState)
-        {
-            // Restore the session's producers
-            foreach(ProducerState producerState in sessionState.ProducerStates)
-            {
-                transport.Oneway(producerState.Info);
-            }
-        }
-
-        /// <summary>
-        /// </summary>
-        /// <param name="transport"></param>
-        /// <param name="connectionState"></param>
-        protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState)
-        {
-            // Restore the connection's temp destinations.
-            foreach(DestinationInfo destinationInfo in connectionState.TempDestinations)
-            {
-                transport.Oneway(destinationInfo);
-            }
-        }
-
-        public override Response processAddDestination(DestinationInfo info)
-        {
-            if(info != null && info.Destination.IsTemporary)
-            {
-                ConnectionState cs;
-
-				if(connectionStates.TryGetValue(info.ConnectionId, out cs))
-				{
-					cs.addTempDestination(info);
-				}
-            }
-            return TRACKED_RESPONSE_MARKER;
-        }
-
-        public override Response processRemoveDestination(DestinationInfo info)
-        {
-            if(info != null && info.Destination.IsTemporary)
-            {
-                ConnectionState cs;
-				if(connectionStates.TryGetValue(info.ConnectionId, out cs))
-				{
-                    cs.removeTempDestination(info.Destination);
-                }
-            }
-            return TRACKED_RESPONSE_MARKER;
-        }
-
-        public override Response processAddProducer(ProducerInfo info)
-        {
-            if(info != null && info.ProducerId != null)
-            {
-                SessionId sessionId = info.ProducerId.ParentId;
-                if(sessionId != null)
-                {
-                    ConnectionId connectionId = sessionId.ParentId;
-                    if(connectionId != null)
-                    {
-                        ConnectionState cs;
-						
-						if(connectionStates.TryGetValue(connectionId, out cs))
-                        {
-                            SessionState ss = cs[sessionId];
-                            if(ss != null)
-                            {
-                                ss.addProducer(info);
-                            }
-                        }
-                    }
-                }
-            }
-            return TRACKED_RESPONSE_MARKER;
-        }
-
-        public override Response processRemoveProducer(ProducerId id)
-        {
-            if(id != null)
-            {
-                SessionId sessionId = id.ParentId;
-                if(sessionId != null)
-                {
-                    ConnectionId connectionId = sessionId.ParentId;
-                    if(connectionId != null)
-                    {
-						ConnectionState cs = null;
-						
-						if(connectionStates.TryGetValue(connectionId, out cs))
-                        {
-                            SessionState ss = cs[sessionId];
-                            if(ss != null)
-                            {
-                                ss.removeProducer(id);
-                            }
-                        }
-                    }
-                }
-            }
-            return TRACKED_RESPONSE_MARKER;
-        }
-
-        public override Response processAddConsumer(ConsumerInfo info)
-        {
-            if(info != null)
-            {
-                SessionId sessionId = info.ConsumerId.ParentId;
-                if(sessionId != null)
-                {
-                    ConnectionId connectionId = sessionId.ParentId;
-                    if(connectionId != null)
-                    {
-						ConnectionState cs = null;
-
-						if(connectionStates.TryGetValue(connectionId, out cs))
-                        {
-                            SessionState ss = cs[sessionId];
-                            if(ss != null)
-                            {
-                                ss.addConsumer(info);
-                            }
-                        }
-                    }
-                }
-            }
-            return TRACKED_RESPONSE_MARKER;
-        }
-
-        public override Response processRemoveConsumer(ConsumerId id)
-        {
-            if(id != null)
-            {
-                SessionId sessionId = id.ParentId;
-                if(sessionId != null)
-                {
-                    ConnectionId connectionId = sessionId.ParentId;
-                    if(connectionId != null)
-                    {
-						ConnectionState cs = null;
-
-						if(connectionStates.TryGetValue(connectionId, out cs))
-                        {
-                            SessionState ss = cs[sessionId];
-                            if(ss != null)
-                            {
-                                ss.removeConsumer(id);
-                            }
-                        }
-                    }
-                }
-            }
-            return TRACKED_RESPONSE_MARKER;
-        }
-
-        public override Response processAddSession(SessionInfo info)
-        {
-            if(info != null)
-            {
-                ConnectionId connectionId = info.SessionId.ParentId;
-                if(connectionId != null)
-                {
-					ConnectionState cs = null;
-
-					if(connectionStates.TryGetValue(connectionId, out cs))
-                    {
-                        cs.addSession(info);
-                    }
-                }
-            }
-            return TRACKED_RESPONSE_MARKER;
-        }
-
-        public override Response processRemoveSession(SessionId id)
-        {
-            if(id != null)
-            {
-                ConnectionId connectionId = id.ParentId;
-                if(connectionId != null)
-                {
-					ConnectionState cs = null;
-
-					if(connectionStates.TryGetValue(connectionId, out cs))
-                    {
-                        cs.removeSession(id);
-                    }
-                }
-            }
-            return TRACKED_RESPONSE_MARKER;
-        }
-
-        public override Response processAddConnection(ConnectionInfo info)
-        {
-            if(info != null)
-            {
-                connectionStates.Add(info.ConnectionId, new ConnectionState(info));
-            }
-            return TRACKED_RESPONSE_MARKER;
-        }
-
-        public override Response processRemoveConnection(ConnectionId id)
-        {
-            if(id != null)
-            {
-                connectionStates.Remove(id);
-            }
-            return TRACKED_RESPONSE_MARKER;
-        }
-
-        public override Response processMessage(Message send)
-        {
-            if(send != null)
-            {
-                if(TrackTransactions && send.TransactionId != null)
-                {
-                    ConnectionId connectionId = send.ProducerId.ParentId.ParentId;
-                    if(connectionId != null)
-                    {
-						ConnectionState cs = null;
-
-						if(connectionStates.TryGetValue(connectionId, out cs))
-                        {
-                            TransactionState transactionState = cs[send.TransactionId];
-                            if(transactionState != null)
-                            {
-                                transactionState.addCommand(send);
-                            }
-                        }
-                    }
-                    return TRACKED_RESPONSE_MARKER;
-                }
-                else if(TrackMessages)
-                {
-                    messageCache.Add(send.MessageId, (Message) send.Clone());
-                    RemoveEldestInCache();
-                }
-            }
-            return null;
-        }
-
-        public override Response processMessageAck(MessageAck ack)
-        {
-            if(TrackTransactions && ack != null && ack.TransactionId != null)
-            {
-                ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId;
-                if(connectionId != null)
-                {
-					ConnectionState cs = null;
-
-					if(connectionStates.TryGetValue(connectionId, out cs))
-                    {
-                        TransactionState transactionState = cs[ack.TransactionId];
-                        if(transactionState != null)
-                        {
-                            transactionState.addCommand(ack);
-                        }
-                    }
-                }
-                return TRACKED_RESPONSE_MARKER;
-            }
-            return null;
-        }
-
-        public override Response processBeginTransaction(TransactionInfo info)
-        {
-            if(TrackTransactions && info != null && info.TransactionId != null)
-            {
-                ConnectionId connectionId = info.ConnectionId;
-                if(connectionId != null)
-                {
-					ConnectionState cs = null;
-
-					if(connectionStates.TryGetValue(connectionId, out cs))
-                    {
-                        cs.addTransactionState(info.TransactionId);
-                        TransactionState state = cs[info.TransactionId];
-                        state.addCommand(info);
-                    }
-                }
-                return TRACKED_RESPONSE_MARKER;
-            }
-            return null;
-        }
-
-        public override Response processPrepareTransaction(TransactionInfo info)
-        {
-            if(TrackTransactions && info != null)
-            {
-                ConnectionId connectionId = info.ConnectionId;
-                if(connectionId != null)
-                {
-					ConnectionState cs = null;
-
-					if(connectionStates.TryGetValue(connectionId, out cs))
-                    {
-                        TransactionState transactionState = cs[info.TransactionId];
-                        if(transactionState != null)
-                        {
-                            transactionState.addCommand(info);
-                        }
-                    }
-                }
-                return TRACKED_RESPONSE_MARKER;
-            }
-            return null;
-        }
-
-        public override Response processCommitTransactionOnePhase(TransactionInfo info)
-        {
-            if(TrackTransactions && info != null)
-            {
-                ConnectionId connectionId = info.ConnectionId;
-                if(connectionId != null)
-                {
-					ConnectionState cs = null;
-
-					if(connectionStates.TryGetValue(connectionId, out cs))
-                    {
-                        TransactionState transactionState = cs[info.TransactionId];
-                        if(transactionState != null)
-                        {
-                            transactionState.addCommand(info);
-                            return new Tracked(new RemoveTransactionAction(info, this));
-                        }
-                    }
-                }
-            }
-            return null;
-        }
-
-        public override Response processCommitTransactionTwoPhase(TransactionInfo info)
-        {
-            if(TrackTransactions && info != null)
-            {
-                ConnectionId connectionId = info.ConnectionId;
-                if(connectionId != null)
-                {
-					ConnectionState cs = null;
-
-                    if(cs != null)
-                    {
-                        TransactionState transactionState = cs[info.TransactionId];
-                        if(transactionState != null)
-                        {
-                            transactionState.addCommand(info);
-                            return new Tracked(new RemoveTransactionAction(info, this));
-                        }
-                    }
-                }
-            }
-            return null;
-        }
-
-        public override Response processRollbackTransaction(TransactionInfo info)
-        {
-            if(TrackTransactions && info != null)
-            {
-                ConnectionId connectionId = info.ConnectionId;
-                if(connectionId != null)
-                {
-					ConnectionState cs = null;
-
-					if(connectionStates.TryGetValue(connectionId, out cs))
-                    {
-                        TransactionState transactionState = cs[info.TransactionId];
-                        if(transactionState != null)
-                        {
-                            transactionState.addCommand(info);
-                            return new Tracked(new RemoveTransactionAction(info, this));
-                        }
-                    }
-                }
-            }
-            return null;
-        }
-
-        public override Response processEndTransaction(TransactionInfo info)
-        {
-            if(TrackTransactions && info != null)
-            {
-                ConnectionId connectionId = info.ConnectionId;
-                if(connectionId != null)
-                {
-					ConnectionState cs = null;
-
-					if(connectionStates.TryGetValue(connectionId, out cs))
-                    {
-                        TransactionState transactionState = cs[info.TransactionId];
-                        if(transactionState != null)
-                        {
-                            transactionState.addCommand(info);
-                        }
-                    }
-                }
-                return TRACKED_RESPONSE_MARKER;
-            }
-            return null;
-        }
-
-        public bool RestoreConsumers
-        {
-            get
-            {
-                return _restoreConsumers;
-            }
-            set
-            {
-                _restoreConsumers = value;
-            }
-        }
-
-        public bool RestoreProducers
-        {
-            get
-            {
-                return _restoreProducers;
-            }
-            set
-            {
-                _restoreProducers = value;
-            }
-        }
-
-        public bool RestoreSessions
-        {
-            get
-            {
-                return _restoreSessions;
-            }
-            set
-            {
-                _restoreSessions = value;
-            }
-        }
-
-        public bool TrackTransactions
-        {
-            get
-            {
-                return _trackTransactions;
-            }
-            set
-            {
-                _trackTransactions = value;
-            }
-        }
-
-        public bool RestoreTransaction
-        {
-            get
-            {
-                return _restoreTransaction;
-            }
-            set
-            {
-                _restoreTransaction = value;
-            }
-        }
-
-        public bool TrackMessages
-        {
-            get
-            {
-                return _trackMessages;
-            }
-            set
-            {
-                _trackMessages = value;
-            }
-        }
-
-        public int MaxCacheSize
-        {
-            get
-            {
-                return _maxCacheSize;
-            }
-            set
-            {
-                _maxCacheSize = value;
-            }
-        }
-
-        public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId)
-        {
-			ConnectionState connectionState = null;
-
-			if(connectionStates.TryGetValue(connectionId, out connectionState))
-            {
-                connectionState.ConnectionInterruptProcessingComplete = true;
-
-				lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
-				{
-					foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in connectionState.RecoveringPullConsumers)
-					{
-						ConsumerControl control = new ConsumerControl();
-						control.ConsumerId = entry.Key;
-						control.Prefetch = entry.Value.PrefetchSize;
-						control.Destination = entry.Value.Destination;
-						try
-						{
-							if(Tracer.IsDebugEnabled)
-							{
-								Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
-											 " with: " + control.Prefetch);
-							}
-							transport.Oneway(control);
-						}
-						catch(Exception ex)
-						{
-							if(Tracer.IsDebugEnabled)
-							{
-								Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
-											 " with: " + control.Prefetch + "Error: " + ex.Message);
-							}
-						}
-					}
-					connectionState.RecoveringPullConsumers.Clear();
-				}
-            }
-        }
-
-        public void TransportInterrupted(ConnectionId id)
-        {
-			ConnectionState connection = null;
-
-			if(connectionStates.TryGetValue(id, out connection))
-            {
-                connection.ConnectionInterruptProcessingComplete = false;
-            }
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.Transport;
+using System.Collections;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+    /// <summary>
+    /// Tracks the state of a connection so a newly established transport can be
+    /// re-initialized to the state that was tracked.
+    /// </summary>
+    public class ConnectionStateTracker : CommandVisitorAdapter
+    {
+        // Shared Tracked instance, built with a null argument, that many of the
+        // process* handlers below return instead of allocating a new response.
+        private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
+
+        // Tracked state per connection id; entries are added by processAddConnection
+        // and removed by processRemoveConnection.
+        protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>();
+
+        // Backing fields for the public option properties declared further down.
+        private bool _trackTransactions;
+        private bool _trackTransactionProducers = true;
+        private bool _restoreSessions = true;
+        private bool _restoreConsumers = true;
+        private bool _restoreProducers = true;
+        private bool _restoreTransaction = true;
+        private bool _trackMessages = true;
+        private int _maxCacheSize = 256;
+        // Counter incremented by TrackBack and decremented by RemoveEldestInCache.
+        private int currentCacheSize;
+        // Clones of messages stored by processMessage; DoRestore resends every value.
+        private readonly Dictionary<MessageId, Message> messageCache = new Dictionary<MessageId, Message>();
+        // Queue of message ids that RemoveEldestInCache dequeues from when trimming messageCache.
+        private readonly Queue<MessageId> messageCacheFIFO = new Queue<MessageId>();
+
+        /// <summary>
+        /// Trims the message cache: while the FIFO of message ids holds more than
+        /// MaxCacheSize entries, the oldest id is dequeued, its message is removed
+        /// from the cache and the cache size counter is decremented.
+        /// </summary>
+        protected void RemoveEldestInCache()
+        {
+            lock(((ICollection) messageCacheFIFO).SyncRoot)
+            {
+                while(messageCacheFIFO.Count > MaxCacheSize)
+                {
+                    MessageId eldest = messageCacheFIFO.Dequeue();
+                    messageCache.Remove(eldest);
+                    currentCacheSize--;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Action wrapped in the Tracked response of a commit or rollback. When
+        /// Run is invoked it removes the transaction's state from the tracked
+        /// connection it belongs to, if that connection is still tracked.
+        /// </summary>
+        private class RemoveTransactionAction : ThreadSimulator
+        {
+            private readonly TransactionInfo transactionInfo;
+            private readonly ConnectionStateTracker tracker;
+
+            public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst)
+            {
+                transactionInfo = info;
+                tracker = aCst;
+            }
+
+            public override void Run()
+            {
+                ConnectionState owner;
+
+                if(!tracker.connectionStates.TryGetValue(transactionInfo.ConnectionId, out owner))
+                {
+                    return;
+                }
+
+                owner.removeTransactionState(transactionInfo.TransactionId);
+            }
+        }
+
+        /// <summary>
+        /// Has the command visit this tracker (<c>command.visit(this)</c>) and
+        /// returns the result cast to Tracked.
+        /// </summary>
+        /// <param name="command">The command to track.</param>
+        /// <returns>null if the command is not state tracked.</returns>
+        /// <exception cref="IOException">
+        /// Rethrown unchanged when raised while visiting; any other exception is
+        /// replaced by a new IOException carrying the original message.
+        /// </exception>
+        public Tracked Track(Command command)
+        {
+            try
+            {
+                return (Tracked) command.visit(this);
+            }
+            catch(IOException)
+            {
+                // Plain "throw;" keeps the original stack trace; "throw e;" would reset it.
+                throw;
+            }
+            catch(Exception e)
+            {
+                throw new IOException(e.Message);
+            }
+        }
+
+        /// <summary>
+        /// Increments the cache size counter when message tracking is enabled and
+        /// the given command is a message that carries no transaction id.
+        /// </summary>
+        /// <param name="command">The command to inspect; null is ignored.</param>
+        public void TrackBack(Command command)
+        {
+            if(!TrackMessages || command == null || !command.IsMessage)
+            {
+                return;
+            }
+
+            Message tracked = (Message) command;
+            if(tracked.TransactionId == null)
+            {
+                currentCacheSize++;
+            }
+        }
+
+        /// <summary>
+        /// Replays all tracked state onto the given transport. For every tracked
+        /// connection its ConnectionInfo is sent with FailoverReconnect set,
+        /// followed by its temp destinations and, depending on RestoreSessions and
+        /// RestoreTransaction, its sessions and transactions. Afterwards every
+        /// message held in the message cache is sent.
+        /// </summary>
+        /// <param name="transport">Transport the state is sent over via Oneway.</param>
+        public void DoRestore(ITransport transport)
+        {
+            foreach(ConnectionState tracked in connectionStates.Values)
+            {
+                ConnectionInfo connectionInfo = tracked.Info;
+                connectionInfo.FailoverReconnect = true;
+                transport.Oneway(connectionInfo);
+
+                DoRestoreTempDestinations(transport, tracked);
+
+                if(RestoreSessions)
+                {
+                    DoRestoreSessions(transport, tracked);
+                }
+
+                if(RestoreTransaction)
+                {
+                    DoRestoreTransactions(transport, tracked);
+                }
+            }
+
+            // Flush the cached messages once all connections have been replayed.
+            foreach(Message cached in messageCache.Values)
+            {
+                transport.Oneway(cached);
+            }
+        }
+
+        /// <summary>
+        /// Replays every tracked transaction of the connection in three passes:
+        /// the ProducerInfo of each producer recorded on the transaction, then the
+        /// transaction's commands, then a RemoveInfo for each of those producers.
+        /// </summary>
+        /// <param name="transport">Transport the commands are sent over.</param>
+        /// <param name="connectionState">Connection whose transactions are replayed.</param>
+        private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState)
+        {
+            foreach(TransactionState tx in connectionState.TransactionStates)
+            {
+                // Pass 1: replay the add of (possibly short lived) producers that
+                // may have been involved in the transaction.
+                foreach(ProducerState txProducer in tx.ProducerStates)
+                {
+                    if(Tracer.IsDebugEnabled)
+                    {
+                        Tracer.Debug("tx replay producer :" + txProducer.Info);
+                    }
+                    transport.Oneway(txProducer.Info);
+                }
+
+                // Pass 2: replay the commands recorded for the transaction.
+                foreach(Command recorded in tx.Commands)
+                {
+                    if(Tracer.IsDebugEnabled)
+                    {
+                        Tracer.Debug("tx replay: " + recorded);
+                    }
+                    transport.Oneway(recorded);
+                }
+
+                // Pass 3: remove the producers that were re-added in pass 1.
+                foreach(ProducerState txProducer in tx.ProducerStates)
+                {
+                    if(Tracer.IsDebugEnabled)
+                    {
+                        Tracer.Debug("tx remove replayed producer :" + txProducer.Info);
+                    }
+
+                    RemoveInfo removal = new RemoveInfo();
+                    removal.ObjectId = txProducer.Info.ProducerId;
+                    transport.Oneway(removal);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Sends the SessionInfo of every session tracked for the connection and,
+        /// depending on RestoreProducers and RestoreConsumers, replays that
+        /// session's producers and consumers.
+        /// </summary>
+        /// <param name="transport">Transport the commands are sent over.</param>
+        /// <param name="connectionState">Connection whose sessions are replayed.</param>
+        protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState)
+        {
+            foreach(SessionState session in connectionState.SessionStates)
+            {
+                transport.Oneway(session.Info);
+
+                if(RestoreProducers)
+                {
+                    DoRestoreProducers(transport, session);
+                }
+
+                if(RestoreConsumers)
+                {
+                    DoRestoreConsumers(transport, session);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Sends the ConsumerInfo of every consumer tracked for the session. While
+        /// the owning connection has not finished interruption processing, a
+        /// consumer with a prefetch above zero is sent as a clone with prefetch 0
+        /// (when the wire format version is above 5) and its original info is
+        /// recorded in the connection's RecoveringPullConsumers so that
+        /// ConnectionInterruptProcessingComplete can restore the prefetch later.
+        /// </summary>
+        /// <param name="transport">Transport the commands are sent over.</param>
+        /// <param name="sessionState">Session whose consumers are replayed.</param>
+        protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
+        {
+            ConnectionState connectionState = null;
+            bool connectionInterruptionProcessingComplete = false;
+
+            if(connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out connectionState))
+            {
+                connectionInterruptionProcessingComplete = connectionState.ConnectionInterruptProcessingComplete;
+            }
+
+            // Without a tracked connection there is nowhere to record a recovering
+            // consumer (and nothing that would later restore its prefetch), so such
+            // consumers are restored with their original prefetch instead of
+            // dereferencing a null connection state.
+            bool canDeferPrefetch = connectionState != null && !connectionInterruptionProcessingComplete;
+
+            foreach(ConsumerState consumerState in sessionState.ConsumerStates)
+            {
+                ConsumerInfo infoToSend = consumerState.Info;
+
+                if(canDeferPrefetch && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
+                {
+                    // Direct cast: a clone of unexpected type should fail here rather
+                    // than surface as a null reference a few lines below.
+                    infoToSend = (ConsumerInfo) consumerState.Info.Clone();
+                    lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot)
+                    {
+                        if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
+                        {
+                            connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info);
+                        }
+                    }
+                    infoToSend.PrefetchSize = 0;
+                    if(Tracer.IsDebugEnabled)
+                    {
+                        Tracer.Debug("restore consumer: " + infoToSend.ConsumerId +
+                                     " in pull mode pending recovery, overriding prefetch: " +
+                                     consumerState.Info.PrefetchSize);
+                    }
+                }
+
+                if(Tracer.IsDebugEnabled)
+                {
+                    Tracer.Debug("restore consumer: " + infoToSend.ConsumerId);
+                }
+
+                transport.Oneway(infoToSend);
+            }
+        }
+
+        /// <summary>
+        /// Sends the ProducerInfo of every producer tracked for the session.
+        /// </summary>
+        /// <param name="transport">Transport the commands are sent over.</param>
+        /// <param name="sessionState">Session whose producers are replayed.</param>
+        protected void DoRestoreProducers(ITransport transport, SessionState sessionState)
+        {
+            foreach(ProducerState tracked in sessionState.ProducerStates)
+            {
+                ProducerInfo producerInfo = tracked.Info;
+                transport.Oneway(producerInfo);
+            }
+        }
+
+        /// <summary>
+        /// Sends every DestinationInfo held in the connection's TempDestinations.
+        /// </summary>
+        /// <param name="transport">Transport the commands are sent over.</param>
+        /// <param name="connectionState">Connection whose temp destinations are replayed.</param>
+        protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState)
+        {
+            foreach(DestinationInfo tempDestination in connectionState.TempDestinations)
+            {
+                transport.Oneway(tempDestination);
+            }
+        }
+
+        /// <summary>
+        /// Records a temporary destination on its tracked connection. Infos that
+        /// are null, have no destination, name a non-temporary destination, have
+        /// no connection id or belong to an untracked connection are ignored.
+        /// </summary>
+        /// <param name="info">The destination being added.</param>
+        /// <returns>Always TRACKED_RESPONSE_MARKER.</returns>
+        public override Response processAddDestination(DestinationInfo info)
+        {
+            // Guard Destination and ConnectionId as well: a null key would make
+            // TryGetValue throw, and the sibling handlers all check the id first.
+            if(info != null && info.Destination != null && info.Destination.IsTemporary &&
+               info.ConnectionId != null)
+            {
+                ConnectionState cs;
+
+                if(connectionStates.TryGetValue(info.ConnectionId, out cs))
+                {
+                    cs.addTempDestination(info);
+                }
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        /// <summary>
+        /// Removes a temporary destination from its tracked connection. Infos that
+        /// are null, have no destination, name a non-temporary destination, have
+        /// no connection id or belong to an untracked connection are ignored.
+        /// </summary>
+        /// <param name="info">The destination being removed.</param>
+        /// <returns>Always TRACKED_RESPONSE_MARKER.</returns>
+        public override Response processRemoveDestination(DestinationInfo info)
+        {
+            // Guard Destination and ConnectionId as well: a null key would make
+            // TryGetValue throw, and the sibling handlers all check the id first.
+            if(info != null && info.Destination != null && info.Destination.IsTemporary &&
+               info.ConnectionId != null)
+            {
+                ConnectionState cs;
+
+                if(connectionStates.TryGetValue(info.ConnectionId, out cs))
+                {
+                    cs.removeTempDestination(info.Destination);
+                }
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        /// <summary>
+        /// Adds the producer to its tracked session. Nothing is recorded when the
+        /// info, its producer id, the parent session id or the parent connection
+        /// id is null, or when the connection or session is not tracked.
+        /// </summary>
+        /// <param name="info">The producer being added.</param>
+        /// <returns>Always TRACKED_RESPONSE_MARKER.</returns>
+        public override Response processAddProducer(ProducerInfo info)
+        {
+            ProducerId producerId = (info != null) ? info.ProducerId : null;
+            SessionId sessionId = (producerId != null) ? producerId.ParentId : null;
+            ConnectionId connectionId = (sessionId != null) ? sessionId.ParentId : null;
+
+            ConnectionState connection;
+            if(connectionId != null && connectionStates.TryGetValue(connectionId, out connection))
+            {
+                SessionState session = connection[sessionId];
+                if(session != null)
+                {
+                    session.addProducer(info);
+                }
+            }
+
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        /// <summary>
+        /// Removes the producer from its tracked session. Nothing happens when the
+        /// id, its parent session id or the parent connection id is null, or when
+        /// the connection or session is not tracked.
+        /// </summary>
+        /// <param name="id">Id of the producer being removed.</param>
+        /// <returns>Always TRACKED_RESPONSE_MARKER.</returns>
+        public override Response processRemoveProducer(ProducerId id)
+        {
+            if(id == null)
+            {
+                return TRACKED_RESPONSE_MARKER;
+            }
+
+            SessionId sessionId = id.ParentId;
+            if(sessionId == null)
+            {
+                return TRACKED_RESPONSE_MARKER;
+            }
+
+            ConnectionId connectionId = sessionId.ParentId;
+            ConnectionState connection;
+            if(connectionId != null && connectionStates.TryGetValue(connectionId, out connection))
+            {
+                SessionState session = connection[sessionId];
+                if(session != null)
+                {
+                    session.removeProducer(id);
+                }
+            }
+
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        /// <summary>
+        /// Adds the consumer to its tracked session. Nothing is recorded when the
+        /// info, its consumer id, the parent session id or the parent connection
+        /// id is null, or when the connection or session is not tracked.
+        /// </summary>
+        /// <param name="info">The consumer being added.</param>
+        /// <returns>Always TRACKED_RESPONSE_MARKER.</returns>
+        public override Response processAddConsumer(ConsumerInfo info)
+        {
+            // ConsumerId is checked like ProducerId is in processAddProducer; it was
+            // previously dereferenced unguarded.
+            if(info != null && info.ConsumerId != null)
+            {
+                SessionId sessionId = info.ConsumerId.ParentId;
+                if(sessionId != null)
+                {
+                    ConnectionId connectionId = sessionId.ParentId;
+                    if(connectionId != null)
+                    {
+                        ConnectionState cs = null;
+
+                        if(connectionStates.TryGetValue(connectionId, out cs))
+                        {
+                            SessionState ss = cs[sessionId];
+                            if(ss != null)
+                            {
+                                ss.addConsumer(info);
+                            }
+                        }
+                    }
+                }
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        /// <summary>
+        /// Removes the consumer from its tracked session. Nothing happens when the
+        /// id, its parent session id or the parent connection id is null, or when
+        /// the connection or session is not tracked.
+        /// </summary>
+        /// <param name="id">Id of the consumer being removed.</param>
+        /// <returns>Always TRACKED_RESPONSE_MARKER.</returns>
+        public override Response processRemoveConsumer(ConsumerId id)
+        {
+            SessionId sessionId = (id != null) ? id.ParentId : null;
+            ConnectionId connectionId = (sessionId != null) ? sessionId.ParentId : null;
+
+            ConnectionState connection;
+            if(connectionId != null && connectionStates.TryGetValue(connectionId, out connection))
+            {
+                SessionState session = connection[sessionId];
+                if(session != null)
+                {
+                    session.removeConsumer(id);
+                }
+            }
+
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        /// <summary>
+        /// Adds the session to its tracked connection. Nothing is recorded when
+        /// the info, its session id or the parent connection id is null, or when
+        /// the connection is not tracked.
+        /// </summary>
+        /// <param name="info">The session being added.</param>
+        /// <returns>Always TRACKED_RESPONSE_MARKER.</returns>
+        public override Response processAddSession(SessionInfo info)
+        {
+            // SessionId is checked before use; it was previously dereferenced
+            // unguarded, unlike the id checks in the neighbouring handlers.
+            if(info != null && info.SessionId != null)
+            {
+                ConnectionId connectionId = info.SessionId.ParentId;
+                if(connectionId != null)
+                {
+                    ConnectionState cs = null;
+
+                    if(connectionStates.TryGetValue(connectionId, out cs))
+                    {
+                        cs.addSession(info);
+                    }
+                }
+            }
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        /// <summary>
+        /// Removes the session from its tracked connection. Nothing happens when
+        /// the id or its parent connection id is null, or when the connection is
+        /// not tracked.
+        /// </summary>
+        /// <param name="id">Id of the session being removed.</param>
+        /// <returns>Always TRACKED_RESPONSE_MARKER.</returns>
+        public override Response processRemoveSession(SessionId id)
+        {
+            ConnectionId connectionId = (id != null) ? id.ParentId : null;
+
+            ConnectionState connection;
+            if(connectionId != null && connectionStates.TryGetValue(connectionId, out connection))
+            {
+                connection.removeSession(id);
+            }
+
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        /// <summary>
+        /// Starts tracking a connection by storing a new ConnectionState under the
+        /// info's connection id. A null info is ignored.
+        /// </summary>
+        /// <param name="info">The connection being added.</param>
+        /// <returns>Always TRACKED_RESPONSE_MARKER.</returns>
+        public override Response processAddConnection(ConnectionInfo info)
+        {
+            if(info == null)
+            {
+                return TRACKED_RESPONSE_MARKER;
+            }
+
+            // NOTE(review): Dictionary.Add throws if the id is already present -
+            // confirm a connection can never be added twice.
+            ConnectionState added = new ConnectionState(info);
+            connectionStates.Add(info.ConnectionId, added);
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        /// <summary>
+        /// Stops tracking the connection with the given id. A null id is ignored.
+        /// </summary>
+        /// <param name="id">Id of the connection being removed.</param>
+        /// <returns>Always TRACKED_RESPONSE_MARKER.</returns>
+        public override Response processRemoveConnection(ConnectionId id)
+        {
+            if(id == null)
+            {
+                return TRACKED_RESPONSE_MARKER;
+            }
+
+            connectionStates.Remove(id);
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        /// <summary>
+        /// Tracks an outgoing message. With TrackTransactions enabled, a message
+        /// that carries a transaction id is appended to the commands of its tracked
+        /// transaction and, when TrackTransactionProducers is set, its producer's
+        /// state is linked to that transaction. Otherwise, with TrackMessages
+        /// enabled, a clone of the message is stored in the message cache and the
+        /// cache is trimmed to MaxCacheSize entries.
+        /// </summary>
+        /// <param name="send">The message being sent; null is ignored.</param>
+        /// <returns>
+        /// TRACKED_RESPONSE_MARKER for a message handled on the transaction path,
+        /// otherwise null.
+        /// </returns>
+        public override Response processMessage(Message send)
+        {
+            if(send == null)
+            {
+                return null;
+            }
+
+            if(TrackTransactions && send.TransactionId != null)
+            {
+                // Walk the id chain defensively; the sibling handlers ignore commands
+                // with missing ids instead of failing on a null dereference.
+                ProducerId producerId = send.ProducerId;
+                SessionId sessionId = (producerId != null) ? producerId.ParentId : null;
+                ConnectionId connectionId = (sessionId != null) ? sessionId.ParentId : null;
+                if(connectionId != null)
+                {
+                    ConnectionState cs = null;
+
+                    if(connectionStates.TryGetValue(connectionId, out cs))
+                    {
+                        TransactionState transactionState = cs[send.TransactionId];
+                        if(transactionState != null)
+                        {
+                            transactionState.addCommand(send);
+
+                            if(_trackTransactionProducers)
+                            {
+                                SessionState ss = cs[sessionId];
+                                ProducerState producerState = (ss != null) ? ss[producerId] : null;
+                                if(producerState != null)
+                                {
+                                    producerState.TransactionState = transactionState;
+                                }
+                            }
+                        }
+                    }
+                }
+                return TRACKED_RESPONSE_MARKER;
+            }
+
+            if(TrackMessages)
+            {
+                MessageId messageId = send.MessageId;
+
+                // Same lock RemoveEldestInCache takes, so the cache and its FIFO are
+                // updated together.
+                lock(((ICollection) messageCacheFIFO).SyncRoot)
+                {
+                    // The id has to be queued for RemoveEldestInCache to have anything
+                    // to evict; without it the cache grows without bound. An id that is
+                    // already cached is queued only once.
+                    if(!messageCache.ContainsKey(messageId))
+                    {
+                        messageCacheFIFO.Enqueue(messageId);
+                    }
+
+                    // Indexer assignment instead of Add: tracking the same message id
+                    // again replaces the cached clone rather than throwing.
+                    messageCache[messageId] = (Message) send.Clone();
+                }
+
+                RemoveEldestInCache();
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// With TrackTransactions enabled, appends an ack that carries a
+        /// transaction id to the commands of its tracked transaction. Acks whose
+        /// consumer, session or connection id is missing, or whose connection or
+        /// transaction is not tracked, are not recorded.
+        /// </summary>
+        /// <param name="ack">The acknowledgement being sent.</param>
+        /// <returns>
+        /// TRACKED_RESPONSE_MARKER when transactions are tracked and the ack carries
+        /// a transaction id; otherwise null.
+        /// </returns>
+        public override Response processMessageAck(MessageAck ack)
+        {
+            if(TrackTransactions && ack != null && ack.TransactionId != null)
+            {
+                // Walk the id chain defensively, as processRemoveConsumer does,
+                // instead of dereferencing ConsumerId.ParentId.ParentId unguarded.
+                ConsumerId consumerId = ack.ConsumerId;
+                SessionId sessionId = (consumerId != null) ? consumerId.ParentId : null;
+                ConnectionId connectionId = (sessionId != null) ? sessionId.ParentId : null;
+                if(connectionId != null)
+                {
+                    ConnectionState cs = null;
+
+                    if(connectionStates.TryGetValue(connectionId, out cs))
+                    {
+                        TransactionState transactionState = cs[ack.TransactionId];
+                        if(transactionState != null)
+                        {
+                            transactionState.addCommand(ack);
+                        }
+                    }
+                }
+                return TRACKED_RESPONSE_MARKER;
+            }
+            return null;
+        }
+
+        /// <summary>
+        /// With TrackTransactions enabled, creates the transaction's state on its
+        /// tracked connection and records the begin command as its first command.
+        /// </summary>
+        /// <param name="info">The begin-transaction command.</param>
+        /// <returns>
+        /// TRACKED_RESPONSE_MARKER when transactions are tracked and the info
+        /// carries a transaction id; otherwise null.
+        /// </returns>
+        public override Response processBeginTransaction(TransactionInfo info)
+        {
+            if(!TrackTransactions || info == null || info.TransactionId == null)
+            {
+                return null;
+            }
+
+            ConnectionId connectionId = info.ConnectionId;
+            ConnectionState connection;
+            if(connectionId != null && connectionStates.TryGetValue(connectionId, out connection))
+            {
+                connection.addTransactionState(info.TransactionId);
+                TransactionState started = connection[info.TransactionId];
+                started.addCommand(info);
+            }
+
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        /// <summary>
+        /// With TrackTransactions enabled, appends the prepare command to the
+        /// commands of its tracked transaction, if that transaction is known.
+        /// </summary>
+        /// <param name="info">The prepare-transaction command.</param>
+        /// <returns>
+        /// TRACKED_RESPONSE_MARKER when transactions are tracked and info is not
+        /// null; otherwise null.
+        /// </returns>
+        public override Response processPrepareTransaction(TransactionInfo info)
+        {
+            if(!TrackTransactions || info == null)
+            {
+                return null;
+            }
+
+            ConnectionId connectionId = info.ConnectionId;
+            ConnectionState connection;
+            if(connectionId != null && connectionStates.TryGetValue(connectionId, out connection))
+            {
+                TransactionState prepared = connection[info.TransactionId];
+                if(prepared != null)
+                {
+                    prepared.addCommand(info);
+                }
+            }
+
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        /// <summary>
+        /// With TrackTransactions enabled, appends the one-phase commit to the
+        /// commands of its tracked transaction and returns a Tracked response
+        /// wrapping a RemoveTransactionAction for that transaction.
+        /// </summary>
+        /// <param name="info">The commit command.</param>
+        /// <returns>
+        /// A new Tracked response when the transaction is tracked; otherwise null.
+        /// </returns>
+        public override Response processCommitTransactionOnePhase(TransactionInfo info)
+        {
+            if(!TrackTransactions || info == null)
+            {
+                return null;
+            }
+
+            ConnectionId connectionId = info.ConnectionId;
+            if(connectionId == null)
+            {
+                return null;
+            }
+
+            ConnectionState connection;
+            if(!connectionStates.TryGetValue(connectionId, out connection))
+            {
+                return null;
+            }
+
+            TransactionState committed = connection[info.TransactionId];
+            if(committed == null)
+            {
+                return null;
+            }
+
+            committed.addCommand(info);
+            return new Tracked(new RemoveTransactionAction(info, this));
+        }
+
+        /// <summary>
+        /// With TrackTransactions enabled, appends the two-phase commit to the
+        /// commands of its tracked transaction and returns a Tracked response
+        /// wrapping a RemoveTransactionAction for that transaction, in the same way
+        /// as processCommitTransactionOnePhase and processRollbackTransaction.
+        /// </summary>
+        /// <param name="info">The commit command.</param>
+        /// <returns>
+        /// A new Tracked response when the transaction is tracked; otherwise null.
+        /// </returns>
+        public override Response processCommitTransactionTwoPhase(TransactionInfo info)
+        {
+            if(TrackTransactions && info != null)
+            {
+                ConnectionId connectionId = info.ConnectionId;
+                if(connectionId != null)
+                {
+                    ConnectionState cs = null;
+
+                    // Look the connection up; the previous code tested a local that
+                    // was always null, so this branch could never run.
+                    if(connectionStates.TryGetValue(connectionId, out cs))
+                    {
+                        TransactionState transactionState = cs[info.TransactionId];
+                        if(transactionState != null)
+                        {
+                            transactionState.addCommand(info);
+                            return new Tracked(new RemoveTransactionAction(info, this));
+                        }
+                    }
+                }
+            }
+            return null;
+        }
+
+        /// <summary>
+        /// With TrackTransactions enabled, appends the rollback to the commands of
+        /// its tracked transaction and returns a Tracked response wrapping a
+        /// RemoveTransactionAction for that transaction.
+        /// </summary>
+        /// <param name="info">The rollback command.</param>
+        /// <returns>
+        /// A new Tracked response when the transaction is tracked; otherwise null.
+        /// </returns>
+        public override Response processRollbackTransaction(TransactionInfo info)
+        {
+            if(!TrackTransactions || info == null)
+            {
+                return null;
+            }
+
+            ConnectionId connectionId = info.ConnectionId;
+            if(connectionId == null)
+            {
+                return null;
+            }
+
+            ConnectionState connection;
+            if(!connectionStates.TryGetValue(connectionId, out connection))
+            {
+                return null;
+            }
+
+            TransactionState rolledBack = connection[info.TransactionId];
+            if(rolledBack == null)
+            {
+                return null;
+            }
+
+            rolledBack.addCommand(info);
+            return new Tracked(new RemoveTransactionAction(info, this));
+        }
+
+        /// <summary>
+        /// With TrackTransactions enabled, appends the end command to the commands
+        /// of its tracked transaction, if that transaction is known.
+        /// </summary>
+        /// <param name="info">The end-transaction command.</param>
+        /// <returns>
+        /// TRACKED_RESPONSE_MARKER when transactions are tracked and info is not
+        /// null; otherwise null.
+        /// </returns>
+        public override Response processEndTransaction(TransactionInfo info)
+        {
+            if(!TrackTransactions || info == null)
+            {
+                return null;
+            }
+
+            ConnectionId connectionId = info.ConnectionId;
+            ConnectionState connection;
+            if(connectionId != null && connectionStates.TryGetValue(connectionId, out connection))
+            {
+                TransactionState ended = connection[info.TransactionId];
+                if(ended != null)
+                {
+                    ended.addCommand(info);
+                }
+            }
+
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        /// <summary>
+        /// Whether DoRestoreSessions also replays each session's consumers.
+        /// The backing field is initialised to true.
+        /// </summary>
+        public bool RestoreConsumers
+        {
+            get
+            {
+                return this._restoreConsumers;
+            }
+            set
+            {
+                this._restoreConsumers = value;
+            }
+        }
+
+        /// <summary>
+        /// Whether DoRestoreSessions also replays each session's producers.
+        /// The backing field is initialised to true.
+        /// </summary>
+        public bool RestoreProducers
+        {
+            get
+            {
+                return this._restoreProducers;
+            }
+            set
+            {
+                this._restoreProducers = value;
+            }
+        }
+
+        /// <summary>
+        /// Whether DoRestore replays the sessions of each tracked connection.
+        /// The backing field is initialised to true.
+        /// </summary>
+        public bool RestoreSessions
+        {
+            get
+            {
+                return this._restoreSessions;
+            }
+            set
+            {
+                this._restoreSessions = value;
+            }
+        }
+
+        /// <summary>
+        /// Whether transaction commands, transacted messages and transacted acks
+        /// are recorded by the process* handlers. The backing field has no
+        /// initialiser, so it starts out false.
+        /// </summary>
+        public bool TrackTransactions
+        {
+            get
+            {
+                return this._trackTransactions;
+            }
+            set
+            {
+                this._trackTransactions = value;
+            }
+        }
+
+        /// <summary>
+        /// Whether processMessage links the producer of a transacted message to
+        /// the transaction's state. The backing field is initialised to true.
+        /// </summary>
+        public bool TrackTransactionProducers
+        {
+            get
+            {
+                return this._trackTransactionProducers;
+            }
+            set
+            {
+                this._trackTransactionProducers = value;
+            }
+        }
+
+        /// <summary>
+        /// Whether DoRestore replays the transactions of each tracked connection.
+        /// The backing field is initialised to true.
+        /// </summary>
+        public bool RestoreTransaction
+        {
+            get
+            {
+                return this._restoreTransaction;
+            }
+            set
+            {
+                this._restoreTransaction = value;
+            }
+        }
+
+        /// <summary>
+        /// Whether processMessage caches messages that are not handled on the
+        /// transaction path and whether TrackBack counts them. The backing field
+        /// is initialised to true.
+        /// </summary>
+        public bool TrackMessages
+        {
+            get
+            {
+                return this._trackMessages;
+            }
+            set
+            {
+                this._trackMessages = value;
+            }
+        }
+
+        /// <summary>
+        /// Threshold used by RemoveEldestInCache: entries are evicted while the
+        /// FIFO of cached message ids holds more than this many entries. The
+        /// backing field is initialised to 256.
+        /// </summary>
+        public int MaxCacheSize
+        {
+            get
+            {
+                return this._maxCacheSize;
+            }
+            set
+            {
+                this._maxCacheSize = value;
+            }
+        }
+
+        /// <summary>
+        /// Marks interruption processing as complete for the tracked connection
+        /// and, for every entry in its RecoveringPullConsumers, sends a
+        /// ConsumerControl carrying the recorded prefetch and destination. The
+        /// collection is cleared afterwards. A failure while sending a control is
+        /// only logged at debug level. Untracked connection ids are ignored.
+        /// </summary>
+        /// <param name="transport">Transport the controls are sent over.</param>
+        /// <param name="connectionId">Id of the connection that finished processing.</param>
+        public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId)
+        {
+            ConnectionState tracked;
+
+            if(!connectionStates.TryGetValue(connectionId, out tracked))
+            {
+                return;
+            }
+
+            tracked.ConnectionInterruptProcessingComplete = true;
+
+            lock(((ICollection) tracked.RecoveringPullConsumers).SyncRoot)
+            {
+                foreach(KeyValuePair<ConsumerId, ConsumerInfo> pending in tracked.RecoveringPullConsumers)
+                {
+                    ConsumerControl control = new ConsumerControl();
+                    control.ConsumerId = pending.Key;
+                    control.Prefetch = pending.Value.PrefetchSize;
+                    control.Destination = pending.Value.Destination;
+
+                    try
+                    {
+                        if(Tracer.IsDebugEnabled)
+                        {
+                            Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
+                                         " with: " + control.Prefetch);
+                        }
+                        transport.Oneway(control);
+                    }
+                    catch(Exception ex)
+                    {
+                        // Best effort: the remaining consumers are still processed.
+                        if(Tracer.IsDebugEnabled)
+                        {
+                            Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
+                                         " with: " + control.Prefetch + "Error: " + ex.Message);
+                        }
+                    }
+                }
+
+                tracked.RecoveringPullConsumers.Clear();
+            }
+        }
+
+        /// <summary>
+        /// Clears the ConnectionInterruptProcessingComplete flag of the tracked
+        /// connection with the given id. Untracked ids are ignored.
+        /// </summary>
+        /// <param name="id">Id of the connection whose transport was interrupted.</param>
+        public void TransportInterrupted(ConnectionId id)
+        {
+            ConnectionState interrupted;
+
+            if(!connectionStates.TryGetValue(id, out interrupted))
+            {
+                return;
+            }
+
+            interrupted.ConnectionInterruptProcessingComplete = false;
+        }
+    }
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs?rev=1205157&r1=1205156&r2=1205157&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs Tue Nov 22 20:44:50 2011
@@ -1,47 +1,43 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-
-using Apache.NMS.ActiveMQ.Commands;
-
-namespace Apache.NMS.ActiveMQ.State
-{
-
-	public class ConsumerState
-	{
-	    readonly ConsumerInfo info;
-
-		public ConsumerState(ConsumerInfo info)
-		{
-			this.info = info;
-		}
-
-		public override String ToString()
-		{
-			return info.ToString();
-		}
-
-		public ConsumerInfo Info
-		{
-			get
-			{
-				return info;
-			}
-		}
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	/// <summary>
+	/// Immutable holder for the ConsumerInfo of a single consumer.
+	/// </summary>
+	public class ConsumerState
+	{
+		private readonly ConsumerInfo consumerInfo;
+
+		/// <summary>
+		/// Creates the holder for the given consumer description.
+		/// </summary>
+		/// <param name="info">The ConsumerInfo to wrap; stored as-is, no null check is made.</param>
+		public ConsumerState(ConsumerInfo info)
+		{
+			consumerInfo = info;
+		}
+
+		/// <summary>
+		/// The ConsumerInfo this instance was constructed with.
+		/// </summary>
+		public ConsumerInfo Info
+		{
+			get { return consumerInfo; }
+		}
+
+		/// <summary>
+		/// Returns the string form of the wrapped ConsumerInfo.
+		/// </summary>
+		public override String ToString()
+		{
+			return consumerInfo.ToString();
+		}
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs?rev=1205157&r1=1205156&r2=1205157&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs Tue Nov 22 20:44:50 2011
@@ -1,46 +1,50 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-
-using Apache.NMS.ActiveMQ.Commands;
-
-namespace Apache.NMS.ActiveMQ.State
-{
-	public class ProducerState
-	{
-	    readonly ProducerInfo info;
-
-		public ProducerState(ProducerInfo info)
-		{
-			this.info = info;
-		}
-
-		public override String ToString()
-		{
-			return info.ToString();
-		}
-
-		public ProducerInfo Info
-		{
-			get
-			{
-				return info;
-			}
-		}
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	/// <summary>
+	/// Holder for the ProducerInfo of a single producer plus an optional,
+	/// mutable reference to a TransactionState.
+	/// </summary>
+	public class ProducerState
+	{
+		private readonly ProducerInfo producerInfo;
+		private TransactionState currentTransaction;
+
+		/// <summary>
+		/// Creates the holder for the given producer description.  The
+		/// TransactionState starts out as null.
+		/// </summary>
+		/// <param name="info">The ProducerInfo to wrap; stored as-is, no null check is made.</param>
+		public ProducerState(ProducerInfo info)
+		{
+			producerInfo = info;
+		}
+
+		/// <summary>
+		/// The ProducerInfo this instance was constructed with.
+		/// </summary>
+		public ProducerInfo Info
+		{
+			get { return producerInfo; }
+		}
+
+		/// <summary>
+		/// The TransactionState assigned to this producer, or null when none
+		/// has been assigned.
+		/// </summary>
+		public TransactionState TransactionState
+		{
+			get { return currentTransaction; }
+			set { currentTransaction = value; }
+		}
+
+		/// <summary>
+		/// Returns the string form of the wrapped ProducerInfo.
+		/// </summary>
+		public override String ToString()
+		{
+			return producerInfo.ToString();
+		}
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs?rev=1205157&r1=1205156&r2=1205157&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs Tue Nov 22 20:44:50 2011
@@ -1,148 +1,132 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.ActiveMQ.State
-{
-	public class SessionState
-	{
-	    readonly SessionInfo info;
-
-		private readonly AtomicDictionary<ProducerId, ProducerState> producers = new AtomicDictionary<ProducerId, ProducerState>();
-		private readonly AtomicDictionary<ConsumerId, ConsumerState> consumers = new AtomicDictionary<ConsumerId, ConsumerState>();
-		private readonly Atomic<bool> _shutdown = new Atomic<bool>(false);
-
-		public SessionState(SessionInfo info)
-		{
-			this.info = info;
-		}
-
-		public override String ToString()
-		{
-			return info.ToString();
-		}
-
-		public void addProducer(ProducerInfo info)
-		{
-			checkShutdown();
-			producers.Add(info.ProducerId, new ProducerState(info));
-		}
-
-		public ProducerState removeProducer(ProducerId id)
-		{
-			ProducerState ret = producers[id];
-			producers.Remove(id);
-			return ret;
-		}
-
-		public void addConsumer(ConsumerInfo info)
-		{
-			checkShutdown();
-			consumers.Add(info.ConsumerId, new ConsumerState(info));
-		}
-
-		public ConsumerState removeConsumer(ConsumerId id)
-		{
-			ConsumerState ret = consumers[id];
-			consumers.Remove(id);
-			return ret;
-		}
-
-		public SessionInfo Info
-		{
-			get
-			{
-				return info;
-			}
-		}
-
-		public AtomicCollection<ConsumerId> ConsumerIds
-		{
-			get
-			{
-				return consumers.Keys;
-			}
-		}
-
-		public AtomicCollection<ProducerId> ProducerIds
-		{
-			get
-			{
-				return producers.Keys;
-			}
-		}
-
-		public AtomicCollection<ProducerState> ProducerStates
-		{
-			get
-			{
-				return producers.Values;
-			}
-		}
-
-		public ProducerState getProducerState(ProducerId producerId)
-		{
-			return producers[producerId];
-		}
-
-		public ProducerState this[ProducerId producerId]
-		{
-			get
-			{
-				return producers[producerId];
-			}
-		}
-
-		public AtomicCollection<ConsumerState> ConsumerStates
-		{
-			get
-			{
-				return consumers.Values;
-			}
-		}
-
-		public ConsumerState getConsumerState(ConsumerId consumerId)
-		{
-			return consumers[consumerId];
-		}
-
-		public ConsumerState this[ConsumerId consumerId]
-		{
-			get
-			{
-				return consumers[consumerId];
-			}
-		}
-
-		private void checkShutdown()
-		{
-			if(_shutdown.Value)
-			{
-				throw new ApplicationException("Disposed");
-			}
-		}
-
-		public void shutdown()
-		{
-			_shutdown.Value = true;
-		}
-
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	/// <summary>
+	/// Holds a SessionInfo together with the ProducerState and ConsumerState
+	/// entries registered under it, keyed by producer id and consumer id.
+	/// </summary>
+	public class SessionState
+	{
+		private readonly SessionInfo info;
+
+		private readonly AtomicDictionary<ProducerId, ProducerState> producers = new AtomicDictionary<ProducerId, ProducerState>();
+		private readonly AtomicDictionary<ConsumerId, ConsumerState> consumers = new AtomicDictionary<ConsumerId, ConsumerState>();
+		private readonly Atomic<bool> _shutdown = new Atomic<bool>(false);
+
+		/// <summary>
+		/// Creates an empty session state for the given session description.
+		/// </summary>
+		/// <param name="info">The SessionInfo to wrap; stored as-is, no null check is made.</param>
+		public SessionState(SessionInfo info)
+		{
+			this.info = info;
+		}
+
+		/// <summary>
+		/// Returns the string form of the wrapped SessionInfo.
+		/// </summary>
+		public override String ToString()
+		{
+			return info.ToString();
+		}
+
+		/// <summary>
+		/// Registers a new ProducerState for the given producer under its ProducerId.
+		/// </summary>
+		/// <param name="info">Description of the producer to track.</param>
+		/// <exception cref="ApplicationException">shutdown() has already been called.</exception>
+		public void addProducer(ProducerInfo info)
+		{
+			checkShutdown();
+			producers.Add(info.ProducerId, new ProducerState(info));
+		}
+
+		/// <summary>
+		/// Stops tracking the producer with the given id.  When the removed
+		/// producer has a TransactionState, the producer is handed to it via
+		/// AddProducer before being returned.
+		/// </summary>
+		/// <param name="id">Id of the producer to remove.</param>
+		/// <returns>The removed ProducerState, or null when the id is not tracked.</returns>
+		public ProducerState removeProducer(ProducerId id)
+		{
+			ProducerState removed;
+
+			// TryGetValue instead of the indexer: the indexer throws
+			// KeyNotFoundException for an id that is not tracked, which turned
+			// a harmless repeated removal into a failure.
+			if(!producers.TryGetValue(id, out removed))
+			{
+				return null;
+			}
+
+			producers.Remove(id);
+
+			if(removed.TransactionState != null)
+			{
+				// NOTE(review): presumably lets the transaction recreate this
+				// producer on recovery - confirm against TransactionState.AddProducer.
+				removed.TransactionState.AddProducer(removed);
+			}
+
+			return removed;
+		}
+
+		/// <summary>
+		/// Registers a new ConsumerState for the given consumer under its ConsumerId.
+		/// </summary>
+		/// <param name="info">Description of the consumer to track.</param>
+		/// <exception cref="ApplicationException">shutdown() has already been called.</exception>
+		public void addConsumer(ConsumerInfo info)
+		{
+			checkShutdown();
+			consumers.Add(info.ConsumerId, new ConsumerState(info));
+		}
+
+		/// <summary>
+		/// Stops tracking the consumer with the given id.
+		/// </summary>
+		/// <param name="id">Id of the consumer to remove.</param>
+		/// <returns>The removed ConsumerState, or null when the id is not tracked.</returns>
+		public ConsumerState removeConsumer(ConsumerId id)
+		{
+			ConsumerState removed;
+
+			// Same reasoning as removeProducer: never index with an id that
+			// may be absent.
+			if(!consumers.TryGetValue(id, out removed))
+			{
+				return null;
+			}
+
+			consumers.Remove(id);
+			return removed;
+		}
+
+		/// <summary>
+		/// The SessionInfo this instance was constructed with.
+		/// </summary>
+		public SessionInfo Info
+		{
+			get { return info; }
+		}
+
+		/// <summary>
+		/// The ids of the consumers currently tracked, as returned by the
+		/// consumer dictionary's Keys property.
+		/// </summary>
+		public AtomicCollection<ConsumerId> ConsumerIds
+		{
+			get { return consumers.Keys; }
+		}
+
+		/// <summary>
+		/// The ids of the producers currently tracked, as returned by the
+		/// producer dictionary's Keys property.
+		/// </summary>
+		public AtomicCollection<ProducerId> ProducerIds
+		{
+			get { return producers.Keys; }
+		}
+
+		/// <summary>
+		/// The ProducerState entries currently tracked.
+		/// </summary>
+		public AtomicCollection<ProducerState> ProducerStates
+		{
+			get { return producers.Values; }
+		}
+
+		/// <summary>
+		/// Looks up the state of a tracked producer through the dictionary indexer.
+		/// </summary>
+		/// <param name="producerId">Id of the producer to look up.</param>
+		public ProducerState getProducerState(ProducerId producerId)
+		{
+			return producers[producerId];
+		}
+
+		/// <summary>
+		/// Indexer form of getProducerState.
+		/// </summary>
+		public ProducerState this[ProducerId producerId]
+		{
+			get { return producers[producerId]; }
+		}
+
+		/// <summary>
+		/// The ConsumerState entries currently tracked.
+		/// </summary>
+		public AtomicCollection<ConsumerState> ConsumerStates
+		{
+			get { return consumers.Values; }
+		}
+
+		/// <summary>
+		/// Looks up the state of a tracked consumer through the dictionary indexer.
+		/// </summary>
+		/// <param name="consumerId">Id of the consumer to look up.</param>
+		public ConsumerState getConsumerState(ConsumerId consumerId)
+		{
+			return consumers[consumerId];
+		}
+
+		/// <summary>
+		/// Indexer form of getConsumerState.
+		/// </summary>
+		public ConsumerState this[ConsumerId consumerId]
+		{
+			get { return consumers[consumerId]; }
+		}
+
+		/// <summary>
+		/// Throws once shutdown() has been called; used to reject new registrations.
+		/// </summary>
+		private void checkShutdown()
+		{
+			if(_shutdown.Value)
+			{
+				throw new ApplicationException("Disposed");
+			}
+		}
+
+		/// <summary>
+		/// Marks this session state as shut down.  Afterwards addProducer and
+		/// addConsumer throw; existing entries are left in place.
+		/// </summary>
+		public void shutdown()
+		{
+			_shutdown.Value = true;
+		}
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs?rev=1205157&r1=1205156&r2=1205157&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs Tue Nov 22 20:44:50 2011
@@ -1,226 +1,222 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System.Collections;
-using System.Collections.Generic;
-
-namespace Apache.NMS.ActiveMQ.State
-{
-	public class AtomicCollection<TValue>
-		where TValue : class
-	{
-		private readonly ArrayList _collection = new ArrayList();
-
-		public AtomicCollection()
-		{
-		}
-
-		public AtomicCollection(ICollection c)
-		{
-			lock(c.SyncRoot)
-			{
-				foreach(object obj in c)
-				{
-					_collection.Add(obj);
-				}
-			}
-		}
-
-		public int Count
-		{
-			get
-			{
-				lock(_collection.SyncRoot)
-				{
-					return _collection.Count;
-				}
-			}
-		}
-
-		public bool IsReadOnly
-		{
-			get
-			{
-				return false;
-			}
-		}
-
-		public int Add(TValue v)
-		{
-			lock(_collection.SyncRoot)
-			{
-				return _collection.Add(v);
-			}
-		}
-
-		public void Clear()
-		{
-			lock(_collection.SyncRoot)
-			{
-				_collection.Clear();
-			}
-		}
-
-		public bool Contains(TValue v)
-		{
-			lock(_collection.SyncRoot)
-			{
-				return _collection.Contains(v);
-			}
-		}
-
-		public void CopyTo(TValue[] a, int index)
-		{
-			lock(_collection.SyncRoot)
-			{
-				_collection.CopyTo(a, index);
-			}
-		}
-
-		public void Remove(TValue v)
-		{
-			lock(_collection.SyncRoot)
-			{
-				_collection.Remove(v);
-			}
-		}
-
-		public void RemoveAt(int index)
-		{
-			lock(_collection.SyncRoot)
-			{
-				_collection.RemoveAt(index);
-			}
-		}
-
-		public TValue this[int index]
-		{
-			get
-			{
-				TValue ret;
-				lock(_collection.SyncRoot)
-				{
-					ret = (TValue) _collection[index];
-				}
-				return (TValue) ret;
-			}
-			set
-			{
-				lock(_collection.SyncRoot)
-				{
-					_collection[index] = value;
-				}
-			}
-		}
-
-		public IEnumerator GetEnumerator()
-		{
-			lock(_collection.SyncRoot)
-			{
-				return _collection.GetEnumerator();
-			}
-		}
-
-#if !NETCF
-		public IEnumerator GetEnumerator(int index, int count)
-		{
-			lock(_collection.SyncRoot)
-			{
-				return _collection.GetEnumerator(index, count);
-			}
-		}
-#endif
-	}
-
-	public class AtomicDictionary<TKey, TValue>
-		where TKey : class
-		where TValue : class
-	{
-		private readonly Dictionary<TKey, TValue> _dictionary = new Dictionary<TKey, TValue>();
-
-		public void Clear()
-		{
-			_dictionary.Clear();
-		}
-
-		public TValue this[TKey key]
-		{
-			get
-			{
-				TValue ret;
-				lock(((ICollection) _dictionary).SyncRoot)
-				{
-					ret = _dictionary[key];
-				}
-				return ret;
-			}
-			set
-			{
-				lock(((ICollection) _dictionary).SyncRoot)
-				{
-					_dictionary[key] = value;
-				}
-			}
-		}
-
-		public bool TryGetValue(TKey key, out TValue val)
-		{
-			lock(((ICollection) _dictionary).SyncRoot)
-			{
-				return _dictionary.TryGetValue(key, out val);
-			}
-		}
-
-		public AtomicCollection<TKey> Keys
-		{
-			get
-			{
-				lock(((ICollection) _dictionary).SyncRoot)
-				{
-					return new AtomicCollection<TKey>(_dictionary.Keys);
-				}
-			}
-		}
-
-		public AtomicCollection<TValue> Values
-		{
-			get
-			{
-				lock(((ICollection) _dictionary).SyncRoot)
-				{
-					return new AtomicCollection<TValue>(_dictionary.Values);
-				}
-			}
-		}
-
-		public void Add(TKey k, TValue v)
-		{
-			lock(((ICollection) _dictionary).SyncRoot)
-			{
-				_dictionary.Add(k, v);
-			}
-		}
-
-		public bool Remove(TKey v)
-		{
-			lock(((ICollection) _dictionary).SyncRoot)
-			{
-				return _dictionary.Remove(v);
-			}
-		}
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Collections;
+using System.Collections.Generic;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	/// <summary>
+	/// A list of reference-type values backed by an ArrayList.  Every member
+	/// performs its work while holding the lock on the ArrayList's SyncRoot.
+	/// </summary>
+	public class AtomicCollection<TValue> where TValue : class
+	{
+		private readonly ArrayList _collection = new ArrayList();
+
+		/// <summary>
+		/// Creates an empty collection.
+		/// </summary>
+		public AtomicCollection()
+		{
+		}
+
+		/// <summary>
+		/// Creates a collection holding a copy of the elements of <paramref name="c"/>.
+		/// The source's SyncRoot is locked while it is being copied.
+		/// </summary>
+		/// <param name="c">Source collection; elements are added without a type check.</param>
+		public AtomicCollection(ICollection c)
+		{
+			lock(c.SyncRoot)
+			{
+				foreach(object obj in c)
+				{
+					_collection.Add(obj);
+				}
+			}
+		}
+
+		/// <summary>
+		/// Number of elements currently stored.
+		/// </summary>
+		public int Count
+		{
+			get
+			{
+				lock(_collection.SyncRoot)
+				{
+					return _collection.Count;
+				}
+			}
+		}
+
+		/// <summary>
+		/// Always false.
+		/// </summary>
+		public bool IsReadOnly
+		{
+			get { return false; }
+		}
+
+		/// <summary>
+		/// Appends a value and returns the index it was stored at.
+		/// </summary>
+		public int Add(TValue v)
+		{
+			lock(_collection.SyncRoot)
+			{
+				return _collection.Add(v);
+			}
+		}
+
+		/// <summary>
+		/// Removes all elements.
+		/// </summary>
+		public void Clear()
+		{
+			lock(_collection.SyncRoot)
+			{
+				_collection.Clear();
+			}
+		}
+
+		/// <summary>
+		/// Returns true when the value is present.
+		/// </summary>
+		public bool Contains(TValue v)
+		{
+			lock(_collection.SyncRoot)
+			{
+				return _collection.Contains(v);
+			}
+		}
+
+		/// <summary>
+		/// Copies all elements into <paramref name="a"/>, starting at <paramref name="index"/>.
+		/// </summary>
+		public void CopyTo(TValue[] a, int index)
+		{
+			lock(_collection.SyncRoot)
+			{
+				_collection.CopyTo(a, index);
+			}
+		}
+
+		/// <summary>
+		/// Removes the first occurrence of the value, if any.
+		/// </summary>
+		public void Remove(TValue v)
+		{
+			lock(_collection.SyncRoot)
+			{
+				_collection.Remove(v);
+			}
+		}
+
+		/// <summary>
+		/// Removes the element at the given index.
+		/// </summary>
+		public void RemoveAt(int index)
+		{
+			lock(_collection.SyncRoot)
+			{
+				_collection.RemoveAt(index);
+			}
+		}
+
+		/// <summary>
+		/// Gets or sets the element at the given index.
+		/// </summary>
+		public TValue this[int index]
+		{
+			get
+			{
+				lock(_collection.SyncRoot)
+				{
+					return (TValue) _collection[index];
+				}
+			}
+			set
+			{
+				lock(_collection.SyncRoot)
+				{
+					_collection[index] = value;
+				}
+			}
+		}
+
+		/// <summary>
+		/// Returns an enumerator over a snapshot of the current contents.
+		/// </summary>
+		/// <remarks>
+		/// The lock is only held while the snapshot is taken.  Handing out an
+		/// enumerator over the live ArrayList would leave the enumeration
+		/// itself unprotected: any Add/Remove made after this method returns
+		/// would make MoveNext throw InvalidOperationException.
+		/// </remarks>
+		public IEnumerator GetEnumerator()
+		{
+			return Snapshot().GetEnumerator();
+		}
+
+#if !NETCF
+		/// <summary>
+		/// Returns an enumerator over <paramref name="count"/> elements of a
+		/// snapshot of the current contents, starting at <paramref name="index"/>.
+		/// </summary>
+		public IEnumerator GetEnumerator(int index, int count)
+		{
+			return Snapshot().GetEnumerator(index, count);
+		}
+#endif
+
+		/// <summary>
+		/// Takes a shallow copy of the backing list while holding its lock.
+		/// </summary>
+		private ArrayList Snapshot()
+		{
+			lock(_collection.SyncRoot)
+			{
+				return (ArrayList) _collection.Clone();
+			}
+		}
+	}
+
+	/// <summary>
+	/// A dictionary of reference-type keys and values backed by a
+	/// Dictionary.  Every member performs its work while holding the lock on
+	/// the backing dictionary's SyncRoot.
+	/// </summary>
+	public class AtomicDictionary<TKey, TValue>
+		where TKey : class
+		where TValue : class
+	{
+		private readonly Dictionary<TKey, TValue> _dictionary = new Dictionary<TKey, TValue>();
+
+		/// <summary>
+		/// The single lock object shared by all members of this class.
+		/// </summary>
+		private object SyncRoot
+		{
+			get { return ((ICollection) _dictionary).SyncRoot; }
+		}
+
+		/// <summary>
+		/// Removes all entries.
+		/// </summary>
+		public void Clear()
+		{
+			// Previously the only member that touched the dictionary without
+			// the lock, so it could run in the middle of another thread's
+			// Add/Remove/lookup.
+			lock(SyncRoot)
+			{
+				_dictionary.Clear();
+			}
+		}
+
+		/// <summary>
+		/// Gets or sets the value stored under <paramref name="key"/>.  The
+		/// getter uses the backing dictionary's indexer and so throws
+		/// KeyNotFoundException for a missing key; use TryGetValue when the
+		/// key may be absent.
+		/// </summary>
+		public TValue this[TKey key]
+		{
+			get
+			{
+				lock(SyncRoot)
+				{
+					return _dictionary[key];
+				}
+			}
+			set
+			{
+				lock(SyncRoot)
+				{
+					_dictionary[key] = value;
+				}
+			}
+		}
+
+		/// <summary>
+		/// Looks up <paramref name="key"/> without throwing when it is absent.
+		/// </summary>
+		/// <returns>True when the key was found and <paramref name="val"/> was set.</returns>
+		public bool TryGetValue(TKey key, out TValue val)
+		{
+			lock(SyncRoot)
+			{
+				return _dictionary.TryGetValue(key, out val);
+			}
+		}
+
+		/// <summary>
+		/// A new AtomicCollection built from the current keys.
+		/// </summary>
+		public AtomicCollection<TKey> Keys
+		{
+			get
+			{
+				lock(SyncRoot)
+				{
+					return new AtomicCollection<TKey>(_dictionary.Keys);
+				}
+			}
+		}
+
+		/// <summary>
+		/// A new AtomicCollection built from the current values.
+		/// </summary>
+		public AtomicCollection<TValue> Values
+		{
+			get
+			{
+				lock(SyncRoot)
+				{
+					return new AtomicCollection<TValue>(_dictionary.Values);
+				}
+			}
+		}
+
+		/// <summary>
+		/// Adds a new entry through the backing dictionary's Add, which
+		/// rejects a key that is already present.
+		/// </summary>
+		public void Add(TKey k, TValue v)
+		{
+			lock(SyncRoot)
+			{
+				_dictionary.Add(k, v);
+			}
+		}
+
+		/// <summary>
+		/// Removes the entry stored under the given key.
+		/// </summary>
+		/// <returns>True when an entry was removed, false when the key was absent.</returns>
+		public bool Remove(TKey v)
+		{
+			lock(SyncRoot)
+			{
+				return _dictionary.Remove(v);
+			}
+		}
+	}
+}



Mime
View raw message