activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r980561 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport: Failover/FailoverTransport.cs ICompositeTransport.cs ITransport.cs
Date Thu, 29 Jul 2010 20:22:55 GMT
Author: jgomes
Date: Thu Jul 29 20:22:54 2010
New Revision: 980561

URL: http://svn.apache.org/viewvc?rev=980561&view=rev
Log:
Changed use of HashSet to Dictionary.  HashSet is a .NET 3.5 only class.
Fixed XML comment errors.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=980561&r1=980560&r2=980561&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Thu Jul 29 20:22:54 2010
@@ -26,719 +26,719 @@ using Apache.NMS.Util;
 
 namespace Apache.NMS.ActiveMQ.Transport.Failover
 {
-    /// <summary>
-    /// A Transport that is made reliable by being able to fail over to another
-    /// transport when a transport failure is detected.
-    /// </summary>
-    public class FailoverTransport : ICompositeTransport, IComparable
-    {
-        private static int idCounter = 0;
-        private int id;
-
-        private bool disposed;
-        private bool connected;
-        private List<Uri> uris = new List<Uri>();
+	/// <summary>
+	/// A Transport that is made reliable by being able to fail over to another
+	/// transport when a transport failure is detected.
+	/// </summary>
+	public class FailoverTransport : ICompositeTransport, IComparable
+	{
+		private static int idCounter = 0;
+		private int id;
+
+		private bool disposed;
+		private bool connected;
+		private List<Uri> uris = new List<Uri>();
 		private List<Uri> updated = new List<Uri>();
-        private CommandHandler commandHandler;
-        private ExceptionHandler exceptionHandler;
-        private InterruptedHandler interruptedHandler;
-        private ResumedHandler resumedHandler;
-
-        private Mutex reconnectMutex = new Mutex();
-        private Mutex backupMutex = new Mutex();
-        private Mutex sleepMutex = new Mutex();
-        private ConnectionStateTracker stateTracker = new ConnectionStateTracker();
-        private Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
-
-        private Uri connectedTransportURI;
-        private Uri failedConnectTransportURI;
-        private AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
-        private TaskRunner reconnectTask = null;
-        private bool started;
-
-        private int timeout = -1;
-        private int initialReconnectDelay = 10;
-        private int maxReconnectDelay = 1000 * 30;
-        private int backOffMultiplier = 2;
-        private bool useExponentialBackOff = true;
-        private bool randomize = true;
-        private bool initialized;
-        private int maxReconnectAttempts;
-        private int connectFailures;
-        private int reconnectDelay = 10;
-        private int asyncTimeout = 45000;
-        private bool asyncConnect = false;        
-        private Exception connectionFailure;
-        private bool firstConnection = true;
-        private bool backup = false;
-        private List<BackupTransport> backups = new List<BackupTransport>();
-        private int backupPoolSize = 1;
-        private bool trackMessages = false;
-        private int maxCacheSize = 256;
-        private volatile Exception failure;
-        private readonly object mutex = new object();
+		private CommandHandler commandHandler;
+		private ExceptionHandler exceptionHandler;
+		private InterruptedHandler interruptedHandler;
+		private ResumedHandler resumedHandler;
+
+		private Mutex reconnectMutex = new Mutex();
+		private Mutex backupMutex = new Mutex();
+		private Mutex sleepMutex = new Mutex();
+		private ConnectionStateTracker stateTracker = new ConnectionStateTracker();
+		private Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
+
+		private Uri connectedTransportURI;
+		private Uri failedConnectTransportURI;
+		private AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
+		private TaskRunner reconnectTask = null;
+		private bool started;
+
+		private int timeout = -1;
+		private int initialReconnectDelay = 10;
+		private int maxReconnectDelay = 1000 * 30;
+		private int backOffMultiplier = 2;
+		private bool useExponentialBackOff = true;
+		private bool randomize = true;
+		private bool initialized;
+		private int maxReconnectAttempts;
+		private int connectFailures;
+		private int reconnectDelay = 10;
+		private int asyncTimeout = 45000;
+		private bool asyncConnect = false;
+		private Exception connectionFailure;
+		private bool firstConnection = true;
+		private bool backup = false;
+		private List<BackupTransport> backups = new List<BackupTransport>();
+		private int backupPoolSize = 1;
+		private bool trackMessages = false;
+		private int maxCacheSize = 256;
+		private volatile Exception failure;
+		private readonly object mutex = new object();
 		private bool reconnectSupported = true;
 		private bool updateURIsSupported = true;
-		
-        public FailoverTransport()
-        {
-            id = idCounter++;
-
-            stateTracker.TrackTransactions = true;
-        }
-
-        ~FailoverTransport()
-        {
-            Dispose(false);
-        }
-        
-        #region FailoverTask
-        
-        private class FailoverTask : Task
-        {
-            private FailoverTransport parent;
-
-            public FailoverTask(FailoverTransport p)
-            {
-                parent = p;
-            }
-
-            public bool Iterate()
-            {
-                bool result = false;
-                bool buildBackup = true;
-                bool doReconnect = !parent.disposed && parent.connectionFailure == null;
-                try
-                {
-                    parent.backupMutex.WaitOne();
-                    if(parent.ConnectedTransport == null && doReconnect)
-                    {
-                        result = parent.DoConnect();
-                        buildBackup = false;
-                    }
-                }
-                finally
-                {
-                    parent.backupMutex.ReleaseMutex();
-                }
-
-                if(buildBackup)
-                {
-                    parent.BuildBackups();
-                }
-                else
-                {
-                    //build backups on the next iteration
-                    result = true;
-                    try
-                    {
-                        parent.reconnectTask.Wakeup();
-                    }
-                    catch(ThreadInterruptedException)
-                    {
-                        Tracer.Debug("Reconnect task has been interrupted.");
-                    }
-                }
-                return result;
-            }
-        }
-
-        #endregion
-
-        #region Property Accessors
-
-        public CommandHandler Command
-        {
-            get { return commandHandler; }
-            set { commandHandler = value; }
-        }
-
-        public ExceptionHandler Exception
-        {
-            get { return exceptionHandler; }
-            set { exceptionHandler = value; }
-        }
-
-        public InterruptedHandler Interrupted
-        {
-            get { return interruptedHandler; }
-            set { this.interruptedHandler = value; }
-        }
-
-        public ResumedHandler Resumed
-        {
-            get { return resumedHandler; }
-            set { this.resumedHandler = value; }
-        }
-
-        internal Exception Failure
-        {
-            get{ return failure; }
-            set
-            {
-                lock(mutex)
-                {
-                    failure = value;
-                }
-            }
-        }
-
-        public int Timeout
-        {
-            get { return this.timeout; }
-            set { this.timeout = value; }
-        }
-
-        public int InitialReconnectDelay
-        {
-            get { return initialReconnectDelay; }
-            set { initialReconnectDelay = value; }
-        }
-
-        public int MaxReconnectDelay
-        {
-            get { return maxReconnectDelay; }
-            set { maxReconnectDelay = value; }
-        }
-
-        public int ReconnectDelay
-        {
-            get { return reconnectDelay; }
-            set { reconnectDelay = value; }
-        }
-
-        public int ReconnectDelayExponent
-        {
-            get { return backOffMultiplier; }
-            set { backOffMultiplier = value; }
-        }
-
-        public ITransport ConnectedTransport
-        {
-            get { return connectedTransport.Value; }
-            set { connectedTransport.Value = value; }
-        }
-
-        public Uri ConnectedTransportURI
-        {
-            get { return connectedTransportURI; }
-            set { connectedTransportURI = value; }
-        }
-
-        public int MaxReconnectAttempts
-        {
-            get { return maxReconnectAttempts; }
-            set { maxReconnectAttempts = value; }
-        }
-
-        public bool Randomize
-        {
-            get { return randomize; }
-            set { randomize = value; }
-        }
-
-        public bool Backup
-        {
-            get { return backup; }
-            set { backup = value; }
-        }
-
-        public int BackupPoolSize
-        {
-            get { return backupPoolSize; }
-            set { backupPoolSize = value; }
-        }
-
-        public bool TrackMessages
-        {
-            get { return trackMessages; }
-            set { trackMessages = value; }
-        }
-
-        public int MaxCacheSize
-        {
-            get { return maxCacheSize; }
-            set { maxCacheSize = value; }
-        }
-
-        public bool UseExponentialBackOff
-        {
-            get { return useExponentialBackOff; }
-            set { useExponentialBackOff = value; }
-        }
-
-        /// <summary>
-        /// Gets or sets a value indicating whether to asynchronously connect to sockets
-        /// </summary>
-        /// <value><c>true</c> if [async connect]; otherwise, <c>false</c>.</value>
-        public bool AsyncConnect
-        {
-            set{ asyncConnect = value; }
-        }
-
-        /// <summary>
-        /// If doing an asynchronous connect, the milliseconds before timing out if no connection can be made
-        /// </summary>
-        /// <value>The async timeout.</value>
-        public int AsyncTimeout
-        {
-            set{ asyncTimeout = value; }
-        }
-
-        #endregion
-        
-        public bool IsFaultTolerant
-        {
-            get { return true; }
-        }
-
-        public bool IsDisposed
-        {
-            get { return disposed; }
-        }
-
-        public bool IsConnected
-        {
-            get { return connected; }
-        }
-
-        public bool IsStarted
-        {
-            get { return started; }
-        }
-
-	    public bool IsReconnectSupported
-		{
-			get{ return this.reconnectSupported; }
-		}
-
-	    public bool IsUpdateURIsSupported
-		{
-			get{ return this.updateURIsSupported; }
-		}
-		
-        /// <summary>
-        /// </summary>
-        /// <param name="command"></param>
-        /// <returns>Returns true if the command is one sent when a connection is being closed.</returns>
-        private bool IsShutdownCommand(Command command)
-        {
-            return (command != null && (command.IsShutdownInfo || command is RemoveInfo));
-        }
-        
-        public void OnException(ITransport sender, Exception error)
-        {
-            try
-            {
-                HandleTransportFailure(error);
-            }
-            catch(Exception e)
-            {
-                e.GetType();
-                // What to do here?
-            }
-        }
-
-        public void disposedOnCommand(ITransport sender, Command c)
-        {
-        }
-
-        public void disposedOnException(ITransport sender, Exception e)
-        {
-        }
-
-        public void HandleTransportFailure(Exception e)
-        {
-            ITransport transport = connectedTransport.GetAndSet(null);
-            if(transport != null)
-            {
-                transport.Command = new CommandHandler(disposedOnCommand);
-                transport.Exception = new ExceptionHandler(disposedOnException);
-                try
-                {
-                    transport.Stop();
-                }
-                catch(Exception ex)
-                {
-                    ex.GetType();	// Ignore errors but this lets us see the error during debugging
-                }
-
-                lock(reconnectMutex)
-                {
-                    bool reconnectOk = false;
-                    if(started)
-                    {
-                        Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}", ConnectedTransportURI.ToString(), e.Message);
-                        reconnectOk = true;
-                    }
 
-                    initialized = false;
-                    failedConnectTransportURI = ConnectedTransportURI;
-                    ConnectedTransportURI = null;
-                    connected = false;
-					
+		public FailoverTransport()
+		{
+			id = idCounter++;
+
+			stateTracker.TrackTransactions = true;
+		}
+
+		~FailoverTransport()
+		{
+			Dispose(false);
+		}
+
+		#region FailoverTask
+
+		private class FailoverTask : Task
+		{
+			private FailoverTransport parent;
+
+			public FailoverTask(FailoverTransport p)
+			{
+				parent = p;
+			}
+
+			public bool Iterate()
+			{
+				bool result = false;
+				bool buildBackup = true;
+				bool doReconnect = !parent.disposed && parent.connectionFailure == null;
+				try
+				{
+					parent.backupMutex.WaitOne();
+					if(parent.ConnectedTransport == null && doReconnect)
+					{
+						result = parent.DoConnect();
+						buildBackup = false;
+					}
+				}
+				finally
+				{
+					parent.backupMutex.ReleaseMutex();
+				}
+
+				if(buildBackup)
+				{
+					parent.BuildBackups();
+				}
+				else
+				{
+					//build backups on the next iteration
+					result = true;
+					try
+					{
+						parent.reconnectTask.Wakeup();
+					}
+					catch(ThreadInterruptedException)
+					{
+						Tracer.Debug("Reconnect task has been interrupted.");
+					}
+				}
+				return result;
+			}
+		}
+
+		#endregion
+
+		#region Property Accessors
+
+		public CommandHandler Command
+		{
+			get { return commandHandler; }
+			set { commandHandler = value; }
+		}
+
+		public ExceptionHandler Exception
+		{
+			get { return exceptionHandler; }
+			set { exceptionHandler = value; }
+		}
+
+		public InterruptedHandler Interrupted
+		{
+			get { return interruptedHandler; }
+			set { this.interruptedHandler = value; }
+		}
+
+		public ResumedHandler Resumed
+		{
+			get { return resumedHandler; }
+			set { this.resumedHandler = value; }
+		}
+
+		internal Exception Failure
+		{
+			get { return failure; }
+			set
+			{
+				lock(mutex)
+				{
+					failure = value;
+				}
+			}
+		}
+
+		public int Timeout
+		{
+			get { return this.timeout; }
+			set { this.timeout = value; }
+		}
+
+		public int InitialReconnectDelay
+		{
+			get { return initialReconnectDelay; }
+			set { initialReconnectDelay = value; }
+		}
+
+		public int MaxReconnectDelay
+		{
+			get { return maxReconnectDelay; }
+			set { maxReconnectDelay = value; }
+		}
+
+		public int ReconnectDelay
+		{
+			get { return reconnectDelay; }
+			set { reconnectDelay = value; }
+		}
+
+		public int ReconnectDelayExponent
+		{
+			get { return backOffMultiplier; }
+			set { backOffMultiplier = value; }
+		}
+
+		public ITransport ConnectedTransport
+		{
+			get { return connectedTransport.Value; }
+			set { connectedTransport.Value = value; }
+		}
+
+		public Uri ConnectedTransportURI
+		{
+			get { return connectedTransportURI; }
+			set { connectedTransportURI = value; }
+		}
+
+		public int MaxReconnectAttempts
+		{
+			get { return maxReconnectAttempts; }
+			set { maxReconnectAttempts = value; }
+		}
+
+		public bool Randomize
+		{
+			get { return randomize; }
+			set { randomize = value; }
+		}
+
+		public bool Backup
+		{
+			get { return backup; }
+			set { backup = value; }
+		}
+
+		public int BackupPoolSize
+		{
+			get { return backupPoolSize; }
+			set { backupPoolSize = value; }
+		}
+
+		public bool TrackMessages
+		{
+			get { return trackMessages; }
+			set { trackMessages = value; }
+		}
+
+		public int MaxCacheSize
+		{
+			get { return maxCacheSize; }
+			set { maxCacheSize = value; }
+		}
+
+		public bool UseExponentialBackOff
+		{
+			get { return useExponentialBackOff; }
+			set { useExponentialBackOff = value; }
+		}
+
+		/// <summary>
+		/// Gets or sets a value indicating whether to asynchronously connect to sockets
+		/// </summary>
+		/// <value><c>true</c> if [async connect]; otherwise, <c>false</c>.</value>
+		public bool AsyncConnect
+		{
+			set { asyncConnect = value; }
+		}
+
+		/// <summary>
+		/// If doing an asynchronous connect, the milliseconds before timing out if no connection can be made
+		/// </summary>
+		/// <value>The async timeout.</value>
+		public int AsyncTimeout
+		{
+			set { asyncTimeout = value; }
+		}
+
+		#endregion
+
+		public bool IsFaultTolerant
+		{
+			get { return true; }
+		}
+
+		public bool IsDisposed
+		{
+			get { return disposed; }
+		}
+
+		public bool IsConnected
+		{
+			get { return connected; }
+		}
+
+		public bool IsStarted
+		{
+			get { return started; }
+		}
+
+		public bool IsReconnectSupported
+		{
+			get { return this.reconnectSupported; }
+		}
+
+		public bool IsUpdateURIsSupported
+		{
+			get { return this.updateURIsSupported; }
+		}
+
+		/// <summary>
+		/// </summary>
+		/// <param name="command"></param>
+		/// <returns>Returns true if the command is one sent when a connection is being closed.</returns>
+		private bool IsShutdownCommand(Command command)
+		{
+			return (command != null && (command.IsShutdownInfo || command is RemoveInfo));
+		}
+
+		public void OnException(ITransport sender, Exception error)
+		{
+			try
+			{
+				HandleTransportFailure(error);
+			}
+			catch(Exception e)
+			{
+				e.GetType();
+				// What to do here?
+			}
+		}
+
+		public void disposedOnCommand(ITransport sender, Command c)
+		{
+		}
+
+		public void disposedOnException(ITransport sender, Exception e)
+		{
+		}
+
+		public void HandleTransportFailure(Exception e)
+		{
+			ITransport transport = connectedTransport.GetAndSet(null);
+			if(transport != null)
+			{
+				transport.Command = new CommandHandler(disposedOnCommand);
+				transport.Exception = new ExceptionHandler(disposedOnException);
+				try
+				{
+					transport.Stop();
+				}
+				catch(Exception ex)
+				{
+					ex.GetType();	// Ignore errors but this lets us see the error during debugging
+				}
+
+				lock(reconnectMutex)
+				{
+					bool reconnectOk = false;
+					if(started)
+					{
+						Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}", ConnectedTransportURI.ToString(), e.Message);
+						reconnectOk = true;
+					}
+
+					initialized = false;
+					failedConnectTransportURI = ConnectedTransportURI;
+					ConnectedTransportURI = null;
+					connected = false;
+
 					stateTracker.TransportInterrupted();
-					
-	                if(this.Interrupted != null)
-	                {
-	                    this.Interrupted(transport);
-	                }
-					
-                    if(reconnectOk)
-                    {
-                        reconnectTask.Wakeup();
-                    }
+
+					if(this.Interrupted != null)
+					{
+						this.Interrupted(transport);
+					}
+
+					if(reconnectOk)
+					{
+						reconnectTask.Wakeup();
+					}
 				}
-            }
-        }
+			}
+		}
 
-        public void Start()
-        {
-            lock(reconnectMutex)
-            {
-                if(started)
-                {
-                    Tracer.Debug("FailoverTransport Already Started.");
-                    return;
-                }
-
-                Tracer.Debug("FailoverTransport Started.");
-                started = true;
-                stateTracker.MaxCacheSize = MaxCacheSize;
-                stateTracker.TrackMessages = TrackMessages;
-                if(ConnectedTransport != null)
-                {
-                    stateTracker.DoRestore(ConnectedTransport);
-                }
-                else
-                {
-                    Reconnect(false);
-                }
-            }
-        }
-
-        public virtual void Stop()
-        {
-            ITransport transportToStop = null;
-
-            lock(reconnectMutex)
-            {
-                if(!started)
-                {
-                    Tracer.Debug("FailoverTransport Already Stopped.");
-                    return;
-                }
-
-                Tracer.Debug("FailoverTransport Stopped.");
-                started = false;
-                disposed = true;
-                connected = false;
-                foreach(BackupTransport t in backups)
-                {
-                    t.Disposed = true;
-                }
-                backups.Clear();
-
-                if(ConnectedTransport != null)
-                {
-                    transportToStop = connectedTransport.GetAndSet(null);
-                }
-            }
-
-            try
-            {
-                sleepMutex.WaitOne();
-            }
-            finally
-            {
-                sleepMutex.ReleaseMutex();
-            }
-
-            if(reconnectTask != null)
-            {
-                reconnectTask.Shutdown();
-            }
-
-            if(transportToStop != null)
-            {
-                transportToStop.Stop();
-            }
-        }
-
-        public FutureResponse AsyncRequest(Command command)
-        {
-            throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");
-        }
-
-        public Response Request(Command command)
-        {
-            throw new ApplicationException("FailoverTransport does not implement Request(Command)");
-        }
-
-        public Response Request(Command command, TimeSpan ts)
-        {
-            throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");
-        }
-        
-        public void OnCommand(ITransport sender, Command command)
-        {
-            if(command != null)
-            {
-                if(command.IsResponse)
-                {
-                    Object oo = null;
-                    lock(((ICollection) requestMap).SyncRoot)
-                    {
-                        int v = ((Response) command).CorrelationId;
-                        try
-                        {
-                            if(requestMap.ContainsKey(v))
-                            {
-                                oo = requestMap[v];
-                                requestMap.Remove(v);
-                            }
-                        }
-                        catch
-                        {
-                        }
-                    }
+		public void Start()
+		{
+			lock(reconnectMutex)
+			{
+				if(started)
+				{
+					Tracer.Debug("FailoverTransport Already Started.");
+					return;
+				}
 
-                    Tracked t = oo as Tracked;
-                    if(t != null)
-                    {
-                        t.onResponses();
-                    }
-                }
+				Tracer.Debug("FailoverTransport Started.");
+				started = true;
+				stateTracker.MaxCacheSize = MaxCacheSize;
+				stateTracker.TrackMessages = TrackMessages;
+				if(ConnectedTransport != null)
+				{
+					stateTracker.DoRestore(ConnectedTransport);
+				}
+				else
+				{
+					Reconnect(false);
+				}
+			}
+		}
+
+		public virtual void Stop()
+		{
+			ITransport transportToStop = null;
+
+			lock(reconnectMutex)
+			{
+				if(!started)
+				{
+					Tracer.Debug("FailoverTransport Already Stopped.");
+					return;
+				}
+
+				Tracer.Debug("FailoverTransport Stopped.");
+				started = false;
+				disposed = true;
+				connected = false;
+				foreach(BackupTransport t in backups)
+				{
+					t.Disposed = true;
+				}
+				backups.Clear();
+
+				if(ConnectedTransport != null)
+				{
+					transportToStop = connectedTransport.GetAndSet(null);
+				}
+			}
+
+			try
+			{
+				sleepMutex.WaitOne();
+			}
+			finally
+			{
+				sleepMutex.ReleaseMutex();
+			}
+
+			if(reconnectTask != null)
+			{
+				reconnectTask.Shutdown();
+			}
+
+			if(transportToStop != null)
+			{
+				transportToStop.Stop();
+			}
+		}
+
+		public FutureResponse AsyncRequest(Command command)
+		{
+			throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");
+		}
+
+		public Response Request(Command command)
+		{
+			throw new ApplicationException("FailoverTransport does not implement Request(Command)");
+		}
+
+		public Response Request(Command command, TimeSpan ts)
+		{
+			throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");
+		}
+
+		public void OnCommand(ITransport sender, Command command)
+		{
+			if(command != null)
+			{
+				if(command.IsResponse)
+				{
+					Object oo = null;
+					lock(((ICollection) requestMap).SyncRoot)
+					{
+						int v = ((Response) command).CorrelationId;
+						try
+						{
+							if(requestMap.ContainsKey(v))
+							{
+								oo = requestMap[v];
+								requestMap.Remove(v);
+							}
+						}
+						catch
+						{
+						}
+					}
+
+					Tracked t = oo as Tracked;
+					if(t != null)
+					{
+						t.onResponses();
+					}
+				}
+
+				if(!initialized)
+				{
+					initialized = true;
+				}
 
-                if(!initialized)
-                {
-                    initialized = true;
-                }
-				
 				if(command.IsConnectionControl)
 				{
 					this.HandleConnectionControl(command as ConnectionControl);
 				}
-            }
+			}
 
-            this.Command(sender, command);
-        }
+			this.Command(sender, command);
+		}
 
-        public void Oneway(Command command)
-        {
-            Exception error = null;
-            
-            lock(reconnectMutex)
-            {
-                if(IsShutdownCommand(command) && ConnectedTransport == null)
-                {
-                    if(command.IsShutdownInfo)
-                    {
-                        // Skipping send of ShutdownInfo command when not connected.
-                        return;
-                    }
+		public void Oneway(Command command)
+		{
+			Exception error = null;
 
-                    if(command is RemoveInfo)
-                    {
-                        // Simulate response to RemoveInfo command
-                        Response response = new Response();
-                        response.CorrelationId = command.CommandId;
-                        OnCommand(this, response);
-                        return;
-                    }
-                }
-                
-                // Keep trying until the message is sent.
-                for(int i = 0; !disposed; i++)
-                {
-                    try
-                    {
-                        // Wait for transport to be connected.
-                        ITransport transport = ConnectedTransport;
-                        DateTime start = DateTime.Now;                            
-                        bool timedout = false;
-                        while(transport == null && !disposed && connectionFailure == null)
-                        {
-                            Tracer.Info("Waiting for transport to reconnect.");
+			lock(reconnectMutex)
+			{
+				if(IsShutdownCommand(command) && ConnectedTransport == null)
+				{
+					if(command.IsShutdownInfo)
+					{
+						// Skipping send of ShutdownInfo command when not connected.
+						return;
+					}
 
-                            int elapsed = (int)(DateTime.Now - start).TotalMilliseconds;
-                            if( this.timeout > 0 && elapsed > timeout )
-                            {
-                                timedout = true;
-                                Tracer.DebugFormat("FailoverTransport.oneway - timed out after {0} mills", elapsed );
-                                break;
-                            }
-                            
-                            // Release so that the reconnect task can run
-                            try
-                            {
-                                // Wait for something
-                                Monitor.Wait(reconnectMutex, 1000);
-                            }
-                            catch(ThreadInterruptedException e)
-                            {
-                                Tracer.DebugFormat("Interrupted: {0}", e.Message);
-                            }                            
+					if(command is RemoveInfo)
+					{
+						// Simulate response to RemoveInfo command
+						Response response = new Response();
+						response.CorrelationId = command.CommandId;
+						OnCommand(this, response);
+						return;
+					}
+				}
 
-                            transport = ConnectedTransport;
-                        }
+				// Keep trying until the message is sent.
+				for(int i = 0; !disposed; i++)
+				{
+					try
+					{
+						// Wait for transport to be connected.
+						ITransport transport = ConnectedTransport;
+						DateTime start = DateTime.Now;
+						bool timedout = false;
+						while(transport == null && !disposed && connectionFailure == null)
+						{
+							Tracer.Info("Waiting for transport to reconnect.");
 
-                        if(transport == null)
-                        {
-                            // Previous loop may have exited due to use being disposed.
-                            if(disposed)
-                            {
-                                error = new IOException("Transport disposed.");
-                            }
-                            else if(connectionFailure != null)
-                            {
-                                error = connectionFailure;
-                            }
-                            else if(timedout)
-                            {
-                                error = new IOException("Failover oneway timed out after "+ timeout +" milliseconds.");
-                            }
-                            else
-                            {
-                                error = new IOException("Unexpected failure.");
-                            }
-                            break;
-                        }
+							int elapsed = (int) (DateTime.Now - start).TotalMilliseconds;
+							if(this.timeout > 0 && elapsed > timeout)
+							{
+								timedout = true;
+								Tracer.DebugFormat("FailoverTransport.oneway - timed out after {0} mills", elapsed);
+								break;
+							}
+
+							// Release so that the reconnect task can run
+							try
+							{
+								// Wait for something
+								Monitor.Wait(reconnectMutex, 1000);
+							}
+							catch(ThreadInterruptedException e)
+							{
+								Tracer.DebugFormat("Interrupted: {0}", e.Message);
+							}
 
-                        // If it was a request and it was not being tracked by
-                        // the state tracker, then hold it in the requestMap so
-                        // that we can replay it later.
-                        Tracked tracked = stateTracker.track(command);
-                        lock(((ICollection) requestMap).SyncRoot)
-                        {
-                            if(tracked != null && tracked.WaitingForResponse)
-                            {
-                                requestMap.Add(command.CommandId, tracked);
-                            }
-                            else if(tracked == null && command.ResponseRequired)
-                            {
-                                requestMap.Add(command.CommandId, command);
-                            }
-                        }
+							transport = ConnectedTransport;
+						}
 
-                        // Send the message.
-                        try
-                        {
-                            transport.Oneway(command);
-                            stateTracker.trackBack(command);
-                        }
-                        catch(Exception e)
-                        {
-                            // If the command was not tracked.. we will retry in
-                            // this method
-                            if(tracked == null)
-                            {
+						if(transport == null)
+						{
+							// Previous loop may have exited due to use being disposed.
+							if(disposed)
+							{
+								error = new IOException("Transport disposed.");
+							}
+							else if(connectionFailure != null)
+							{
+								error = connectionFailure;
+							}
+							else if(timedout)
+							{
+								error = new IOException("Failover oneway timed out after " + timeout + " milliseconds.");
+							}
+							else
+							{
+								error = new IOException("Unexpected failure.");
+							}
+							break;
+						}
+
+						// If it was a request and it was not being tracked by
+						// the state tracker, then hold it in the requestMap so
+						// that we can replay it later.
+						Tracked tracked = stateTracker.track(command);
+						lock(((ICollection) requestMap).SyncRoot)
+						{
+							if(tracked != null && tracked.WaitingForResponse)
+							{
+								requestMap.Add(command.CommandId, tracked);
+							}
+							else if(tracked == null && command.ResponseRequired)
+							{
+								requestMap.Add(command.CommandId, command);
+							}
+						}
 
-                                // since we will retry in this method.. take it
-                                // out of the request map so that it is not
-                                // sent 2 times on recovery
-                                if(command.ResponseRequired)
-                                {
-                                    lock(((ICollection) requestMap).SyncRoot)
-                                    {
-                                        requestMap.Remove(command.CommandId);
-                                    }
-                                }
-
-                                // Rethrow the exception so it will handled by
-                                // the outer catch
-                                throw e;
-                            }
+						// Send the message.
+						try
+						{
+							transport.Oneway(command);
+							stateTracker.trackBack(command);
+						}
+						catch(Exception e)
+						{
+							// If the command was not tracked.. we will retry in
+							// this method
+							if(tracked == null)
+							{
+
+								// since we will retry in this method.. take it
+								// out of the request map so that it is not
+								// sent 2 times on recovery
+								if(command.ResponseRequired)
+								{
+									lock(((ICollection) requestMap).SyncRoot)
+									{
+										requestMap.Remove(command.CommandId);
+									}
+								}
+
+								// Rethrow the exception so it will handled by
+								// the outer catch
+								throw e;
+							}
 
-                        }
+						}
 
-                        return;
+						return;
 
-                    }
-                    catch(Exception e)
-                    {
-                        Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}", i, e.Message);
-                        Tracer.DebugFormat("Failed Message Was: {0}", command);
-                        HandleTransportFailure(e);
-                    }
-                }
-            }
+					}
+					catch(Exception e)
+					{
+						Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}", i, e.Message);
+						Tracer.DebugFormat("Failed Message Was: {0}", command);
+						HandleTransportFailure(e);
+					}
+				}
+			}
 
-            if(!disposed)
-            {
-                if(error != null)
-                {
-                    throw error;
-                }
-            }
-        }
-
-        public void Add(bool rebalance, Uri[] u)
-        {
-            lock(uris)
-            {
-                for(int i = 0; i < u.Length; i++)
-                {
-                    if(!uris.Contains(u[i]))
-                    {
-                        uris.Add(u[i]);
-                    }
-                }
-            }
+			if(!disposed)
+			{
+				if(error != null)
+				{
+					throw error;
+				}
+			}
+		}
 
-            Reconnect(rebalance);
-        }
+		public void Add(bool rebalance, Uri[] u)
+		{
+			lock(uris)
+			{
+				for(int i = 0; i < u.Length; i++)
+				{
+					if(!uris.Contains(u[i]))
+					{
+						uris.Add(u[i]);
+					}
+				}
+			}
 
-        public void Remove(bool rebalance, Uri[] u)
-        {
-            lock(uris)
-            {
-                for(int i = 0; i < u.Length; i++)
-                {
-                    uris.Remove(u[i]);
-                }
-            }
-
-            Reconnect(rebalance);
-        }
-
-        public void Add(bool rebalance, String u)
-        {
-            try
-            {
-                Uri uri = new Uri(u);
-                lock(uris)
-                {
-                    if(!uris.Contains(uri))
-                    {
-                        uris.Add(uri);
+			Reconnect(rebalance);
+		}
+
+		public void Remove(bool rebalance, Uri[] u)
+		{
+			lock(uris)
+			{
+				for(int i = 0; i < u.Length; i++)
+				{
+					uris.Remove(u[i]);
+				}
+			}
+
+			Reconnect(rebalance);
+		}
+
+		public void Add(bool rebalance, String u)
+		{
+			try
+			{
+				Uri uri = new Uri(u);
+				lock(uris)
+				{
+					if(!uris.Contains(uri))
+					{
+						uris.Add(uri);
 						Reconnect(rebalance);
-                    }
-                }				
-            }
-            catch(Exception e)
-            {
-                Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);
-            }
-        }
-
-        public void Reconnect(Uri uri)
-        {
-            Add(true, new Uri[] { uri });
-        }
-        
-        public void Reconnect(bool rebalance)
-        {
-            lock(reconnectMutex)
-            {
-                if(started)
-                {
-                    if(reconnectTask == null)
-                    {
-                        Tracer.Debug("Creating reconnect task");
-                        reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(new FailoverTask(this),
-                                            "ActiveMQ Failover Worker: " + this.GetHashCode().ToString());
-                    }
+					}
+				}
+			}
+			catch(Exception e)
+			{
+				Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);
+			}
+		}
+
+		public void Reconnect(Uri uri)
+		{
+			Add(true, new Uri[] { uri });
+		}
+
+		public void Reconnect(bool rebalance)
+		{
+			lock(reconnectMutex)
+			{
+				if(started)
+				{
+					if(reconnectTask == null)
+					{
+						Tracer.Debug("Creating reconnect task");
+						reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(new FailoverTask(this),
+											"ActiveMQ Failover Worker: " + this.GetHashCode().ToString());
+					}
 
                     if(rebalance)
                     {
@@ -758,574 +758,575 @@ namespace Apache.NMS.ActiveMQ.Transport.
                         }
                     }
 
-                    Tracer.Debug("Waking up reconnect task");
-                    try
-                    {
-                        reconnectTask.Wakeup();
-                    }
-                    catch(ThreadInterruptedException)
-                    {
-                    }
-                }
-                else
-                {
-                    Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
-                }
-            }
-        }
-
-        private List<Uri> ConnectList
-        {
-            get
-            {
-                List<Uri> l = new List<Uri>(uris);
-                bool removed = false;
-                if(failedConnectTransportURI != null)
-                {
-                    removed = l.Remove(failedConnectTransportURI);
-                }
-
-                if(Randomize)
-                {
-                    // Randomly, reorder the list by random swapping
-                    Random r = new Random(DateTime.Now.Millisecond);
-                    for(int i = 0; i < l.Count; i++)
-                    {
-                        int p = r.Next(l.Count);
-                        Uri t = l[p];
-                        l[p] = l[i];
-                        l[i] = t;
-                    }
-                }
+					Tracer.Debug("Waking up reconnect task");
+					try
+					{
+						reconnectTask.Wakeup();
+					}
+					catch(ThreadInterruptedException)
+					{
+					}
+				}
+				else
+				{
+					Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
+				}
+			}
+		}
 
