activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r707747 [3/4] - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk: ./ src/main/csharp/ src/main/csharp/Commands/ src/main/csharp/State/ src/main/csharp/Threads/ src/main/csharp/Transport/ src/main/csharp/Transport/Failover/ src/main/cs...
Date Fri, 24 Oct 2008 21:10:23 GMT
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/Response.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/Response.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/Response.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/Response.cs Fri Oct 24 14:10:22 2008
@@ -20,47 +20,54 @@
 //         activemq-core module
 //
 
-using System;
-using System.Collections;
 
-using Apache.NMS.ActiveMQ.OpenWire;
-using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.State;
 
 namespace Apache.NMS.ActiveMQ.Commands
 {
-    /// <summary>
-    ///  The ActiveMQ Response Command
-    /// </summary>
-    public class Response : BaseCommand
-    {
-        public const byte ID_Response = 30;
-    			
-        int correlationId;
-
-		public override string ToString() {
-            return GetType().Name + "["
-                + " CorrelationId=" + CorrelationId
-                + " ]";
+	/// <summary>
+	///  The ActiveMQ Response Command
+	/// </summary>
+	public class Response : BaseCommand
+	{
+		public const byte ID_Response = 30;
+
+		int correlationId;
+
+		public override string ToString()
+		{
+			return GetType().Name + "["
+				+ " CorrelationId=" + CorrelationId
+				+ " ]";
 
 		}
 
-        public override byte GetDataStructureType() {
-            return ID_Response;
-        }
+		public override byte GetDataStructureType()
+		{
+			return ID_Response;
+		}
+
 
+		// Properties
+
+		public int CorrelationId
+		{
+			get { return correlationId; }
+			set { this.correlationId = value; }
+		}
 
-        // Properties
-
-        public int CorrelationId
-        {
-            get { return correlationId; }
-            set { this.correlationId = value; }
-        }
-		
-        public override bool IsResponse {
-			get { return true; }
+		public override bool IsResponse
+		{
+			get
+			{
+				return true;
+			}
 		}
-		
-		
-    }
+
+		public override Response visit(ICommandVisitor visitor)
+		{
+			return null;
+		}
+
+	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/SessionId.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/SessionId.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/SessionId.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/SessionId.cs Fri Oct 24 14:10:22 2008
@@ -20,72 +20,113 @@
 //         activemq-core module
 //
 
-using System;
-using System.Collections;
 
-using Apache.NMS.ActiveMQ.OpenWire;
-using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ.Commands
 {
-    /// <summary>
-    ///  The ActiveMQ SessionId Command
-    /// </summary>
-    public class SessionId : BaseDataStructure, DataStructure
-    {
-        public const byte ID_SessionId = 121;
-    			
-        string connectionId;
-        long value;
-
-		public override int GetHashCode() {
-            int answer = 0;
-            answer = (answer * 37) + HashCode(ConnectionId);
-            answer = (answer * 37) + HashCode(Value);
-            return answer;
-
+	/// <summary>
+	///  The ActiveMQ SessionId Command
+	/// </summary>
+	public class SessionId : BaseDataStructure, DataStructure
+	{
+		public const byte ID_SessionId = 121;
+
+		string connectionId;
+		long value;
+		ConnectionId parentId;
+
+		public override int GetHashCode()
+		{
+			int answer = 0;
+			answer = (answer * 37) + HashCode(ConnectionId);
+			answer = (answer * 37) + HashCode(Value);
+			return answer;
 		}
 
-		public override bool Equals(object that) {
-	    	if (that is SessionId) {
-	    	    return Equals((SessionId) that);
+		public override bool Equals(object that)
+		{
+			if(that is SessionId)
+			{
+				return Equals((SessionId) that);
 			}
 			return false;
-    	}
+		}
 
-		public virtual bool Equals(SessionId that) {
-            if (! Equals(this.ConnectionId, that.ConnectionId)) return false;
-            if (! Equals(this.Value, that.Value)) return false;
-            return true;
+		public virtual bool Equals(SessionId that)
+		{
+			if(!Equals(this.ConnectionId, that.ConnectionId))
+				return false;
+			if(!Equals(this.Value, that.Value))
+				return false;
+			return true;
+		}
 
+		public override string ToString()
+		{
+			return GetType().Name + "["
+				+ " ConnectionId=" + ConnectionId
+				+ " Value=" + Value
+				+ " ]";
 		}
 
-		public override string ToString() {
-            return GetType().Name + "["
-                + " ConnectionId=" + ConnectionId
-                + " Value=" + Value
-                + " ]";
+		public override byte GetDataStructureType()
+		{
+			return ID_SessionId;
+		}
+
+		// Properties
+
+		public string ConnectionId
+		{
+			get { return connectionId; }
+			set { this.connectionId = value; }
+		}
 
+		public long Value
+		{
+			get { return value; }
+			set { this.value = value; }
 		}
 
-        public override byte GetDataStructureType() {
-            return ID_SessionId;
-        }
+		public ConnectionId ParentId
+		{
+			get
+			{
+				if(parentId == null)
+				{
+					parentId = new ConnectionId(this);
+				}
+				return parentId;
+			}
+		}
 
+		public SessionId()
+			: base()
+		{
+		}
 
-        // Properties
+		public SessionId(ConnectionId connectionId, long sessionId)
+		{
+			this.connectionId = connectionId.Value;
+			this.value = sessionId;
+		}
 
-        public string ConnectionId
-        {
-            get { return connectionId; }
-            set { this.connectionId = value; }            
-        }
+		public SessionId(SessionId id)
+		{
+			this.connectionId = id.ConnectionId;
+			this.value = id.Value;
+		}
 
-        public long Value
-        {
-            get { return value; }
-            set { this.value = value; }            
-        }
+		public SessionId(ProducerId id)
+		{
+			this.connectionId = id.ConnectionId;
+			this.value = id.Value;
+		}
 
-    }
+		public SessionId(ConsumerId id)
+		{
+			this.connectionId = id.ConnectionId;
+			this.value = id.SessionId;
+		}
+	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/SessionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/SessionInfo.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/SessionInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/SessionInfo.cs Fri Oct 24 14:10:22 2008
@@ -20,42 +20,55 @@
 //         activemq-core module
 //
 
-using System;
-using System.Collections;
 
-using Apache.NMS.ActiveMQ.OpenWire;
-using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.State;
 
 namespace Apache.NMS.ActiveMQ.Commands
 {
-    /// <summary>
-    ///  The ActiveMQ SessionInfo Command
-    /// </summary>
-    public class SessionInfo : BaseCommand
-    {
-        public const byte ID_SessionInfo = 4;
-    			
-        SessionId sessionId;
+	/// <summary>
+	///  The ActiveMQ SessionInfo Command
+	/// </summary>
+	public class SessionInfo : BaseCommand
+	{
+		public const byte ID_SessionInfo = 4;
+
+		SessionId sessionId;
+
+		public override string ToString()
+		{
+			return GetType().Name + "["
+				+ " SessionId=" + SessionId
+				+ " ]";
 
-		public override string ToString() {
-            return GetType().Name + "["
-                + " SessionId=" + SessionId
-                + " ]";
+		}
 
+		public override byte GetDataStructureType()
+		{
+			return ID_SessionInfo;
 		}
 
-        public override byte GetDataStructureType() {
-            return ID_SessionInfo;
-        }
 
+		// Properties
+
+		public SessionId SessionId
+		{
+			get { return sessionId; }
+			set { this.sessionId = value; }
+		}
 
-        // Properties
+		public SessionInfo(ConnectionInfo connectionInfo, long sessionId)
+		{
+			this.sessionId = new SessionId(connectionInfo.ConnectionId, sessionId);
+		}
 
-        public SessionId SessionId
-        {
-            get { return sessionId; }
-            set { this.sessionId = value; }            
-        }
+		public SessionInfo()
+			: base()
+		{
+		}
 
-    }
+		public override Response visit(ICommandVisitor visitor)
+		{
+			return visitor.processAddSession(this);
+		}
+	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ShutdownInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ShutdownInfo.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ShutdownInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/ShutdownInfo.cs Fri Oct 24 14:10:22 2008
@@ -20,34 +20,46 @@
 //         activemq-core module
 //
 
-using System;
-using System.Collections;
 
-using Apache.NMS.ActiveMQ.OpenWire;
-using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.State;
 
 namespace Apache.NMS.ActiveMQ.Commands
 {
-    /// <summary>
-    ///  The ActiveMQ ShutdownInfo Command
-    /// </summary>
-    public class ShutdownInfo : BaseCommand
-    {
-        public const byte ID_ShutdownInfo = 11;
-    			
-
-		public override string ToString() {
-            return GetType().Name + "["
-                + " ]";
+	/// <summary>
+	///  The ActiveMQ ShutdownInfo Command
+	/// </summary>
+	public class ShutdownInfo : BaseCommand
+	{
+		public const byte ID_ShutdownInfo = 11;
 
+
+		public override string ToString()
+		{
+			return GetType().Name + "["
+				+ " ]";
+
+		}
+
+		public override byte GetDataStructureType()
+		{
+			return ID_ShutdownInfo;
 		}
 
-        public override byte GetDataStructureType() {
-            return ID_ShutdownInfo;
-        }
 
+		// Properties
 
-        // Properties
+		public override bool IsShutdownInfo
+		{
+			get
+			{
+				return true;
+			}
+		}
+
+		public override Response visit(ICommandVisitor visitor)
+		{
+			return visitor.processShutdown(this);
+		}
 
-    }
+	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/TransactionInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/TransactionInfo.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/TransactionInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/TransactionInfo.cs Fri Oct 24 14:10:22 2008
@@ -20,58 +20,88 @@
 //         activemq-core module
 //
 
-using System;
-using System.Collections;
 
-using Apache.NMS.ActiveMQ.OpenWire;
-using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.State;
 
 namespace Apache.NMS.ActiveMQ.Commands
 {
-    /// <summary>
-    ///  The ActiveMQ TransactionInfo Command
-    /// </summary>
-    public class TransactionInfo : BaseCommand
-    {
-        public const byte ID_TransactionInfo = 7;
-    			
-        ConnectionId connectionId;
-        TransactionId transactionId;
-        byte type;
-
-		public override string ToString() {
-            return GetType().Name + "["
-                + " ConnectionId=" + ConnectionId
-                + " TransactionId=" + TransactionId
-                + " Type=" + Type
-                + " ]";
+	/// <summary>
+	///  The ActiveMQ TransactionInfo Command
+	/// </summary>
+	public class TransactionInfo : BaseCommand
+	{
+		public const byte ID_TransactionInfo = 7;
+
+		ConnectionId connectionId;
+		TransactionId transactionId;
+		byte type;
+
+		public const byte BEGIN = 0;
+		public const byte PREPARE = 1;
+		public const byte COMMIT_ONE_PHASE = 2;
+		public const byte COMMIT_TWO_PHASE = 3;
+		public const byte ROLLBACK = 4;
+		public const byte RECOVER = 5;
+		public const byte FORGET = 6;
+		public const byte END = 7;
+
+		public override string ToString()
+		{
+			return GetType().Name + "["
+				+ " ConnectionId=" + ConnectionId
+				+ " TransactionId=" + TransactionId
+				+ " Type=" + Type
+				+ " ]";
+		}
+
+		public override byte GetDataStructureType()
+		{
+			return ID_TransactionInfo;
+		}
+
+		// Properties
 
+		public ConnectionId ConnectionId
+		{
+			get { return connectionId; }
+			set { this.connectionId = value; }
 		}
 
-        public override byte GetDataStructureType() {
-            return ID_TransactionInfo;
-        }
-
-
-        // Properties
-
-        public ConnectionId ConnectionId
-        {
-            get { return connectionId; }
-            set { this.connectionId = value; }            
-        }
-
-        public TransactionId TransactionId
-        {
-            get { return transactionId; }
-            set { this.transactionId = value; }            
-        }
-
-        public byte Type
-        {
-            get { return type; }
-            set { this.type = value; }            
-        }
+		public TransactionId TransactionId
+		{
+			get { return transactionId; }
+			set { this.transactionId = value; }
+		}
+
+		public byte Type
+		{
+			get { return type; }
+			set { this.type = value; }
+		}
 
-    }
+		public override Response visit(ICommandVisitor visitor)
+		{
+			switch(type)
+			{
+				case TransactionInfo.BEGIN:
+					return visitor.processBeginTransaction(this);
+				case TransactionInfo.END:
+					return visitor.processEndTransaction(this);
+				case TransactionInfo.PREPARE:
+					return visitor.processPrepareTransaction(this);
+				case TransactionInfo.COMMIT_ONE_PHASE:
+					return visitor.processCommitTransactionOnePhase(this);
+				case TransactionInfo.COMMIT_TWO_PHASE:
+					return visitor.processCommitTransactionTwoPhase(this);
+				case TransactionInfo.ROLLBACK:
+					return visitor.processRollbackTransaction(this);
+				case TransactionInfo.RECOVER:
+					return visitor.processRecoverTransactions(this);
+				case TransactionInfo.FORGET:
+					return visitor.processForgetTransaction(this);
+				default:
+					throw new IOException("Transaction info type unknown: " + type);
+			}
+		}
+	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/WireFormatInfo.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/WireFormatInfo.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/WireFormatInfo.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Commands/WireFormatInfo.cs Fri Oct 24 14:10:22 2008
@@ -15,9 +15,8 @@
 * limitations under the License.
 */
 
-using System;
 using Apache.NMS.ActiveMQ.OpenWire;
-using Apache.NMS;
+using Apache.NMS.ActiveMQ.State;
 
 namespace Apache.NMS.ActiveMQ.Commands
 {
@@ -29,14 +28,14 @@
 	{
 		public const byte ID_WireFormatInfo = 1;
 		static private byte[] MAGIC = new byte[] {
-			'A'&0xFF,
-			'c'&0xFF,
-			't'&0xFF,
-			'i'&0xFF,
-			'v'&0xFF,
-			'e'&0xFF,
-			'M'&0xFF,
-			'Q'&0xFF };
+						'A'&0xFF,
+						'c'&0xFF,
+						't'&0xFF,
+						'i'&0xFF,
+						'v'&0xFF,
+						'e'&0xFF,
+						'M'&0xFF,
+						'Q'&0xFF };
 
 		byte[] magic = MAGIC;
 		int version;
@@ -44,16 +43,18 @@
 
 		private PrimitiveMap properties;
 
-		public override string ToString() {
+		public override string ToString()
+		{
 			return GetType().Name + "["
-				+ " Magic=" + Magic
-				+ " Version=" + Version
-				+ " MarshalledProperties=" + Properties.ToString()
-				+ " ]";
+					+ " Magic=" + Magic
+					+ " Version=" + Version
+					+ " MarshalledProperties=" + Properties.ToString()
+					+ " ]";
 
 		}
 
-		public override byte GetDataStructureType() {
+		public override byte GetDataStructureType()
+		{
 			return ID_WireFormatInfo;
 		}
 
@@ -79,7 +80,7 @@
 					return false;
 				}
 
-				for(int i = 0; i < magic.Length; i++ )
+				for(int i = 0; i < magic.Length; i++)
 				{
 					if(magic[i] != MAGIC[i])
 					{
@@ -147,8 +148,8 @@
 			{
 				object prop = Properties["MaxInactivityDuration"];
 				return (null != prop
-							? (long) prop
-							: 0);
+										? (long) prop
+										: 0);
 			}
 			set { Properties["MaxInactivityDuration"] = value; }
 		}
@@ -158,8 +159,8 @@
 			{
 				object prop = Properties["MaxInactivityDurationInitialDelay"];
 				return (null != prop
-							? (long) prop
-							: 0);
+										? (long) prop
+										: 0);
 			}
 			set { Properties["MaxInactivityDurationInitialDelay"] = value; }
 		}
@@ -169,8 +170,8 @@
 			{
 				object prop = Properties["CacheSize"];
 				return (null != prop
-							? (int) prop
-							: 0);
+										? (int) prop
+										: 0);
 			}
 			set { Properties.SetInt("CacheSize", value); }
 		}
@@ -190,5 +191,10 @@
 				MarshalledProperties = properties.Marshal();
 			}
 		}
+
+		public override Response visit(ICommandVisitor visitor)
+		{
+			return visitor.processWireFormat(this);
+		}
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs Fri Oct 24 14:10:22 2008
@@ -14,13 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.Transport;
-using Apache.NMS;
-using Apache.NMS.Util;
+
 using System;
 using System.Collections;
 using System.Threading;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.Util;
 
 namespace Apache.NMS.ActiveMQ
 {
@@ -170,8 +170,8 @@
 			Session session = new Session(this, info, sessionAcknowledgementMode);
 
 			// Set properties on session using parameters prefixed with "session."
-			System.Collections.Specialized.StringDictionary map = URISupport.ParseQuery(this.brokerUri.Query);
-			URISupport.SetProperties(session, map, "session.");
+			URISupport.CompositeData c = URISupport.parseComposite(this.brokerUri);
+			URISupport.SetProperties(session, c.Parameters, "session.");
 
 			if(IsStarted)
 			{
@@ -384,7 +384,7 @@
 		/// </summary>
 		public LocalTransactionId CreateLocalTransactionId()
 		{
-			LocalTransactionId id= new LocalTransactionId();
+			LocalTransactionId id = new LocalTransactionId();
 			id.ConnectionId = ConnectionId;
 			id.Value = Interlocked.Increment(ref localTransactionCounter);
 			return id;

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs Fri Oct 24 14:10:22 2008
@@ -14,45 +14,45 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+using System;
 using Apache.NMS.ActiveMQ.Commands;
 using Apache.NMS.ActiveMQ.Transport;
-using Apache.NMS.ActiveMQ.Transport.Tcp;
-using Apache.NMS;
 using Apache.NMS.Util;
-using System;
 
 namespace Apache.NMS.ActiveMQ
 {
-    /// <summary>
-    /// Represents a connection with a message broker
-    /// </summary>
-    public class ConnectionFactory : IConnectionFactory
-    {
+	/// <summary>
+	/// Represents a connection with a message broker
+	/// </summary>
+	public class ConnectionFactory : IConnectionFactory
+	{
 		public const string DEFAULT_BROKER_URL = "activemq:tcp://localhost:61616";
 		public const string ENV_BROKER_URL = "ACTIVEMQ_BROKER_URL";
-		
-        private Uri brokerUri;
-        private string connectionUserName;
-        private string connectionPassword;
-        private string clientId;
-        
+
+		private Uri brokerUri;
+		private string connectionUserName;
+		private string connectionPassword;
+		private string clientId;
+
 		public static string GetDefaultBrokerUrl()
 		{
 #if (PocketPC||NETCF||NETCF_2_0)
-            return DEFAULT_BROKER_URL;
+			return DEFAULT_BROKER_URL;
 #else
-            string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL);
-			if (answer == null) {
+			string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL);
+			if(answer == null)
+			{
 				answer = DEFAULT_BROKER_URL;
 			}
 			return answer;
 #endif
 		}
-		
-        public ConnectionFactory()
+
+		public ConnectionFactory()
 			: this(GetDefaultBrokerUrl())
-        {
-        }
+		{
+		}
 
 		public ConnectionFactory(string brokerUri)
 			: this(brokerUri, null)
@@ -76,83 +76,85 @@
 		}
 
 		public IConnection CreateConnection()
-        {
-            return CreateConnection(connectionUserName, connectionPassword);
-        }
-
-    	public IConnection CreateConnection(string userName, string password)
-        {
-            Uri uri = brokerUri;
-            // Do we need to strip off the activemq prefix??
-            if("activemq".Equals(brokerUri.Scheme))
-            {
-                uri = new Uri(brokerUri.AbsolutePath + brokerUri.Query);
-            }
+		{
+			return CreateConnection(connectionUserName, connectionPassword);
+		}
+
+		public IConnection CreateConnection(string userName, string password)
+		{
+			Uri uri = brokerUri;
+			// Do we need to strip off the activemq prefix??
+			if("activemq".Equals(brokerUri.Scheme))
+			{
+				uri = new Uri(brokerUri.AbsolutePath + brokerUri.Query);
+			}
 
 			ConnectionInfo info = CreateConnectionInfo(userName, password);
-			ITransportFactory tcpTransportFactory = new TcpTransportFactory();
-			ITransport transport = tcpTransportFactory.CreateTransport(uri);
+			ITransport transport = TransportFactory.CreateTransport(uri);
 			Connection connection = new Connection(uri, transport, info);
 
 			// Set properties on connection using parameters prefixed with "connection."
-			System.Collections.Specialized.StringDictionary map = URISupport.ParseQuery(brokerUri.Query);
-			URISupport.SetProperties(connection, map, "connection.");
+			// Since this could be a composite Uri, assume the connection-specific parameters
+			// are associated with the outer-most specification of the composite Uri. What's nice
+			// is that this works with simple Uri as well.
+			URISupport.CompositeData c = URISupport.parseComposite(brokerUri);
+			URISupport.SetProperties(connection, c.Parameters, "connection.");
 
 			return connection;
-        }
-        
-        // Properties
-        
-        public Uri BrokerUri
-        {
-            get { return brokerUri; }
-            set { brokerUri = value; }
-        }
-                
-        public string UserName
-        {
-            get { return connectionUserName; }
-            set { connectionUserName = value; }
-        }
-        
-        public string Password
-        {
-            get { return connectionPassword; }
-            set { connectionPassword = value; }
-        }
+		}
+
+		// Properties
+
+		public Uri BrokerUri
+		{
+			get { return brokerUri; }
+			set { brokerUri = value; }
+		}
+
+		public string UserName
+		{
+			get { return connectionUserName; }
+			set { connectionUserName = value; }
+		}
+
+		public string Password
+		{
+			get { return connectionPassword; }
+			set { connectionPassword = value; }
+		}
 
 		public string ClientId
-        {
-            get { return clientId; }
-            set { clientId = value; }
-        }
-        
-        // Implementation methods
-        
-        protected virtual ConnectionInfo CreateConnectionInfo(string userName, string password)
-        {
-            ConnectionInfo answer = new ConnectionInfo();
-            ConnectionId connectionId = new ConnectionId();
-            connectionId.Value = CreateNewGuid();
-            
-            answer.ConnectionId = connectionId;
-            answer.UserName = userName;
-            answer.Password = password;
-            if(clientId == null)
-            {
-                answer.ClientId = CreateNewGuid();
-            }
+		{
+			get { return clientId; }
+			set { clientId = value; }
+		}
+
+		// Implementation methods
+
+		protected virtual ConnectionInfo CreateConnectionInfo(string userName, string password)
+		{
+			ConnectionInfo answer = new ConnectionInfo();
+			ConnectionId connectionId = new ConnectionId();
+			connectionId.Value = CreateNewGuid();
+
+			answer.ConnectionId = connectionId;
+			answer.UserName = userName;
+			answer.Password = password;
+			if(clientId == null)
+			{
+				answer.ClientId = CreateNewGuid();
+			}
 			else
 			{
 				answer.ClientId = clientId;
 			}
-            return answer;
-        }
-        
-        protected static string CreateNewGuid()
-        {
-            return Guid.NewGuid().ToString();
-        }
-        
-    }
+			return answer;
+		}
+
+		protected static string CreateNewGuid()
+		{
+			return Guid.NewGuid().ToString();
+		}
+
+	}
 }

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/IOException.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/IOException.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/IOException.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/IOException.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Commands;
+using System.Text;
+using Apache.NMS;
+
+namespace Apache.NMS.ActiveMQ
+{
+
+	/// <summary>
+	/// Exception thrown when an IO error occurs
+	/// </summary>
+	public class IOException : NMSException
+	{
+		public IOException()
+			: base("IO Exception failed with missing exception log")
+		{
+		}
+
+		public IOException(String msg)
+			: base(msg)
+		{
+		}
+
+		public IOException(String msg, Exception inner)
+			: base(msg, inner)
+		{
+		}
+	}
+}
+
+

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=707747&r1=707746&r2=707747&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Fri Oct 24 14:10:22 2008
@@ -445,7 +445,7 @@
 		{
 			DestinationInfo command = new DestinationInfo();
 			command.ConnectionId = Connection.ConnectionId;
-			command.OperationType = 0; // 0 is add
+			command.OperationType = DestinationInfo.ADD_OPERATION_TYPE ; // 0 is add
 			command.Destination = tempDestination;
 
 			this.DoSend(command);
@@ -455,7 +455,7 @@
 		{
 			DestinationInfo command = new DestinationInfo();
 			command.ConnectionId = Connection.ConnectionId;
-			command.OperationType = 1; // 1 is remove
+			command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE ; // 1 is remove
 			command.Destination = tempDestination;
 
 			this.DoSend(command);

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	public class CommandVisitorAdapter : ICommandVisitor
+	{
+
+		public virtual Response processAddConnection(ConnectionInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processAddConsumer(ConsumerInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processAddDestination(DestinationInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processAddProducer(ProducerInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processAddSession(SessionInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processBeginTransaction(TransactionInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processBrokerInfo(BrokerInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processCommitTransactionOnePhase(TransactionInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processCommitTransactionTwoPhase(TransactionInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processEndTransaction(TransactionInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processFlush(FlushCommand command)
+		{
+			return null;
+		}
+
+		public virtual Response processForgetTransaction(TransactionInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processKeepAlive(KeepAliveInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processMessage(Message send)
+		{
+			return null;
+		}
+
+		public virtual Response processMessageAck(MessageAck ack)
+		{
+			return null;
+		}
+
+		public virtual Response processMessageDispatchNotification(MessageDispatchNotification notification)
+		{
+			return null;
+		}
+
+		public virtual Response processMessagePull(MessagePull pull)
+		{
+			return null;
+		}
+
+		public virtual Response processPrepareTransaction(TransactionInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processProducerAck(ProducerAck ack)
+		{
+			return null;
+		}
+
+		public virtual Response processRecoverTransactions(TransactionInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processRemoveConnection(ConnectionId id)
+		{
+			return null;
+		}
+
+		public virtual Response processRemoveConsumer(ConsumerId id)
+		{
+			return null;
+		}
+
+		public virtual Response processRemoveDestination(DestinationInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processRemoveProducer(ProducerId id)
+		{
+			return null;
+		}
+
+		public virtual Response processRemoveSession(SessionId id)
+		{
+			return null;
+		}
+
+		public virtual Response processRemoveSubscription(RemoveSubscriptionInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processRollbackTransaction(TransactionInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processShutdown(ShutdownInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processWireFormat(WireFormatInfo info)
+		{
+			return null;
+		}
+
+		public virtual Response processMessageDispatch(MessageDispatch dispatch)
+		{
+			return null;
+		}
+
+		public virtual Response processControlCommand(ControlCommand command)
+		{
+			return null;
+		}
+
+		public virtual Response processConnectionControl(ConnectionControl control)
+		{
+			return null;
+		}
+
+		public virtual Response processConnectionError(ConnectionError error)
+		{
+			return null;
+		}
+
+		public virtual Response processConsumerControl(ConsumerControl control)
+		{
+			return null;
+		}
+
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionState.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	public class ConnectionState
+	{
+
+		ConnectionInfo info;
+		private SynchronizedDictionary<TransactionId, TransactionState> transactions = new SynchronizedDictionary<TransactionId, TransactionState>();
+		private SynchronizedDictionary<SessionId, SessionState> sessions = new SynchronizedDictionary<SessionId, SessionState>();
+		private SynchronizedCollection<DestinationInfo> tempDestinations = new SynchronizedCollection<DestinationInfo>();
+		private AtomicBoolean _shutdown = new AtomicBoolean(false);
+
+		public ConnectionState(ConnectionInfo info)
+		{
+			this.info = info;
+			// Add the default session id.
+			addSession(new SessionInfo(info, -1));
+		}
+
+		public override String ToString()
+		{
+			return info.ToString();
+		}
+
+		public void reset(ConnectionInfo info)
+		{
+			this.info = info;
+			transactions.Clear();
+			sessions.Clear();
+			tempDestinations.Clear();
+			_shutdown.Value = false;
+		}
+
+		public void addTempDestination(DestinationInfo info)
+		{
+			checkShutdown();
+			tempDestinations.Add(info);
+		}
+
+		public void removeTempDestination(ActiveMQDestination destination)
+		{
+			for(int i = tempDestinations.Count - 1; i >= 0; i--)
+			{
+				DestinationInfo di = tempDestinations[i];
+				if(di.Destination.Equals(destination))
+				{
+					tempDestinations.RemoveAt(i);
+				}
+			}
+		}
+
+		public void addTransactionState(TransactionId id)
+		{
+			checkShutdown();
+			transactions.Add(id, new TransactionState(id));
+		}
+
+		/*
+		public TransactionState getTransactionState(TransactionId id) {
+			return transactions[id];
+		}
+
+		public SynchronizedCollection<TransactionState> getTransactionStates() {
+			return transactions.Values;
+		}
+
+		public SessionState getSessionState(SessionId id) {
+			return sessions[id];
+		}
+
+		*/
+
+		public TransactionState this[TransactionId id]
+		{
+			get
+			{
+				return transactions[id];
+			}
+		}
+
+		public SynchronizedCollection<TransactionState> TransactionStates
+		{
+			get
+			{
+				return transactions.Values;
+			}
+		}
+
+		public SessionState this[SessionId id]
+		{
+			get
+			{
+				return sessions[id];
+			}
+		}
+
+		public TransactionState removeTransactionState(TransactionId id)
+		{
+			TransactionState ret = transactions[id];
+			transactions.Remove(id);
+			return ret;
+		}
+
+		public void addSession(SessionInfo info)
+		{
+			checkShutdown();
+			sessions.Add(info.SessionId, new SessionState(info));
+		}
+
+		public SessionState removeSession(SessionId id)
+		{
+			SessionState ret = sessions[id];
+			sessions.Remove(id);
+			return ret;
+		}
+
+		public ConnectionInfo Info
+		{
+			get
+			{
+				return info;
+			}
+		}
+
+		public SynchronizedCollection<SessionId> SessionIds
+		{
+			get
+			{
+				return sessions.Keys;
+			}
+		}
+
+		public SynchronizedCollection<DestinationInfo> TempDestinations
+		{
+			get
+			{
+				return tempDestinations;
+			}
+		}
+
+		public SynchronizedCollection<SessionState> SessionStates
+		{
+			get
+			{
+				return sessions.Values;
+			}
+		}
+
+		private void checkShutdown()
+		{
+			if(_shutdown.Value)
+			{
+				throw new ApplicationException("Disposed");
+			}
+		}
+
+		public void shutdown()
+		{
+			if(_shutdown.CompareAndSet(false, true))
+			{
+				foreach(SessionState ss in sessions.Values)
+				{
+					ss.shutdown();
+				}
+			}
+		}
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,656 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.Transport;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	/// <summary>
+	/// Tracks the state of a connection so a newly established transport can be
+	/// re-initialized to the state that was tracked.
+	/// </summary>
+	public class ConnectionStateTracker : CommandVisitorAdapter
+	{
+
+		private static Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
+
+		protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>();
+
+		private bool _trackTransactions;
+		private bool _restoreSessions = true;
+		private bool _restoreConsumers = true;
+		private bool _restoreProducers = true;
+		private bool _restoreTransaction = true;
+		private bool _trackMessages = true;
+		private int _maxCacheSize = 256;
+		private int currentCacheSize;
+		private Dictionary<MessageId, Message> messageCache = new Dictionary<MessageId, Message>();
+		private Queue<MessageId> messageCacheFIFO = new Queue<MessageId>();
+
+		protected void RemoveEldestInCache()
+		{
+			System.Collections.ICollection ic = messageCacheFIFO;
+			lock(ic.SyncRoot)
+			{
+				while(messageCacheFIFO.Count > MaxCacheSize)
+				{
+					messageCache.Remove(messageCacheFIFO.Dequeue());
+					currentCacheSize = currentCacheSize - 1;
+				}
+			}
+		}
+
+		private class RemoveTransactionAction : ThreadSimulator
+		{
+			private TransactionInfo info;
+			private ConnectionStateTracker cst;
+
+			public RemoveTransactionAction(TransactionInfo info, ConnectionStateTracker aCst)
+			{
+				this.info = info;
+				this.cst = aCst;
+			}
+
+			public override void Run()
+			{
+				ConnectionId connectionId = info.ConnectionId;
+				ConnectionState cs = cst.connectionStates[connectionId];
+				cs.removeTransactionState(info.TransactionId);
+			}
+		}
+
+		/// <summary>
+		/// </summary>
+		/// <param name="command"></param>
+		/// <returns>null if the command is not state tracked.</returns>
+		public Tracked track(Command command)
+		{
+			try
+			{
+				return (Tracked) command.visit(this);
+			}
+			catch(IOException e)
+			{
+				throw e;
+			}
+			catch(Exception e)
+			{
+				throw new IOException(e.Message);
+			}
+		}
+
+		public void trackBack(Command command)
+		{
+			if(TrackMessages && command != null && command.IsMessage)
+			{
+				Message message = (Message) command;
+				if(message.TransactionId == null)
+				{
+					currentCacheSize = currentCacheSize + 1;
+				}
+			}
+		}
+
+		public void DoRestore(ITransport transport)
+		{
+			// Restore the connections.
+			foreach(ConnectionState connectionState in connectionStates.Values)
+			{
+				transport.Oneway(connectionState.Info);
+				DoRestoreTempDestinations(transport, connectionState);
+
+				if(RestoreSessions)
+				{
+					DoRestoreSessions(transport, connectionState);
+				}
+
+				if(RestoreTransaction)
+				{
+					DoRestoreTransactions(transport, connectionState);
+				}
+			}
+			//now flush messages
+			foreach(Message msg in messageCache.Values)
+			{
+				transport.Oneway(msg);
+			}
+		}
+
+		private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState)
+		{
+			SynchronizedCollection<TransactionState> transactionStates = connectionState.TransactionStates;
+			foreach(TransactionState transactionState in transactionStates)
+			{
+				foreach(Command command in transactionState.Commands)
+				{
+					transport.Oneway(command);
+				}
+			}
+		}
+
+		/// <summary>
+		/// </summary>
+		/// <param name="transport"></param>
+		/// <param name="connectionState"></param>
+		protected void DoRestoreSessions(ITransport transport, ConnectionState connectionState)
+		{
+			// Restore the connection's sessions
+			foreach(SessionState sessionState in connectionState.SessionStates)
+			{
+				transport.Oneway(sessionState.Info);
+
+				if(RestoreProducers)
+				{
+					DoRestoreProducers(transport, sessionState);
+				}
+
+				if(RestoreConsumers)
+				{
+					DoRestoreConsumers(transport, sessionState);
+				}
+			}
+		}
+
+		/// <summary>
+		/// </summary>
+		/// <param name="transport"></param>
+		/// <param name="sessionState"></param>
+		protected void DoRestoreConsumers(ITransport transport, SessionState sessionState)
+		{
+			// Restore the session's consumers
+			foreach(ConsumerState consumerState in sessionState.ConsumerStates)
+			{
+				transport.Oneway(consumerState.Info);
+			}
+		}
+
+		/// <summary>
+		/// </summary>
+		/// <param name="transport"></param>
+		/// <param name="sessionState"></param>
+		protected void DoRestoreProducers(ITransport transport, SessionState sessionState)
+		{
+			// Restore the session's producers
+
+			foreach(ProducerState producerState in sessionState.ProducerStates)
+			{
+				transport.Oneway(producerState.Info);
+			}
+		}
+
+		/// <summary>
+		/// </summary>
+		/// <param name="transport"></param>
+		/// <param name="connectionState"></param>
+		protected void DoRestoreTempDestinations(ITransport transport, ConnectionState connectionState)
+		{
+			// Restore the connection's temp destinations.
+			foreach(DestinationInfo destinationInfo in connectionState.TempDestinations)
+			{
+				transport.Oneway(destinationInfo);
+			}
+		}
+
+		public override Response processAddDestination(DestinationInfo info)
+		{
+			if(info != null)
+			{
+				ConnectionState cs = connectionStates[info.ConnectionId];
+				if(cs != null && info.Destination.IsTemporary)
+				{
+					cs.addTempDestination(info);
+				}
+			}
+			return TRACKED_RESPONSE_MARKER;
+		}
+
+		public override Response processRemoveDestination(DestinationInfo info)
+		{
+			if(info != null)
+			{
+				ConnectionState cs = connectionStates[info.ConnectionId];
+				if(cs != null && info.Destination.IsTemporary)
+				{
+					cs.removeTempDestination(info.Destination);
+				}
+			}
+			return TRACKED_RESPONSE_MARKER;
+		}
+
+		public override Response processAddProducer(ProducerInfo info)
+		{
+			if(info != null && info.ProducerId != null)
+			{
+				SessionId sessionId = info.ProducerId.ParentId;
+				if(sessionId != null)
+				{
+					ConnectionId connectionId = sessionId.ParentId;
+					if(connectionId != null)
+					{
+						ConnectionState cs = connectionStates[connectionId];
+						if(cs != null)
+						{
+							SessionState ss = cs[sessionId];
+							if(ss != null)
+							{
+								ss.addProducer(info);
+							}
+						}
+					}
+				}
+			}
+			return TRACKED_RESPONSE_MARKER;
+		}
+
+		public override Response processRemoveProducer(ProducerId id)
+		{
+			if(id != null)
+			{
+				SessionId sessionId = id.ParentId;
+				if(sessionId != null)
+				{
+					ConnectionId connectionId = sessionId.ParentId;
+					if(connectionId != null)
+					{
+						ConnectionState cs = connectionStates[connectionId];
+						if(cs != null)
+						{
+							SessionState ss = cs[sessionId];
+							if(ss != null)
+							{
+								ss.removeProducer(id);
+							}
+						}
+					}
+				}
+			}
+			return TRACKED_RESPONSE_MARKER;
+		}
+
+		public override Response processAddConsumer(ConsumerInfo info)
+		{
+			if(info != null)
+			{
+				SessionId sessionId = info.ConsumerId.ParentId;
+				if(sessionId != null)
+				{
+					ConnectionId connectionId = sessionId.ParentId;
+					if(connectionId != null)
+					{
+						ConnectionState cs = connectionStates[connectionId];
+						if(cs != null)
+						{
+							SessionState ss = cs[sessionId];
+							if(ss != null)
+							{
+								ss.addConsumer(info);
+							}
+						}
+					}
+				}
+			}
+			return TRACKED_RESPONSE_MARKER;
+		}
+
+		public override Response processRemoveConsumer(ConsumerId id)
+		{
+			if(id != null)
+			{
+				SessionId sessionId = id.ParentId;
+				if(sessionId != null)
+				{
+					ConnectionId connectionId = sessionId.ParentId;
+					if(connectionId != null)
+					{
+						ConnectionState cs = connectionStates[connectionId];
+						if(cs != null)
+						{
+							SessionState ss = cs[sessionId];
+							if(ss != null)
+							{
+								ss.removeConsumer(id);
+							}
+						}
+					}
+				}
+			}
+			return TRACKED_RESPONSE_MARKER;
+		}
+
+		public override Response processAddSession(SessionInfo info)
+		{
+			if(info != null)
+			{
+				ConnectionId connectionId = info.SessionId.ParentId;
+				if(connectionId != null)
+				{
+					ConnectionState cs = connectionStates[connectionId];
+					if(cs != null)
+					{
+						cs.addSession(info);
+					}
+				}
+			}
+			return TRACKED_RESPONSE_MARKER;
+		}
+
+		public override Response processRemoveSession(SessionId id)
+		{
+			if(id != null)
+			{
+				ConnectionId connectionId = id.ParentId;
+				if(connectionId != null)
+				{
+					ConnectionState cs = connectionStates[connectionId];
+					if(cs != null)
+					{
+						cs.removeSession(id);
+					}
+				}
+			}
+			return TRACKED_RESPONSE_MARKER;
+		}
+
+		public override Response processAddConnection(ConnectionInfo info)
+		{
+			if(info != null)
+			{
+				connectionStates.Add(info.ConnectionId, new ConnectionState(info));
+			}
+			return TRACKED_RESPONSE_MARKER;
+		}
+
+		public override Response processRemoveConnection(ConnectionId id)
+		{
+			if(id != null)
+			{
+				connectionStates.Remove(id);
+			}
+			return TRACKED_RESPONSE_MARKER;
+		}
+
+		public override Response processMessage(Message send)
+		{
+			if(send != null)
+			{
+				if(TrackTransactions && send.TransactionId != null)
+				{
+					ConnectionId connectionId = send.ProducerId.ParentId.ParentId;
+					if(connectionId != null)
+					{
+						ConnectionState cs = connectionStates[connectionId];
+						if(cs != null)
+						{
+							TransactionState transactionState = cs[send.TransactionId];
+							if(transactionState != null)
+							{
+								transactionState.addCommand(send);
+							}
+						}
+					}
+					return TRACKED_RESPONSE_MARKER;
+				}
+				else if(TrackMessages)
+				{
+					messageCache.Add(send.MessageId, (Message) send.Clone());
+					RemoveEldestInCache();
+				}
+			}
+			return null;
+		}
+
+		public override Response processMessageAck(MessageAck ack)
+		{
+			if(TrackTransactions && ack != null && ack.TransactionId != null)
+			{
+				ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId;
+				if(connectionId != null)
+				{
+					ConnectionState cs = connectionStates[connectionId];
+					if(cs != null)
+					{
+						TransactionState transactionState = cs[ack.TransactionId];
+						if(transactionState != null)
+						{
+							transactionState.addCommand(ack);
+						}
+					}
+				}
+				return TRACKED_RESPONSE_MARKER;
+			}
+			return null;
+		}
+
+		public override Response processBeginTransaction(TransactionInfo info)
+		{
+			if(TrackTransactions && info != null && info.TransactionId != null)
+			{
+				ConnectionId connectionId = info.ConnectionId;
+				if(connectionId != null)
+				{
+					ConnectionState cs = connectionStates[connectionId];
+					if(cs != null)
+					{
+						cs.addTransactionState(info.TransactionId);
+						TransactionState state = cs[info.TransactionId];
+						state.addCommand(info);
+					}
+				}
+				return TRACKED_RESPONSE_MARKER;
+			}
+			return null;
+		}
+
+		public override Response processPrepareTransaction(TransactionInfo info)
+		{
+			if(TrackTransactions && info != null)
+			{
+				ConnectionId connectionId = info.ConnectionId;
+				if(connectionId != null)
+				{
+					ConnectionState cs = connectionStates[connectionId];
+					if(cs != null)
+					{
+						TransactionState transactionState = cs[info.TransactionId];
+						if(transactionState != null)
+						{
+							transactionState.addCommand(info);
+						}
+					}
+				}
+				return TRACKED_RESPONSE_MARKER;
+			}
+			return null;
+		}
+
+		public override Response processCommitTransactionOnePhase(TransactionInfo info)
+		{
+			if(TrackTransactions && info != null)
+			{
+				ConnectionId connectionId = info.ConnectionId;
+				if(connectionId != null)
+				{
+					ConnectionState cs = connectionStates[connectionId];
+					if(cs != null)
+					{
+						TransactionState transactionState = cs[info.TransactionId];
+						if(transactionState != null)
+						{
+							transactionState.addCommand(info);
+							return new Tracked(new RemoveTransactionAction(info, this));
+						}
+					}
+				}
+			}
+			return null;
+		}
+
+		public override Response processCommitTransactionTwoPhase(TransactionInfo info)
+		{
+			if(TrackTransactions && info != null)
+			{
+				ConnectionId connectionId = info.ConnectionId;
+				if(connectionId != null)
+				{
+					ConnectionState cs = connectionStates[connectionId];
+					if(cs != null)
+					{
+						TransactionState transactionState = cs[info.TransactionId];
+						if(transactionState != null)
+						{
+							transactionState.addCommand(info);
+							return new Tracked(new RemoveTransactionAction(info, this));
+						}
+					}
+				}
+			}
+			return null;
+		}
+
+		public override Response processRollbackTransaction(TransactionInfo info)
+		{
+			if(TrackTransactions && info != null)
+			{
+				ConnectionId connectionId = info.ConnectionId;
+				if(connectionId != null)
+				{
+					ConnectionState cs = connectionStates[connectionId];
+					if(cs != null)
+					{
+						TransactionState transactionState = cs[info.TransactionId];
+						if(transactionState != null)
+						{
+							transactionState.addCommand(info);
+							return new Tracked(new RemoveTransactionAction(info, this));
+						}
+					}
+				}
+			}
+			return null;
+		}
+
+		public override Response processEndTransaction(TransactionInfo info)
+		{
+			if(TrackTransactions && info != null)
+			{
+				ConnectionId connectionId = info.ConnectionId;
+				if(connectionId != null)
+				{
+					ConnectionState cs = connectionStates[connectionId];
+					if(cs != null)
+					{
+						TransactionState transactionState = cs[info.TransactionId];
+						if(transactionState != null)
+						{
+							transactionState.addCommand(info);
+						}
+					}
+				}
+				return TRACKED_RESPONSE_MARKER;
+			}
+			return null;
+		}
+
+		public bool RestoreConsumers
+		{
+			get
+			{
+				return _restoreConsumers;
+			}
+			set
+			{
+				_restoreConsumers = value;
+			}
+		}
+
+		public bool RestoreProducers
+		{
+			get
+			{
+				return _restoreProducers;
+			}
+			set
+			{
+				_restoreProducers = value;
+			}
+		}
+
+		public bool RestoreSessions
+		{
+			get
+			{
+				return _restoreSessions;
+			}
+			set
+			{
+				_restoreSessions = value;
+			}
+		}
+
+		public bool TrackTransactions
+		{
+			get
+			{
+				return _trackTransactions;
+			}
+			set
+			{
+				_trackTransactions = value;
+			}
+		}
+
+		public bool RestoreTransaction
+		{
+			get
+			{
+				return _restoreTransaction;
+			}
+			set
+			{
+				_restoreTransaction = value;
+			}
+		}
+
+		public bool TrackMessages
+		{
+			get
+			{
+				return _trackMessages;
+			}
+			set
+			{
+				_trackMessages = value;
+			}
+		}
+
+		public int MaxCacheSize
+		{
+			get
+			{
+				return _maxCacheSize;
+			}
+			set
+			{
+				_maxCacheSize = value;
+			}
+		}
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConsumerState.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+
+	public class ConsumerState
+	{
+		ConsumerInfo info;
+
+		public ConsumerState(ConsumerInfo info)
+		{
+			this.info = info;
+		}
+
+		public override String ToString()
+		{
+			return info.ToString();
+		}
+
+		public ConsumerInfo Info
+		{
+			get
+			{
+				return info;
+			}
+		}
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	public interface ICommandVisitor
+	{
+
+		Response processAddConnection(ConnectionInfo info);
+
+		Response processAddSession(SessionInfo info);
+
+		Response processAddProducer(ProducerInfo info);
+
+		Response processAddConsumer(ConsumerInfo info);
+
+		Response processRemoveConnection(ConnectionId id);
+
+		Response processRemoveSession(SessionId id);
+
+		Response processRemoveProducer(ProducerId id);
+
+		Response processRemoveConsumer(ConsumerId id);
+
+		Response processAddDestination(DestinationInfo info);
+
+		Response processRemoveDestination(DestinationInfo info);
+
+		Response processRemoveSubscription(RemoveSubscriptionInfo info);
+
+		Response processMessage(Message send);
+
+		Response processMessageAck(MessageAck ack);
+
+		Response processMessagePull(MessagePull pull);
+
+		Response processBeginTransaction(TransactionInfo info);
+
+		Response processPrepareTransaction(TransactionInfo info);
+
+		Response processCommitTransactionOnePhase(TransactionInfo info);
+
+		Response processCommitTransactionTwoPhase(TransactionInfo info);
+
+		Response processRollbackTransaction(TransactionInfo info);
+
+		Response processWireFormat(WireFormatInfo info);
+
+		Response processKeepAlive(KeepAliveInfo info);
+
+		Response processShutdown(ShutdownInfo info);
+
+		Response processFlush(FlushCommand command);
+
+		Response processBrokerInfo(BrokerInfo info);
+
+		Response processRecoverTransactions(TransactionInfo info);
+
+		Response processForgetTransaction(TransactionInfo info);
+
+		Response processEndTransaction(TransactionInfo info);
+
+		Response processMessageDispatchNotification(MessageDispatchNotification notification);
+
+		Response processProducerAck(ProducerAck ack);
+
+		Response processMessageDispatch(MessageDispatch dispatch);
+
+		Response processControlCommand(ControlCommand command);
+
+		Response processConnectionError(ConnectionError error);
+
+		Response processConnectionControl(ConnectionControl control);
+
+		Response processConsumerControl(ConsumerControl control);
+
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ProducerState.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	public class ProducerState
+	{
+		ProducerInfo info;
+
+		public ProducerState(ProducerInfo info)
+		{
+			this.info = info;
+		}
+
+		public override String ToString()
+		{
+			return info.ToString();
+		}
+
+		public ProducerInfo Info
+		{
+			get
+			{
+				return info;
+			}
+		}
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SessionState.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	public class SessionState
+	{
+		SessionInfo info;
+
+		private SynchronizedDictionary<ProducerId, ProducerState> producers = new SynchronizedDictionary<ProducerId, ProducerState>();
+		private SynchronizedDictionary<ConsumerId, ConsumerState> consumers = new SynchronizedDictionary<ConsumerId, ConsumerState>();
+		private AtomicBoolean _shutdown = new AtomicBoolean(false);
+
+		public SessionState(SessionInfo info)
+		{
+			this.info = info;
+		}
+
+		public override String ToString()
+		{
+			return info.ToString();
+		}
+
+		public void addProducer(ProducerInfo info)
+		{
+			checkShutdown();
+			producers.Add(info.ProducerId, new ProducerState(info));
+		}
+
+		public ProducerState removeProducer(ProducerId id)
+		{
+			ProducerState ret = producers[id];
+			producers.Remove(id);
+			return ret;
+		}
+
+		public void addConsumer(ConsumerInfo info)
+		{
+			checkShutdown();
+			consumers.Add(info.ConsumerId, new ConsumerState(info));
+		}
+
+		public ConsumerState removeConsumer(ConsumerId id)
+		{
+			ConsumerState ret = consumers[id];
+			consumers.Remove(id);
+			return ret;
+		}
+
+		public SessionInfo Info
+		{
+			get
+			{
+				return info;
+			}
+		}
+
+		public SynchronizedCollection<ConsumerId> ConsumerIds
+		{
+			get
+			{
+				return consumers.Keys;
+			}
+		}
+
+		public SynchronizedCollection<ProducerId> ProducerIds
+		{
+			get
+			{
+				return producers.Keys;
+			}
+		}
+
+		public SynchronizedCollection<ProducerState> ProducerStates
+		{
+			get
+			{
+				return producers.Values;
+			}
+		}
+
+		public ProducerState getProducerState(ProducerId producerId)
+		{
+			return producers[producerId];
+		}
+
+		public ProducerState this[ProducerId producerId]
+		{
+			get
+			{
+				return producers[producerId];
+			}
+		}
+
+		public SynchronizedCollection<ConsumerState> ConsumerStates
+		{
+			get
+			{
+				return consumers.Values;
+			}
+		}
+
+		public ConsumerState getConsumerState(ConsumerId consumerId)
+		{
+			return consumers[consumerId];
+		}
+
+		public ConsumerState this[ConsumerId consumerId]
+		{
+			get
+			{
+				return consumers[consumerId];
+			}
+		}
+
+		private void checkShutdown()
+		{
+			if(_shutdown.Value)
+			{
+				throw new ApplicationException("Disposed");
+			}
+		}
+
+		public void shutdown()
+		{
+			_shutdown.Value = false;
+		}
+
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/SynchronizedObjects.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Collections.Generic;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	public class SynchronizedCollection<TValue>
+		where TValue : class
+	{
+		private Object myLock = new Object();
+		private ArrayList _collection;
+
+		public SynchronizedCollection()
+		{
+			_collection = new ArrayList();
+		}
+
+		public SynchronizedCollection(ICollection c)
+		{
+			_collection = new ArrayList(c);
+		}
+
+		public int Count
+		{
+			get
+			{
+				lock(myLock)
+				{
+					return _collection.Count;
+				}
+			}
+		}
+
+		public bool IsReadOnly
+		{
+			get
+			{
+				return false;
+			}
+		}
+
+		public int Add(TValue v)
+		{
+			lock(myLock)
+			{
+				return _collection.Add(v);
+			}
+		}
+
+		public void Clear()
+		{
+			lock(myLock)
+			{
+				_collection.Clear();
+			}
+		}
+
+		public bool Contains(TValue v)
+		{
+			lock(myLock)
+			{
+				return _collection.Contains(v);
+			}
+		}
+
+		public void CopyTo(TValue[] a, int index)
+		{
+			lock(myLock)
+			{
+				_collection.CopyTo(a, index);
+			}
+		}
+
+		public void Remove(TValue v)
+		{
+			lock(myLock)
+			{
+				_collection.Remove(v);
+			}
+		}
+
+		public void RemoveAt(int index)
+		{
+			lock(myLock)
+			{
+				_collection.RemoveAt(index);
+			}
+		}
+
+		public TValue this[int index]
+		{
+			get
+			{
+				TValue ret;
+				lock(myLock)
+				{
+					ret = (TValue) _collection[index];
+				}
+				return (TValue) ret;
+			}
+			set
+			{
+				lock(myLock)
+				{
+					_collection[index] = value;
+				}
+			}
+		}
+
+		public IEnumerator GetEnumerator()
+		{
+			lock(myLock)
+			{
+				return _collection.GetEnumerator();
+			}
+		}
+
+		public IEnumerator GetEnumerator(int index, int count)
+		{
+			lock(myLock)
+			{
+				return _collection.GetEnumerator(index, count);
+			}
+		}
+
+	}
+
+	public class SynchronizedDictionary<TKey, TValue>
+		where TKey : class
+		where TValue : class
+	{
+		private Object myLock = new Object();
+		private Dictionary<TKey, TValue> _dictionary = new Dictionary<TKey, TValue>();
+
+		public void Clear()
+		{
+			_dictionary.Clear();
+		}
+
+		public TValue this[TKey key]
+		{
+			get
+			{
+				TValue ret;
+				lock(myLock)
+				{
+					ret = _dictionary[key];
+				}
+				return ret;
+			}
+			set
+			{
+				lock(myLock)
+				{
+					_dictionary[key] = value;
+				}
+			}
+		}
+
+		public SynchronizedCollection<TKey> Keys
+		{
+			get
+			{
+				lock(myLock)
+				{
+					return new SynchronizedCollection<TKey>(_dictionary.Keys);
+				}
+			}
+		}
+
+		public SynchronizedCollection<TValue> Values
+		{
+			get
+			{
+				lock(myLock)
+				{
+					return new SynchronizedCollection<TValue>(_dictionary.Values);
+				}
+			}
+		}
+
+		public void Add(TKey k, TValue v)
+		{
+			lock(myLock)
+			{
+				_dictionary.Add(k, v);
+			}
+		}
+
+		public bool Remove(TKey v)
+		{
+			lock(myLock)
+			{
+				return _dictionary.Remove(v);
+			}
+		}
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ThreadSimulator.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ThreadSimulator.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ThreadSimulator.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ThreadSimulator.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	public abstract class ThreadSimulator
+	{
+		public virtual void Run()
+		{
+			throw new ApplicationException("ThreadSimulator.Run() should be overridden.");
+		}
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/Tracked.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	public class Tracked : Response
+	{
+
+		private ThreadSimulator runnable = null;
+
+		public Tracked(ThreadSimulator runnable)
+		{
+			this.runnable = runnable;
+		}
+
+		public void onResponses()
+		{
+			if(runnable != null)
+			{
+				runnable.Run();
+				runnable = null;
+			}
+		}
+
+		virtual public bool WaitingForResponse
+		{
+			get
+			{
+				return runnable != null;
+			}
+		}
+
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/TransactionState.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.State
+{
+	public class TransactionState
+	{
+
+		private List<Command> commands = new List<Command>();
+		private TransactionId id;
+		private AtomicBoolean _shutdown = new AtomicBoolean(false);
+		private bool prepared;
+		private int preparedResult;
+
+		public TransactionState(TransactionId id)
+		{
+			this.id = id;
+		}
+
+		public override String ToString()
+		{
+			return id.ToString();
+		}
+
+		public void addCommand(Command operation)
+		{
+			checkShutdown();
+			commands.Add(operation);
+		}
+
+		public List<Command> Commands
+		{
+			get
+			{
+				return commands;
+			}
+		}
+
+		private void checkShutdown()
+		{
+			if(_shutdown.Value)
+			{
+				throw new ApplicationException("Disposed");
+			}
+		}
+
+		public void shutdown()
+		{
+			_shutdown.Value = false;
+		}
+
+		public TransactionId getId()
+		{
+			return id;
+		}
+
+		public bool Prepared
+		{
+			get
+			{
+				return prepared;
+			}
+			set
+			{
+				prepared = value;
+			}
+		}
+
+		public int PreparedResult
+		{
+			get
+			{
+				return preparedResult;
+			}
+			set
+			{
+				preparedResult = value;
+			}
+		}
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/DefaultThreadPools.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.ActiveMQ.Threads
+{
+	public class DefaultThreadPools
+	{
+		/***
+		 * Java's execution model is different enough that I have left out
+		 * the Executure concept in this implementation. This must be
+		 * reviewed to see what is appropriate for the future.
+		 * -Allan Schrum
+		private static Executor DEFAULT_POOL = null;
+		static {
+		DEFAULT_POOL = new ScheduledThreadPoolExecutor(5, new ThreadFactory()
+						{
+							public Thread newThread(Runnable runnable)
+							{
+								Thread thread = new Thread(runnable, "ActiveMQ Default Thread Pool Thread");
+								thread.setDaemon(true);
+								return thread;
+							}
+						});
+		}    
+
+		public static Executor DefaultPool
+		{
+			get { return DEFAULT_POOL; }
+		}
+		***/
+
+		private static TaskRunnerFactory DEFAULT_TASK_RUNNER_FACTORY = new TaskRunnerFactory();
+
+		private DefaultThreadPools()
+		{
+		}
+
+		public static TaskRunnerFactory DefaultTaskRunnerFactory
+		{
+			get
+			{
+				return DEFAULT_TASK_RUNNER_FACTORY;
+			}
+		}
+
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/PooledTaskRunner.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Threading;
+
+namespace Apache.NMS.ActiveMQ.Threads
+{
+	class PooledTaskRunner : TaskRunner
+	{
+		private int maxIterationsPerRun;
+		private Task task;
+		private Object runable = new Object();
+		private bool queued;
+		private bool _shutdown;
+		private bool iterating;
+		private volatile System.Threading.Thread runningThread;
+
+		public void run(Object o)
+		{
+			PooledTaskRunner p = o as PooledTaskRunner;
+			p.runningThread = System.Threading.Thread.CurrentThread;
+			try
+			{
+				p.runTask();
+			}
+			finally
+			{
+				p.runningThread = null;
+			}
+		}
+
+		public PooledTaskRunner(Task task, int maxIterationsPerRun)
+		{
+			this.maxIterationsPerRun = maxIterationsPerRun;
+			this.task = task;
+			this._shutdown = false;
+			this.iterating = false;
+			this.queued = true;
+			ThreadPool.QueueUserWorkItem(new WaitCallback(run), this);
+		}
+
+		/**
+		 * We Expect MANY wakeup calls on the same TaskRunner.
+		 */
+		public void wakeup()
+		{
+			lock(runable)
+			{
+
+				// When we get in here, we make some assumptions of state:
+				// queued=false, iterating=false: wakeup() has not be called and
+				// therefore task is not executing.
+				// queued=true, iterating=false: wakeup() was called but, task
+				// execution has not started yet
+				// queued=false, iterating=true : wakeup() was called, which caused
+				// task execution to start.
+				// queued=true, iterating=true : wakeup() called after task
+				// execution was started.
+
+				if(queued || _shutdown)
+				{
+					return;
+				}
+
+				queued = true;
+
+				// The runTask() method will do this for me once we are done
+				// iterating.
+				if(!iterating)
+				{
+					ThreadPool.QueueUserWorkItem(new WaitCallback(run), this);
+				}
+			}
+		}
+
+		/**
+		 * shut down the task
+		 *
+		 */
+		public void shutdown(int timeout)
+		{
+			lock(runable)
+			{
+				_shutdown = true;
+				// the check on the thread is done
+				// because a call to iterate can result in
+				// shutDown() being called, which would wait forever
+				// waiting for iterating to finish
+				if(runningThread != System.Threading.Thread.CurrentThread)
+				{
+					if(iterating)
+					{
+						System.Threading.Thread.Sleep(timeout);
+					}
+				}
+			}
+		}
+
+		public void shutdown()
+		{
+			shutdown(0);
+		}
+
+		void runTask()
+		{
+
+			lock(runable)
+			{
+				queued = false;
+				if(_shutdown)
+				{
+					iterating = false;
+					return;
+				}
+				iterating = true;
+			}
+
+			// Don't synchronize while we are iterating so that
+			// multiple wakeup() calls can be executed concurrently.
+			bool done = false;
+			try
+			{
+				for(int i = 0; i < maxIterationsPerRun; i++)
+				{
+					if(!task.iterate())
+					{
+						done = true;
+						break;
+					}
+				}
+			}
+			finally
+			{
+				lock(runable)
+				{
+					iterating = false;
+					if(_shutdown)
+					{
+						queued = false;
+					}
+					else
+					{
+						// If we could not iterate all the items
+						// then we need to re-queue.
+						if(!done)
+						{
+							queued = true;
+						}
+
+						if(queued)
+						{
+							ThreadPool.QueueUserWorkItem(new WaitCallback(run), this);
+						}
+					}
+				}
+			}
+		}
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/Task.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.ActiveMQ.Threads
+{
+	/// <summary>
+	/// Represents a task that may take a few iterations to complete.
+	/// </summary>
+	public interface Task
+	{
+		bool iterate();
+	}
+}

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs?rev=707747&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Threads/TaskRunner.cs Fri Oct 24 14:10:22 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+namespace Apache.NMS.ActiveMQ.Threads
+{
+	/// <summary>
+	/// Allows you to request a thread execute the associated Task.
+	/// </summary>
+	public interface TaskRunner
+	{
+		void wakeup();
+		void shutdown();
+		void shutdown(int timeout);
+	}
+}



Mime
View raw message