-                if(removed)
-                {
-                    l.Add(failedConnectTransportURI);
-                }
-
-                return l;
-            }
-        }
-
-        protected void RestoreTransport(ITransport t)
-        {
-            Tracer.Info("Restoring previous transport connection.");
-            t.Start();
-            
-            //send information to the broker - informing it we are an ft client
-            ConnectionControl cc = new ConnectionControl();
-            cc.FaultTolerant = true;
-            t.Oneway(cc);
-            stateTracker.DoRestore(t);
-
-            Tracer.Info("Sending queued commands...");
-            Dictionary<int, Command> tmpMap = null;
-            lock(((ICollection) requestMap).SyncRoot)
-            {
-                tmpMap = new Dictionary<int, Command>(requestMap);
-            }
-
-            foreach(Command command in tmpMap.Values)
-            {
-                t.Oneway(command);
-            }
-        }
-        
-        public Uri RemoteAddress
-        {
-            get
-            {
-                if(ConnectedTransport != null)
-                {
-                    return ConnectedTransport.RemoteAddress;
-                }
-                return null;
-            }
-        }
-
-        public Object Narrow(Type type)
-        {
-            if(this.GetType().Equals(type))
-            {
-                return this;
-            }
-            else if(ConnectedTransport != null)
-            {
-                return ConnectedTransport.Narrow(type);
-            }
-
-            return null;
-        }
-
-        private bool DoConnect()
-        {
-            lock(reconnectMutex)
-            {
-                if(ConnectedTransport != null || disposed || connectionFailure != null)
-                {
-                    return false;
-                }
-                else
-                {
-                    List<Uri> connectList = ConnectList;
-                    if(connectList.Count == 0)
-                    {
-                        Failure = new NMSConnectionException("No URIs available for connection.");
-                    }
-                    else
-                    {
-                        if(!UseExponentialBackOff)
-                        {
-                            ReconnectDelay = InitialReconnectDelay;
-                        }
+		private List<Uri> ConnectList
+		{
+			get
+			{
+				List<Uri> l = new List<Uri>(uris);
+				bool removed = false;
+				if(failedConnectTransportURI != null)
+				{
+					removed = l.Remove(failedConnectTransportURI);
+				}
 
-                        try
-                        {
-                            backupMutex.WaitOne();
-                            if(Backup && backups.Count != 0)
-                            {
-                                BackupTransport bt = backups[0];
-                                backups.RemoveAt(0);
-                                ITransport t = bt.Transport;
-                                Uri uri = bt.Uri;
-                                t.Command = new CommandHandler(OnCommand);
-                                t.Exception = new ExceptionHandler(OnException);
-                                try
-                                {
-                                    if(started)
-                                    {
-                                        RestoreTransport(t);
-                                    }
-                                    ReconnectDelay = InitialReconnectDelay;
-                                    failedConnectTransportURI = null;
-                                    ConnectedTransportURI = uri;
-                                    ConnectedTransport = t;
-                                    connectFailures = 0;
-                                    connected = true;
-                                    if(this.Resumed != null)
-                                    {
-                                        this.Resumed(t);
-                                    }
-                                    Tracer.InfoFormat("Successfully reconnected to backup {0}", uri.ToString());
-                                    return false;
-                                }
-                                catch(Exception e)
-                                {
-                                    Tracer.DebugFormat("Backup transport failed: {0}", e.Message);
-                                }
-                            }
-                        }
-                        finally
-                        {
-                            backupMutex.ReleaseMutex();
-                        }
+				if(Randomize)
+				{
+					// Randomly, reorder the list by random swapping
+					Random r = new Random(DateTime.Now.Millisecond);
+					for(int i = 0; i < l.Count; i++)
+					{
+						int p = r.Next(l.Count);
+						Uri t = l[p];
+						l[p] = l[i];
+						l[i] = t;
+					}
+				}
 
-                        ManualResetEvent allDone = new ManualResetEvent(false);
-                        ITransport transport = null;
-                        Uri chosenUri = null;
-                        object syncLock = new object();
+				if(removed)
+				{
+					l.Add(failedConnectTransportURI);
+				}
 
-                        try
-                        {
-                            foreach(Uri uri in connectList)
-                            {
-                                if(ConnectedTransport != null || disposed)
-                                {
-                                    break;
-                                }
-
-                                Tracer.DebugFormat("Attempting connect to: {0}", uri);
-
-                                if(asyncConnect)
-                                {
-                                    // set connector up
-                                    Connector connector = new Connector(
-                                        delegate(ITransport transportToUse, Uri uriToUse) {
-                                            if(transport == null)
-                                            {
-                                                lock(syncLock)
-                                                {
-                                                    if(transport == null)
-                                                    {
-                                                        //the transport has not yet been set asynchronously so set it
-                                                        transport = transportToUse;
-                                                        chosenUri = uriToUse;
-                                                    }
-                                                    //notify issuing thread to move on
-                                                    allDone.Set();
-                                                }
-                                            }
-                                        }, uri, this);
-
-                                    // initiate a thread to try connecting to broker
-                                    Thread thread = new Thread(connector.DoConnect);
-                                    thread.Name = uri.ToString();
-                                    thread.Start();
-                                }
-                                else
-                                {
-                                    // synchronous connect
-                                    try
-                                    {
-                                        Tracer.DebugFormat("Attempting connect to: {0}", uri.ToString());
-                                        transport = TransportFactory.CompositeConnect(uri);
-                                        chosenUri = transport.RemoteAddress;
-                                        break;
-                                    }
-                                    catch(Exception e)
-                                    {
-                                        Failure = e;
-                                        Tracer.DebugFormat("Connect fail to: {0}, reason: {1}", uri, e.Message);
-                                    }
-                                }
-                            }
+				return l;
+			}
+		}
 
-                            if(asyncConnect)
-                            {
-                                // now wait for transport to be populated, but timeout eventually
-                                allDone.WaitOne(asyncTimeout, false);
-                            }
+		protected void RestoreTransport(ITransport t)
+		{
+			Tracer.Info("Restoring previous transport connection.");
+			t.Start();
 
-                            if(transport != null)
-                            {
-                                transport.Command = new CommandHandler(OnCommand);
-                                transport.Exception = new ExceptionHandler(OnException);
-                                transport.Start();
-
-                                if(started)
-                                {
-                                    RestoreTransport(transport);
-                                }
-
-                                if(this.Resumed != null)
-                                {
-                                    this.Resumed(transport);
-                                }
-
-                                Tracer.Debug("Connection established");
-                                ReconnectDelay = InitialReconnectDelay;
-                                ConnectedTransportURI = chosenUri;
-                                ConnectedTransport = transport;
-                                connectFailures = 0;
-                                connected = true;
-
-                                if(firstConnection)
-                                {
-                                    firstConnection = false;
-                                    Tracer.InfoFormat("Successfully connected to: {0}", chosenUri.ToString());
-                                }
-                                else
-                                {
-                                    Tracer.InfoFormat("Successfully reconnected to: {0}", chosenUri.ToString());
-                                }
+			//send information to the broker - informing it we are an ft client
+			ConnectionControl cc = new ConnectionControl();
+			cc.FaultTolerant = true;
+			t.Oneway(cc);
+			stateTracker.DoRestore(t);
+
+			Tracer.Info("Sending queued commands...");
+			Dictionary<int, Command> tmpMap = null;
+			lock(((ICollection) requestMap).SyncRoot)
+			{
+				tmpMap = new Dictionary<int, Command>(requestMap);
+			}
 
-                                return false;
-                            }
+			foreach(Command command in tmpMap.Values)
+			{
+				t.Oneway(command);
+			}
+		}
 
-                            if(asyncConnect)
-                            {
-                                Tracer.DebugFormat("Connect failed after waiting for asynchronous callback.");
-                            }
+		public Uri RemoteAddress
+		{
+			get
+			{
+				if(ConnectedTransport != null)
+				{
+					return ConnectedTransport.RemoteAddress;
+				}
+				return null;
+			}
+		}
 
-                        }
-                        catch(Exception e)
-                        {
-                            Failure = e;
-                            Tracer.DebugFormat("Connect attempt failed.  Reason: {0}", e.Message);
-                        }
-                    }
+		public Object Narrow(Type type)
+		{
+			if(this.GetType().Equals(type))
+			{
+				return this;
+			}
+			else if(ConnectedTransport != null)
+			{
+				return ConnectedTransport.Narrow(type);
+			}
 
-                    if(MaxReconnectAttempts > 0 && ++connectFailures >= MaxReconnectAttempts)
-                    {
-                        Tracer.ErrorFormat("Failed to connect to transport after {0} attempt(s)", connectFailures);
-                        connectionFailure = Failure;
-                        this.Exception(this, connectionFailure);
-                        return false;
-                    }
-                }
-            }
+			return null;
+		}
 
-            if(!disposed)
-            {
-                Tracer.DebugFormat("Waiting {0}ms before attempting connection.", ReconnectDelay);
-                lock(sleepMutex)
-                {
-                    try
-                    {
-                        Thread.Sleep(ReconnectDelay);
-                    }
-                    catch(ThreadInterruptedException)
-                    {
-                    }
-                }
+		private bool DoConnect()
+		{
+			lock(reconnectMutex)
+			{
+				if(ConnectedTransport != null || disposed || connectionFailure != null)
+				{
+					return false;
+				}
+				else
+				{
+					List<Uri> connectList = ConnectList;
+					if(connectList.Count == 0)
+					{
+						Failure = new NMSConnectionException("No URIs available for connection.");
+					}
+					else
+					{
+						if(!UseExponentialBackOff)
+						{
+							ReconnectDelay = InitialReconnectDelay;
+						}
 
-                if(UseExponentialBackOff)
-                {
-                    // Exponential increment of reconnect delay.
-                    ReconnectDelay *= ReconnectDelayExponent;
-                    if(ReconnectDelay > MaxReconnectDelay)
-                    {
-                        ReconnectDelay = MaxReconnectDelay;
-                    }
-                }
-            }
-            return !disposed;
-        }
-
-        /// <summary>
-        /// This class is a helper for the asynchronous connect option
-        /// </summary>
-        public class Connector
-        {
-            /// <summary>
-            /// callback to properly set chosen transport
-            /// </summary>
-            SetTransport _setTransport;
-
-            /// <summary>
-            /// Uri to try connecting to
-            /// </summary>
-            Uri _uri;
-
-            /// <summary>
-            /// Failover transport issuing the connection attempt
-            /// </summary>
-            private FailoverTransport _transport;
-
-            /// <summary>
-            /// Initializes a new instance of the <see cref="Connector"/> class.
-            /// </summary>
-            /// <param name="setTransport">The set transport.</param>
-            /// <param name="uri">The URI.</param>
-            /// <param name="transport">The transport.</param>
-            public Connector(SetTransport setTransport, Uri uri, FailoverTransport transport)
-            {
-                _uri = uri;
-                _setTransport = setTransport;
-                _transport = transport;
-            }
-
-            /// <summary>
-            /// Does the connect.
-            /// </summary>
-            public void DoConnect()
-            {
-                try
-                {
-                    TransportFactory.AsyncCompositeConnect(_uri, _setTransport);
-                }
-                catch(Exception e)
-                {
-                    _transport.Failure = e;
-                    Tracer.DebugFormat("Connect fail to: {0}, reason: {1}", _uri, e.Message);
-                }
-
-            }
-        }
-
-        private bool BuildBackups()
-        {
-            lock(backupMutex)
-            {
-                if(!disposed && Backup && backups.Count < BackupPoolSize)
-                {
-                    List<Uri> connectList = ConnectList;
-                    foreach(BackupTransport bt in backups)
-                    {
-                        if(bt.Disposed)
-                        {
-                            backups.Remove(bt);
-                        }
-                    }
+						try
+						{
+							backupMutex.WaitOne();
+							if(Backup && backups.Count != 0)
+							{
+								BackupTransport bt = backups[0];
+								backups.RemoveAt(0);
+								ITransport t = bt.Transport;
+								Uri uri = bt.Uri;
+								t.Command = new CommandHandler(OnCommand);
+								t.Exception = new ExceptionHandler(OnException);
+								try
+								{
+									if(started)
+									{
+										RestoreTransport(t);
+									}
+									ReconnectDelay = InitialReconnectDelay;
+									failedConnectTransportURI = null;
+									ConnectedTransportURI = uri;
+									ConnectedTransport = t;
+									connectFailures = 0;
+									connected = true;
+									if(this.Resumed != null)
+									{
+										this.Resumed(t);
+									}
+									Tracer.InfoFormat("Successfully reconnected to backup {0}", uri.ToString());
+									return false;
+								}
+								catch(Exception e)
+								{
+									Tracer.DebugFormat("Backup transport failed: {0}", e.Message);
+								}
+							}
+						}
+						finally
+						{
+							backupMutex.ReleaseMutex();
+						}
 
-                    foreach(Uri uri in connectList)
-                    {
-                        if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
-                        {
-                            try
-                            {
-                                BackupTransport bt = new BackupTransport(this);
-                                bt.Uri = uri;
-                                if(!backups.Contains(bt))
-                                {
-                                    ITransport t = TransportFactory.CompositeConnect(uri);
-                                    t.Command = new CommandHandler(bt.OnCommand);
-                                    t.Exception = new ExceptionHandler(bt.OnException);
-                                    t.Start();
-                                    bt.Transport = t;
-                                    backups.Add(bt);
-                                }
-                            }
-                            catch(Exception e)
-                            {
-                                Tracer.DebugFormat("Failed to build backup: {0}", e.Message);
-                            }
-                        }
+						ManualResetEvent allDone = new ManualResetEvent(false);
+						ITransport transport = null;
+						Uri chosenUri = null;
+						object syncLock = new object();
 
-                        if(backups.Count < BackupPoolSize)
-                        {
-                            break;
-                        }
-                    }
-                }
-            }
+						try
+						{
+							foreach(Uri uri in connectList)
+							{
+								if(ConnectedTransport != null || disposed)
+								{
+									break;
+								}
+
+								Tracer.DebugFormat("Attempting connect to: {0}", uri);
+
+								if(asyncConnect)
+								{
+									// set connector up
+									Connector connector = new Connector(
+										delegate(ITransport transportToUse, Uri uriToUse)
+										{
+											if(transport == null)
+											{
+												lock(syncLock)
+												{
+													if(transport == null)
+													{
+														//the transport has not yet been set asynchronously so set it
+														transport = transportToUse;
+														chosenUri = uriToUse;
+													}
+													//notify issuing thread to move on
+													allDone.Set();
+												}
+											}
+										}, uri, this);
+
+									// initiate a thread to try connecting to broker
+									Thread thread = new Thread(connector.DoConnect);
+									thread.Name = uri.ToString();
+									thread.Start();
+								}
+								else
+								{
+									// synchronous connect
+									try
+									{
+										Tracer.DebugFormat("Attempting connect to: {0}", uri.ToString());
+										transport = TransportFactory.CompositeConnect(uri);
+										chosenUri = transport.RemoteAddress;
+										break;
+									}
+									catch(Exception e)
+									{
+										Failure = e;
+										Tracer.DebugFormat("Connect fail to: {0}, reason: {1}", uri, e.Message);
+									}
+								}
+							}
+
+							if(asyncConnect)
+							{
+								// now wait for transport to be populated, but timeout eventually
+								allDone.WaitOne(asyncTimeout, false);
+							}
+
+							if(transport != null)
+							{
+								transport.Command = new CommandHandler(OnCommand);
+								transport.Exception = new ExceptionHandler(OnException);
+								transport.Start();
+
+								if(started)
+								{
+									RestoreTransport(transport);
+								}
+
+								if(this.Resumed != null)
+								{
+									this.Resumed(transport);
+								}
+
+								Tracer.Debug("Connection established");
+								ReconnectDelay = InitialReconnectDelay;
+								ConnectedTransportURI = chosenUri;
+								ConnectedTransport = transport;
+								connectFailures = 0;
+								connected = true;
+
+								if(firstConnection)
+								{
+									firstConnection = false;
+									Tracer.InfoFormat("Successfully connected to: {0}", chosenUri.ToString());
+								}
+								else
+								{
+									Tracer.InfoFormat("Successfully reconnected to: {0}", chosenUri.ToString());
+								}
+
+								return false;
+							}
+
+							if(asyncConnect)
+							{
+								Tracer.DebugFormat("Connect failed after waiting for asynchronous callback.");
+							}
+
+						}
+						catch(Exception e)
+						{
+							Failure = e;
+							Tracer.DebugFormat("Connect attempt failed.  Reason: {0}", e.Message);
+						}
+					}
+
+					if(MaxReconnectAttempts > 0 && ++connectFailures >= MaxReconnectAttempts)
+					{
+						Tracer.ErrorFormat("Failed to connect to transport after {0} attempt(s)", connectFailures);
+						connectionFailure = Failure;
+						this.Exception(this, connectionFailure);
+						return false;
+					}
+				}
+			}
+
+			if(!disposed)
+			{
+				Tracer.DebugFormat("Waiting {0}ms before attempting connection.", ReconnectDelay);
+				lock(sleepMutex)
+				{
+					try
+					{
+						Thread.Sleep(ReconnectDelay);
+					}
+					catch(ThreadInterruptedException)
+					{
+					}
+				}
+
+				if(UseExponentialBackOff)
+				{
+					// Exponential increment of reconnect delay.
+					ReconnectDelay *= ReconnectDelayExponent;
+					if(ReconnectDelay > MaxReconnectDelay)
+					{
+						ReconnectDelay = MaxReconnectDelay;
+					}
+				}
+			}
+			return !disposed;
+		}
+
+		/// <summary>
+		/// This class is a helper for the asynchronous connect option
+		/// </summary>
+		public class Connector
+		{
+			/// <summary>
+			/// callback to properly set chosen transport
+			/// </summary>
+			SetTransport _setTransport;
+
+			/// <summary>
+			/// Uri to try connecting to
+			/// </summary>
+			Uri _uri;
+
+			/// <summary>
+			/// Failover transport issuing the connection attempt
+			/// </summary>
+			private FailoverTransport _transport;
+
+			/// <summary>
+			/// Initializes a new instance of the <see cref="Connector"/> class.
+			/// </summary>
+			/// <param name="setTransport">The set transport.</param>
+			/// <param name="uri">The URI.</param>
+			/// <param name="transport">The transport.</param>
+			public Connector(SetTransport setTransport, Uri uri, FailoverTransport transport)
+			{
+				_uri = uri;
+				_setTransport = setTransport;
+				_transport = transport;
+			}
+
+			/// <summary>
+			/// Does the connect.
+			/// </summary>
+			public void DoConnect()
+			{
+				try
+				{
+					TransportFactory.AsyncCompositeConnect(_uri, _setTransport);
+				}
+				catch(Exception e)
+				{
+					_transport.Failure = e;
+					Tracer.DebugFormat("Connect fail to: {0}, reason: {1}", _uri, e.Message);
+				}
+
+			}
+		}
+
+		private bool BuildBackups()
+		{
+			lock(backupMutex)
+			{
+				if(!disposed && Backup && backups.Count < BackupPoolSize)
+				{
+					List<Uri> connectList = ConnectList;
+					foreach(BackupTransport bt in backups)
+					{
+						if(bt.Disposed)
+						{
+							backups.Remove(bt);
+						}
+					}
+
+					foreach(Uri uri in connectList)
+					{
+						if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
+						{
+							try
+							{
+								BackupTransport bt = new BackupTransport(this);
+								bt.Uri = uri;
+								if(!backups.Contains(bt))
+								{
+									ITransport t = TransportFactory.CompositeConnect(uri);
+									t.Command = new CommandHandler(bt.OnCommand);
+									t.Exception = new ExceptionHandler(bt.OnException);
+									t.Start();
+									bt.Transport = t;
+									backups.Add(bt);
+								}
+							}
+							catch(Exception e)
+							{
+								Tracer.DebugFormat("Failed to build backup: {0}", e.Message);
+							}
+						}
 
-            return false;
-        }
+						if(backups.Count < BackupPoolSize)
+						{
+							break;
+						}
+					}
+				}
+			}
+
+			return false;
+		}
+
+		public void ConnectionInterruptProcessingComplete(ConnectionId connectionId)
+		{
+			lock(reconnectMutex)
+			{
+				Tracer.Debug("Connection Interrupt Processing is complete for ConnectionId: " + connectionId);
+				stateTracker.ConnectionInterruptProcessingComplete(this, connectionId);
+			}
+		}
 
-        public void ConnectionInterruptProcessingComplete(ConnectionId connectionId)
-        {
-            lock(reconnectMutex)
-            {
-                Tracer.Debug("Connection Interrupt Processing is complete for ConnectionId: " + connectionId);
-                stateTracker.ConnectionInterruptProcessingComplete(this, connectionId);
-            }
-        }
 
-		
 		public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
-		{		
-	        if(IsUpdateURIsSupported)
+		{
+			if(IsUpdateURIsSupported)
 			{
-	            List<Uri> copy = new List<Uri>(this.updated);
-	            List<Uri> added = new List<Uri>();
-				
-	            if(updatedURIs != null && updatedURIs.Length > 0) 
-				{
-	                HashSet<Uri> uriSet = new HashSet<Uri>();
-	                for(int i = 0; i < updatedURIs.Length; i++) 
-					{
-	                    Uri uri = updatedURIs[i];
-	                    if(uri != null) 
-						{
-	                        uriSet.Add(uri);
-	                    }
-	                }
-					
-	                foreach(Uri uri in uriSet)
-					{
-	                    if(copy.Remove(uri) == false) 
-						{
-	                        added.Add(uri);
-	                    }
-	                }
-					
-	                lock(reconnectMutex) 
-					{
-	                    this.updated.Clear();
-	                    this.updated.AddRange(added);
-	                    
-						foreach(Uri uri in copy) 
-						{
-	                        this.uris.Remove(uri);
-	                    }
-	                    
+				List<Uri> copy = new List<Uri>(this.updated);
+				List<Uri> added = new List<Uri>();
+
+				if(updatedURIs != null && updatedURIs.Length > 0)
+				{
+					Dictionary<Uri, bool> uriSet = new Dictionary<Uri, bool>();
+					for(int i = 0; i < updatedURIs.Length; i++)
+					{
+						Uri uri = updatedURIs[i];
+						if(uri != null)
+						{
+							uriSet[uri] = true;
+						}
+					}
+
+					foreach(Uri uri in uriSet.Keys)
+					{
+						if(copy.Remove(uri) == false)
+						{
+							added.Add(uri);
+						}
+					}
+
+					lock(reconnectMutex)
+					{
+						this.updated.Clear();
+						this.updated.AddRange(added);
+
+						foreach(Uri uri in copy)
+						{
+							this.uris.Remove(uri);
+						}
+
 						this.Add(rebalance, added.ToArray());
-	                }
-	            }
-	        }			
-		}
-
-	    public void HandleConnectionControl(ConnectionControl control) 
-		{
-	        string reconnectStr = control.ReconnectTo;
-	        
-			if(reconnectStr != null) 
-			{
-	            reconnectStr = reconnectStr.Trim();
-	            if(reconnectStr.Length > 0) 
-				{
-	                try 
-					{
-	                    Uri uri = new Uri(reconnectStr);
-	                    if(IsReconnectSupported) 
-						{
-	                        Reconnect(uri);
-	                        Tracer.Info("Reconnected to: " + uri.OriginalString);
-	                    }
-	                } 
-					catch(Exception e) 
-					{
-	                    Tracer.ErrorFormat("Failed to handle ConnectionControl reconnect to {0}: {1}", reconnectStr, e);
-	                }
-	            }
-	        }
-			
-	        ProcessNewTransports(control.RebalanceConnection, control.ConnectedBrokers);
-	    }
-	
-	    private void ProcessNewTransports(bool rebalance, String newTransports) 
-		{
-	        if(newTransports != null) 
-			{
-	            newTransports = newTransports.Trim();
-				
-	            if(newTransports.Length > 0 && IsUpdateURIsSupported) 
-				{
-	                List<Uri> list = new List<Uri>();
-	                String[] tokens = newTransports.Split(new Char []{','});
-	                
+					}
+				}
+			}
+		}
+
+		public void HandleConnectionControl(ConnectionControl control)
+		{
+			string reconnectStr = control.ReconnectTo;
+
+			if(reconnectStr != null)
+			{
+				reconnectStr = reconnectStr.Trim();
+				if(reconnectStr.Length > 0)
+				{
+					try
+					{
+						Uri uri = new Uri(reconnectStr);
+						if(IsReconnectSupported)
+						{
+							Reconnect(uri);
+							Tracer.Info("Reconnected to: " + uri.OriginalString);
+						}
+					}
+					catch(Exception e)
+					{
+						Tracer.ErrorFormat("Failed to handle ConnectionControl reconnect to {0}: {1}", reconnectStr, e);
+					}
+				}
+			}
+
+			ProcessNewTransports(control.RebalanceConnection, control.ConnectedBrokers);
+		}
+
+		private void ProcessNewTransports(bool rebalance, String newTransports)
+		{
+			if(newTransports != null)
+			{
+				newTransports = newTransports.Trim();
+
+				if(newTransports.Length > 0 && IsUpdateURIsSupported)
+				{
+					List<Uri> list = new List<Uri>();
+					String[] tokens = newTransports.Split(new Char[] { ',' });
+
 					foreach(String str in tokens)
 					{
-	                    try 
+						try
+						{
+							Uri uri = new Uri(str);
+							list.Add(uri);
+						}
+						catch
 						{
-	                        Uri uri = new Uri(str);
-	                        list.Add(uri);
-	                    } 
-						catch 
-						{
-	                        Tracer.Error("Failed to parse broker address: " + str);
-	                    }
-	                }
-					
-	                if(list.Count != 0)
+							Tracer.Error("Failed to parse broker address: " + str);
+						}
+					}
+
+					if(list.Count != 0)
 					{
-	                    try 
+						try
 						{
-	                        UpdateURIs(rebalance, list.ToArray());
-	                    } 
+							UpdateURIs(rebalance, list.ToArray());
+						}
 						catch
 						{
-	                        Tracer.Error("Failed to update transport URI's from: " + newTransports);
-	                    }
-	                }
-	            }
-	        }
-	    }
-		
+							Tracer.Error("Failed to update transport URI's from: " + newTransports);
+						}
+					}
+				}
+			}
+		}
+
 		public void Dispose()
-        {
-            Dispose(true);
-            GC.SuppressFinalize(this);
-        }
-
-        public void Dispose(bool disposing)
-        {
-            if(disposing)
-            {
-                // get rid of unmanaged stuff
-            }
-
-            disposed = true;
-        }
-
-        public int CompareTo(Object o)
-        {
-            if(o is FailoverTransport)
-            {
-                FailoverTransport oo = o as FailoverTransport;
-
-                return this.id - oo.id;
-            }
-            else
-            {
-                throw new ArgumentException();
-            }
-        }
-
-        public override String ToString()
-        {
-            return ConnectedTransportURI == null ? "unconnected" : ConnectedTransportURI.ToString();
-        }
-    }
+		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		public void Dispose(bool disposing)
+		{
+			if(disposing)
+			{
+				// get rid of unmanaged stuff
+			}
+
+			disposed = true;
+		}
+
+		public int CompareTo(Object o)
+		{
+			if(o is FailoverTransport)
+			{
+				FailoverTransport oo = o as FailoverTransport;
+
+				return this.id - oo.id;
+			}
+			else
+			{
+				throw new ArgumentException();
+			}
+		}
+
+		public override String ToString()
+		{
+			return ConnectedTransportURI == null ? "unconnected" : ConnectedTransportURI.ToString();
+		}
+	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs?rev=980561&r1=980560&r2=980561&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs Thu Jul 29 20:22:54 2010
@@ -22,26 +22,26 @@ namespace Apache.NMS.ActiveMQ.Transport
 	public interface ICompositeTransport : ITransport
 	{
 		/// <summary>
-		/// Adds a new set of Uris to the list of Uris that this Transport can connect to. 
+		/// Adds a new set of Uris to the list of Uris that this Transport can connect to.
 		/// </summary>
 		/// <param name="rebalance">
 		/// A <see cref="System.Boolean"/>
 		/// Should the current connection be broken and a new one created.
 		/// </param>
 		/// <param name="uris">
-		/// A <see cref="Uri[]"/>
+		/// A <see cref="Uri"/>
 		/// </param>
 		void Add(bool rebalance, Uri[] uris);
 
 		/// <summary>
-		/// Remove the given Uris from this Transports list of known Uris. 
+		/// Remove the given Uris from this Transports list of known Uris.
 		/// </summary>
 		/// <param name="rebalance">
 		/// A <see cref="System.Boolean"/>
 		/// Should the current connection be broken and a new one created.
 		/// </param>
 		/// <param name="uris">
-		/// A <see cref="Uri[]"/>
+		/// A <see cref="Uri"/>
 		/// </param>
 		void Remove(bool rebalance, Uri[] uris);
 	}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs?rev=980561&r1=980560&r2=980561&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs Thu Jul 29 20:22:54 2010
@@ -36,7 +36,7 @@ namespace Apache.NMS.ActiveMQ.Transport
 	{
         /// <summary>
         /// Sends a Command object on the Wire but does not wait for any response from the
-        /// receiver before returning.  
+        /// receiver before returning.
         /// </summary>
         /// <param name="command">
         /// A <see cref="Command"/>
@@ -70,7 +70,7 @@ namespace Apache.NMS.ActiveMQ.Transport
 
         /// <summary>
         /// Sends a Command to the Broker and waits for the given TimeSpan to expire for a
-        /// response before returning.  
+        /// response before returning.
         /// </summary>
         /// <param name="command">
         /// A <see cref="Command"/>
@@ -95,8 +95,8 @@ namespace Apache.NMS.ActiveMQ.Transport
         /// <returns>
         /// A <see cref="System.Object"/>
         /// </returns>
-        Object Narrow(Type type);            
-        
+        Object Narrow(Type type);
+
 		CommandHandler Command
 		{
 			get;
@@ -113,13 +113,13 @@ namespace Apache.NMS.ActiveMQ.Transport
 		{
 			get;
 			set;
-		}		
+		}
 
 		ResumedHandler Resumed
 		{
 			get;
 			set;
-		}		
+		}
 
         /// <value>
         /// Indicates if this Transport has already been disposed and can no longer
@@ -155,7 +155,7 @@ namespace Apache.NMS.ActiveMQ.Transport
         {
             get;
         }
-        
+
 		/// <summary>
 		/// Returns true if this Transport supports reconnections.
 		/// </summary>
@@ -163,7 +163,7 @@ namespace Apache.NMS.ActiveMQ.Transport
 		{
 			get;
 		}
-	    
+
 		/// <summary>
 		/// Returns true if this Transport can accept updated lists of connection Uri's.
 		/// </summary>
@@ -171,7 +171,7 @@ namespace Apache.NMS.ActiveMQ.Transport
 		{
 			get;
 		}
-		
+
 		/// <summary>
 		/// Updates the Uri's that this Transport is aware of and will use to
 		/// connect itself to.  If the rebalance option is true this method will
@@ -182,10 +182,10 @@ namespace Apache.NMS.ActiveMQ.Transport
 		/// A <see cref="System.Boolean"/>
 		/// </param>
 		/// <param name="updatedURIs">
-		/// A <see cref="Uri[]"/>
+		/// A <see cref="Uri"/>
 		/// </param>
 		void UpdateURIs(bool rebalance, Uri[] updatedURIs);
-		
+
 	}
 }
 



Mime
View raw message