activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1196799 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport: Failover/FailoverTransport.cs InactivityMonitor.cs
Date Wed, 02 Nov 2011 21:08:05 GMT
Author: tabish
Date: Wed Nov  2 21:08:05 2011
New Revision: 1196799

URL: http://svn.apache.org/viewvc?rev=1196799&view=rev
Log:
Fix for: https://issues.apache.org/jira/browse/AMQNET-344

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/Failover/FailoverTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/InactivityMonitor.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=1196799&r1=1196798&r2=1196799&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/Failover/FailoverTransport.cs Wed Nov  2 21:08:05 2011
@@ -1,1385 +1,1384 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Collections;
-using System.Collections.Generic;
-using System.Threading;
-using Apache.NMS.ActiveMQ.Commands;
-using Apache.NMS.ActiveMQ.State;
-using Apache.NMS.ActiveMQ.Threads;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.ActiveMQ.Transport.Failover
-{
-	/// <summary>
-	/// A Transport that is made reliable by being able to fail over to another
-	/// transport when a transport failure is detected.
-	/// </summary>
-	public class FailoverTransport : ICompositeTransport, IComparable
-	{
-		private static int idCounter = 0;
-		private readonly int id;
-
-		private bool disposed;
-		private bool connected;
-		private readonly List<Uri> uris = new List<Uri>();
-		private readonly List<Uri> updated = new List<Uri>();
-		private CommandHandler commandHandler;
-		private ExceptionHandler exceptionHandler;
-		private InterruptedHandler interruptedHandler;
-		private ResumedHandler resumedHandler;
-
-		private readonly Mutex reconnectMutex = new Mutex();
-		private readonly Mutex backupMutex = new Mutex();
-		private readonly Mutex sleepMutex = new Mutex();
-		private readonly ConnectionStateTracker stateTracker = new ConnectionStateTracker();
-		private readonly Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
-
-		private Uri connectedTransportURI;
-		private Uri failedConnectTransportURI;
-		private readonly AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
-		private TaskRunner reconnectTask = null;
-		private bool started;
-
-		private int timeout = -1;
-		private int initialReconnectDelay = 10;
-		private int maxReconnectDelay = 1000 * 30;
-		private int backOffMultiplier = 2;
-		private bool useExponentialBackOff = true;
-		private bool randomize = true;
-		private bool initialized;
-        private int maxReconnectAttempts;
-        private int startupMaxReconnectAttempts;
-		private int connectFailures;
-		private int reconnectDelay = 10;
-		private int asyncTimeout = 45000;
-		private bool asyncConnect = false;
-		private Exception connectionFailure;
-		private bool firstConnection = true;
-		private bool backup = false;
-		private readonly List<BackupTransport> backups = new List<BackupTransport>();
-		private int backupPoolSize = 1;
-		private bool trackMessages = false;
-		private int maxCacheSize = 256;
-		private volatile Exception failure;
-		private readonly object mutex = new object();
-		private bool reconnectSupported = true;
-		private bool updateURIsSupported = true;
-
-		public FailoverTransport()
-		{
-			id = idCounter++;
-
-			stateTracker.TrackTransactions = true;
-		}
-
-		~FailoverTransport()
-		{
-			Dispose(false);
-		}
-
-		#region FailoverTask
-
-		private class FailoverTask : Task
-		{
-			private readonly FailoverTransport parent;
-
-			public FailoverTask(FailoverTransport p)
-			{
-				parent = p;
-			}
-
-			public bool Iterate()
-			{
-				bool result = false;
-				bool buildBackup = true;
-				bool doReconnect = !parent.disposed && parent.connectionFailure == null;
-				try
-				{
-					parent.backupMutex.WaitOne();
-					if(parent.ConnectedTransport == null && doReconnect)
-					{
-						result = parent.DoConnect();
-						buildBackup = false;
-					}
-				}
-				finally
-				{
-					parent.backupMutex.ReleaseMutex();
-				}
-
-				if(buildBackup)
-				{
-					parent.BuildBackups();
-				}
-				else
-				{
-					//build backups on the next iteration
-					result = true;
-					try
-					{
-						parent.reconnectTask.Wakeup();
-					}
-					catch(ThreadInterruptedException)
-					{
-						Tracer.Debug("Reconnect task has been interrupted.");
-					}
-				}
-				return result;
-			}
-		}
-
-		#endregion
-
-		#region Property Accessors
-
-		public CommandHandler Command
-		{
-			get { return commandHandler; }
-			set { commandHandler = value; }
-		}
-
-		public ExceptionHandler Exception
-		{
-			get { return exceptionHandler; }
-			set { exceptionHandler = value; }
-		}
-
-		public InterruptedHandler Interrupted
-		{
-			get { return interruptedHandler; }
-			set { this.interruptedHandler = value; }
-		}
-
-		public ResumedHandler Resumed
-		{
-			get { return resumedHandler; }
-			set { this.resumedHandler = value; }
-		}
-
-		internal Exception Failure
-		{
-			get { return failure; }
-			set
-			{
-				lock(mutex)
-				{
-					failure = value;
-				}
-			}
-		}
-
-		public int Timeout
-		{
-			get { return this.timeout; }
-			set { this.timeout = value; }
-		}
-
-		public int InitialReconnectDelay
-		{
-			get { return initialReconnectDelay; }
-			set { initialReconnectDelay = value; }
-		}
-
-		public int MaxReconnectDelay
-		{
-			get { return maxReconnectDelay; }
-			set { maxReconnectDelay = value; }
-		}
-
-		public int ReconnectDelay
-		{
-			get { return reconnectDelay; }
-			set { reconnectDelay = value; }
-		}
-
-		public int ReconnectDelayExponent
-		{
-			get { return backOffMultiplier; }
-			set { backOffMultiplier = value; }
-		}
-
-		public ITransport ConnectedTransport
-		{
-			get { return connectedTransport.Value; }
-			set { connectedTransport.Value = value; }
-		}
-
-		public Uri ConnectedTransportURI
-		{
-			get { return connectedTransportURI; }
-			set { connectedTransportURI = value; }
-		}
-
-		public int MaxReconnectAttempts
-		{
-			get { return maxReconnectAttempts; }
-			set { maxReconnectAttempts = value; }
-		}
-
-        public int StartupMaxReconnectAttempts
-        {
-            get { return startupMaxReconnectAttempts; }
-            set { startupMaxReconnectAttempts = value; }
-        }
-
-		public bool Randomize
-		{
-			get { return randomize; }
-			set { randomize = value; }
-		}
-
-		public bool Backup
-		{
-			get { return backup; }
-			set { backup = value; }
-		}
-
-		public int BackupPoolSize
-		{
-			get { return backupPoolSize; }
-			set { backupPoolSize = value; }
-		}
-
-		public bool TrackMessages
-		{
-			get { return trackMessages; }
-			set { trackMessages = value; }
-		}
-
-		public int MaxCacheSize
-		{
-			get { return maxCacheSize; }
-			set { maxCacheSize = value; }
-		}
-
-		public bool UseExponentialBackOff
-		{
-			get { return useExponentialBackOff; }
-			set { useExponentialBackOff = value; }
-		}
-
-        public IWireFormat WireFormat
-        {
-            get
-            {
-                ITransport transport = ConnectedTransport;
-                if(transport != null)
-                {
-                    return transport.WireFormat;
-                }
-
-                return null;
-            }
-        }
-
-		/// <summary>
-		/// Gets or sets a value indicating whether to asynchronously connect to sockets
-		/// </summary>
-		/// <value><c>true</c> if [async connect]; otherwise, <c>false</c>.</value>
-		public bool AsyncConnect
-		{
-			set { asyncConnect = value; }
-		}
-
-		/// <summary>
-		/// If doing an asynchronous connect, the milliseconds before timing out if no connection can be made
-		/// </summary>
-		/// <value>The async timeout.</value>
-		public int AsyncTimeout
-		{
-			get { return asyncTimeout; }
-			set { asyncTimeout = value; }
-		}
-
-        public ConnectionStateTracker StateTracker
-        {
-            get { return this.stateTracker; }
-        }
-
-		#endregion
-
-		public bool IsFaultTolerant
-		{
-			get { return true; }
-		}
-
-		public bool IsDisposed
-		{
-			get { return disposed; }
-		}
-
-		public bool IsConnected
-		{
-			get { return connected; }
-		}
-
-		public bool IsStarted
-		{
-			get { return started; }
-		}
-
-		public bool IsReconnectSupported
-		{
-			get { return this.reconnectSupported; }
-		}
-
-		public bool IsUpdateURIsSupported
-		{
-			get { return this.updateURIsSupported; }
-		}
-
-		public void OnException(ITransport sender, Exception error)
-		{
-			try
-			{
-				HandleTransportFailure(error);
-			}
-			catch(Exception e)
-			{
-				e.GetType();
-				// What to do here?
-			}
-		}
-
-		public void disposedOnCommand(ITransport sender, Command c)
-		{
-		}
-
-		public void disposedOnException(ITransport sender, Exception e)
-		{
-		}
-
-		public void HandleTransportFailure(Exception e)
-		{
-			ITransport transport = connectedTransport.GetAndSet(null);
-			if(transport != null)
-			{
-				transport.Command = disposedOnCommand;
-				transport.Exception = disposedOnException;
-				try
-				{
-					transport.Stop();
-				}
-				catch(Exception ex)
-				{
-					ex.GetType();	// Ignore errors but this lets us see the error during debugging
-				}
-
-				lock(reconnectMutex)
-				{
-					bool reconnectOk = false;
-					if(started)
-					{
-						Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}", ConnectedTransportURI.ToString(), e.Message);
-						reconnectOk = true;
-					}
-
-					initialized = false;
-					failedConnectTransportURI = ConnectedTransportURI;
-					ConnectedTransportURI = null;
-					connected = false;
-
-					if(this.Interrupted != null)
-					{
-						this.Interrupted(transport);
-					}
-
-					if(reconnectOk)
-					{
-						reconnectTask.Wakeup();
-					}
-				}
-			}
-		}
-
-		public void Start()
-		{
-			lock(reconnectMutex)
-			{
-				if(started)
-				{
-					Tracer.Debug("FailoverTransport Already Started.");
-					return;
-				}
-
-				Tracer.Debug("FailoverTransport Started.");
-				started = true;
-				stateTracker.MaxCacheSize = MaxCacheSize;
-				stateTracker.TrackMessages = TrackMessages;
-				if(ConnectedTransport != null)
-				{
-					stateTracker.DoRestore(ConnectedTransport);
-				}
-				else
-				{
-					Reconnect(false);
-				}
-			}
-		}
-
-		public virtual void Stop()
-		{
-			ITransport transportToStop = null;
-
-			lock(reconnectMutex)
-			{
-				if(!started)
-				{
-					Tracer.Debug("FailoverTransport Already Stopped.");
-					return;
-				}
-
-				Tracer.Debug("FailoverTransport Stopped.");
-				started = false;
-				disposed = true;
-				connected = false;
-				foreach(BackupTransport t in backups)
-				{
-					t.Disposed = true;
-				}
-				backups.Clear();
-
-				if(ConnectedTransport != null)
-				{
-					transportToStop = connectedTransport.GetAndSet(null);
-				}
-			}
-
-			try
-			{
-				sleepMutex.WaitOne();
-			}
-			finally
-			{
-				sleepMutex.ReleaseMutex();
-			}
-
-			if(reconnectTask != null)
-			{
-				reconnectTask.Shutdown();
-			}
-
-			if(transportToStop != null)
-			{
-				transportToStop.Stop();
-			}
-		}
-
-		public FutureResponse AsyncRequest(Command command)
-		{
-			throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");
-		}
-
-		public Response Request(Command command)
-		{
-			throw new ApplicationException("FailoverTransport does not implement Request(Command)");
-		}
-
-		public Response Request(Command command, TimeSpan ts)
-		{
-			throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");
-		}
-
-		public void OnCommand(ITransport sender, Command command)
-		{
-			if(command != null)
-			{
-				if(command.IsResponse)
-				{
-					Object oo = null;
-					lock(((ICollection) requestMap).SyncRoot)
-					{
-						int v = ((Response) command).CorrelationId;
-						try
-						{
-							if(requestMap.ContainsKey(v))
-							{
-								oo = requestMap[v];
-								requestMap.Remove(v);
-							}
-						}
-						catch
-						{
-						}
-					}
-
-					Tracked t = oo as Tracked;
-					if(t != null)
-					{
-						t.onResponses();
-					}
-				}
-
-				if(!initialized)
-				{
-					initialized = true;
-				}
-
-				if(command.IsConnectionControl)
-				{
-					this.HandleConnectionControl(command as ConnectionControl);
-				}
-			}
-
-			this.Command(sender, command);
-		}
-
-		public void Oneway(Command command)
-		{
-			Exception error = null;
-
-			lock(reconnectMutex)
-			{
-				if(command != null && ConnectedTransport == null)
-				{
-					if(command.IsShutdownInfo)
-					{
-						// Skipping send of ShutdownInfo command when not connected.
-						return;
-					}
-
-					if(command.IsRemoveInfo || command.IsMessageAck)
-					{
-						// Simulate response to RemoveInfo command or a MessageAck
-                        // since it would be stale at this point.
-                        if(command.ResponseRequired)
-                        {
-						    OnCommand(this, new Response() { CorrelationId = command.CommandId });
-                        }
-						return;
-					}
-				}
-
-				// Keep trying until the message is sent.
-				for(int i = 0; !disposed; i++)
-				{
-					try
-					{
-                        // Any Ack that was being sent when the connection dropped is now
-                        // stale so we don't send it here as it would cause an unmatched ack
-                        // on the broker side and probably prevent a consumer from getting
-                        // any new messages.
-                        if(command.IsMessageAck && i > 0)
-                        {
-                            Tracer.Debug("Inflight MessageAck being dropped as stale.");
-                            if(command.ResponseRequired)
-                            {
-                                OnCommand(this, new Response() { CorrelationId = command.CommandId });
-                            }
-                            return;
-                        }
-
-						// Wait for transport to be connected.
-						ITransport transport = ConnectedTransport;
-						DateTime start = DateTime.Now;
-						bool timedout = false;
-						while(transport == null && !disposed && connectionFailure == null)
-						{
-							Tracer.Info("Waiting for transport to reconnect.");
-
-							int elapsed = (int) (DateTime.Now - start).TotalMilliseconds;
-							if(this.timeout > 0 && elapsed > timeout)
-							{
-								timedout = true;
-								Tracer.DebugFormat("FailoverTransport.oneway - timed out after {0} mills", elapsed);
-								break;
-							}
-
-							// Release so that the reconnect task can run
-							try
-							{
-								// Wait for something
-								Monitor.Wait(reconnectMutex, 1000);
-							}
-							catch(ThreadInterruptedException e)
-							{
-								Tracer.DebugFormat("Interrupted: {0}", e.Message);
-							}
-
-							transport = ConnectedTransport;
-						}
-
-						if(transport == null)
-						{
-							// Previous loop may have exited due to use being disposed.
-							if(disposed)
-							{
-								error = new IOException("Transport disposed.");
-							}
-							else if(connectionFailure != null)
-							{
-								error = connectionFailure;
-							}
-							else if(timedout)
-							{
-								error = new IOException("Failover oneway timed out after " + timeout + " milliseconds.");
-							}
-							else
-							{
-								error = new IOException("Unexpected failure.");
-							}
-							break;
-						}
-
-						// If it was a request and it was not being tracked by
-						// the state tracker, then hold it in the requestMap so
-						// that we can replay it later.
-						Tracked tracked = stateTracker.Track(command);
-						lock(((ICollection) requestMap).SyncRoot)
-						{
-							if(tracked != null && tracked.WaitingForResponse)
-							{
-								requestMap.Add(command.CommandId, tracked);
-							}
-							else if(tracked == null && command.ResponseRequired)
-							{
-								requestMap.Add(command.CommandId, command);
-							}
-						}
-
-						// Send the message.
-						try
-						{
-							transport.Oneway(command);
-							stateTracker.TrackBack(command);
-						}
-						catch(Exception e)
-						{
-							// If the command was not tracked.. we will retry in
-							// this method
-							if(tracked == null)
-							{
-
-								// since we will retry in this method.. take it
-								// out of the request map so that it is not
-								// sent 2 times on recovery
-								if(command.ResponseRequired)
-								{
-									lock(((ICollection) requestMap).SyncRoot)
-									{
-										requestMap.Remove(command.CommandId);
-									}
-								}
-
-								// Rethrow the exception so it will handled by
-								// the outer catch
-								throw e;
-							}
-
-						}
-
-						return;
-
-					}
-					catch(Exception e)
-					{
-						Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}", i, e.Message);
-						Tracer.DebugFormat("Failed Message Was: {0}", command);
-						HandleTransportFailure(e);
-					}
-				}
-			}
-
-			if(!disposed)
-			{
-				if(error != null)
-				{
-					throw error;
-				}
-			}
-		}
-
-		public void Add(bool rebalance, Uri[] u)
-		{
-			lock(uris)
-			{
-				for(int i = 0; i < u.Length; i++)
-				{
-					if(!uris.Contains(u[i]))
-					{
-						uris.Add(u[i]);
-					}
-				}
-			}
-
-			Reconnect(rebalance);
-		}
-
-		public void Add(bool rebalance, String u)
-		{
-			try
-			{
-				Add(rebalance, new Uri[] { new Uri(u) });
-			}
-			catch(Exception e)
-			{
-				Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);
-			}
-		}
-
-		public void Remove(bool rebalance, Uri[] u)
-		{
-			lock(uris)
-			{
-				for(int i = 0; i < u.Length; i++)
-				{
-					uris.Remove(u[i]);
-				}
-			}
-
-			Reconnect(rebalance);
-		}
-
-		public void Remove(bool rebalance, String u)
-		{
-			try
-			{
-				Remove(rebalance, new Uri[] { new Uri(u) });
-			}
-			catch(Exception e)
-			{
-				Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);
-			}
-		}
-
-		public void Reconnect(Uri uri)
-		{
-			Add(true, new Uri[] { uri });
-		}
-
-		public void Reconnect(bool rebalance)
-		{
-			lock(reconnectMutex)
-			{
-				if(started)
-				{
-					if(reconnectTask == null)
-					{
-						Tracer.Debug("Creating reconnect task");
-						reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(new FailoverTask(this),
-											"ActiveMQ Failover Worker: " + this.GetHashCode().ToString());
-					}
-
-					if(rebalance)
-					{
-						ITransport transport = connectedTransport.GetAndSet(null);
-						if(transport != null)
-						{
-							transport.Command = disposedOnCommand;
-							transport.Exception = disposedOnException;
-							try
-							{
-								transport.Stop();
-							}
-							catch(Exception ex)
-							{
-								ex.GetType();   // Ignore errors but this lets us see the error during debugging
-							}
-						}
-					}
-
-					Tracer.Debug("Waking up reconnect task");
-					try
-					{
-						reconnectTask.Wakeup();
-					}
-					catch(ThreadInterruptedException)
-					{
-					}
-				}
-				else
-				{
-					Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
-				}
-			}
-		}
-
-		private List<Uri> ConnectList
-		{
-			get
-			{
-				List<Uri> l = new List<Uri>(uris);
-				bool removed = false;
-				if(failedConnectTransportURI != null)
-				{
-					removed = l.Remove(failedConnectTransportURI);
-				}
-
-				if(Randomize)
-				{
-					// Randomly, reorder the list by random swapping
-					Random r = new Random(DateTime.Now.Millisecond);
-					for(int i = 0; i < l.Count; i++)
-					{
-						int p = r.Next(l.Count);
-						Uri t = l[p];
-						l[p] = l[i];
-						l[i] = t;
-					}
-				}
-
-				if(removed)
-				{
-					l.Add(failedConnectTransportURI);
-				}
-
-				return l;
-			}
-		}
-
-		protected void RestoreTransport(ITransport t)
-		{
-			Tracer.Info("Restoring previous transport connection.");
-			t.Start();
-
-			// Send information to the broker - informing it we are a fault tolerant client
-			t.Oneway(new ConnectionControl() { FaultTolerant = true });
-			stateTracker.DoRestore(t);
-
-			Tracer.Info("Sending queued commands...");
-			Dictionary<int, Command> tmpMap = null;
-			lock(((ICollection) requestMap).SyncRoot)
-			{
-				tmpMap = new Dictionary<int, Command>(requestMap);
-			}
-
-			foreach(Command command in tmpMap.Values)
-			{
-                if(command.IsMessageAck)
-                {
-                    Tracer.Debug("Stored MessageAck being dropped as stale.");
-                    OnCommand(this, new Response() { CorrelationId = command.CommandId });
-                    continue;
-                }
-
-                t.Oneway(command);
-			}
-		}
-
-		public Uri RemoteAddress
-		{
-			get
-			{
-				if(ConnectedTransport != null)
-				{
-					return ConnectedTransport.RemoteAddress;
-				}
-				return null;
-			}
-		}
-
-		public Object Narrow(Type type)
-		{
-			if(this.GetType().Equals(type))
-			{
-				return this;
-			}
-			else if(ConnectedTransport != null)
-			{
-				return ConnectedTransport.Narrow(type);
-			}
-
-			return null;
-		}
-
-		private bool DoConnect()
-		{
-			lock(reconnectMutex)
-			{
-				if(ConnectedTransport != null || disposed || connectionFailure != null)
-				{
-					return false;
-				}
-				else
-				{
-					List<Uri> connectList = ConnectList;
-					if(connectList.Count == 0)
-					{
-						Failure = new NMSConnectionException("No URIs available for connection.");
-					}
-					else
-					{
-						if(!UseExponentialBackOff)
-						{
-							ReconnectDelay = InitialReconnectDelay;
-						}
-
-						try
-						{
-							backupMutex.WaitOne();
-							if(Backup && backups.Count != 0)
-							{
-								BackupTransport bt = backups[0];
-								backups.RemoveAt(0);
-								ITransport t = bt.Transport;
-								Uri uri = bt.Uri;
-								t.Command = OnCommand;
-								t.Exception = OnException;
-								try
-								{
-									if(started)
-									{
-										RestoreTransport(t);
-									}
-									ReconnectDelay = InitialReconnectDelay;
-									failedConnectTransportURI = null;
-									ConnectedTransportURI = uri;
-									ConnectedTransport = t;
-									connectFailures = 0;
-									connected = true;
-									if(this.Resumed != null)
-									{
-										this.Resumed(t);
-									}
-									Tracer.InfoFormat("Successfully reconnected to backup {0}", uri.ToString());
-									return false;
-								}
-								catch(Exception e)
-								{
-									Tracer.DebugFormat("Backup transport failed: {0}", e.Message);
-								}
-							}
-						}
-						finally
-						{
-							backupMutex.ReleaseMutex();
-						}
-
-						ManualResetEvent allDone = new ManualResetEvent(false);
-						ITransport transport = null;
-						Uri chosenUri = null;
-						object syncLock = new object();
-
-						try
-						{
-							foreach(Uri uri in connectList)
-							{
-								if(ConnectedTransport != null || disposed)
-								{
-									break;
-								}
-
-								if(asyncConnect)
-								{
-									Tracer.DebugFormat("Attempting async connect to: {0}", uri);
-									// set connector up
-									Connector connector = new Connector(
-										delegate(ITransport transportToUse, Uri uriToUse)
-										{
-											if(transport == null)
-											{
-												lock(syncLock)
-												{
-													if(transport == null)
-													{
-														//the transport has not yet been set asynchronously so set it
-														transport = transportToUse;
-														chosenUri = uriToUse;
-													}
-													//notify issuing thread to move on
-													allDone.Set();
-												}
-											}
-										}, uri, this);
-
-									// initiate a thread to try connecting to broker
-									Thread thread = new Thread(connector.DoConnect) {Name = uri.ToString()};
-								    thread.Start();
-								}
-								else
-								{
-									// synchronous connect
-									try
-									{
-										Tracer.DebugFormat("Attempting sync connect to: {0}", uri);
-										transport = TransportFactory.CompositeConnect(uri);
-										chosenUri = transport.RemoteAddress;
-										break;
-									}
-									catch(Exception e)
-									{
-										Failure = e;
-										Tracer.DebugFormat("Connect fail to: {0}, reason: {1}", uri, e.Message);
-									}
-								}
-							}
-
-							if(asyncConnect)
-							{
-								// now wait for transport to be populated, but timeout eventually
-								allDone.WaitOne(asyncTimeout, false);
-							}
-
-							if(transport != null)
-							{
-								transport.Command = OnCommand;
-								transport.Exception = OnException;
-								transport.Start();
-
-								if(started)
-								{
-									RestoreTransport(transport);
-								}
-
-								if(this.Resumed != null)
-								{
-									this.Resumed(transport);
-								}
-
-								Tracer.Debug("Connection established");
-								ReconnectDelay = InitialReconnectDelay;
-								ConnectedTransportURI = chosenUri;
-								ConnectedTransport = transport;
-								connectFailures = 0;
-								connected = true;
-
-								if(firstConnection)
-								{
-									firstConnection = false;
-									Tracer.InfoFormat("Successfully connected to: {0}", chosenUri.ToString());
-								}
-								else
-								{
-									Tracer.InfoFormat("Successfully reconnected to: {0}", chosenUri.ToString());
-								}
-
-								return false;
-							}
-
-							if(asyncConnect)
-							{
-								Tracer.DebugFormat("Connect failed after waiting for asynchronous callback.");
-							}
-
-						}
-						catch(Exception e)
-						{
-							Failure = e;
-							Tracer.DebugFormat("Connect attempt failed.  Reason: {0}", e.Message);
-						}
-					}
-
-                    int reconnectAttempts = 0;
-                    if( firstConnection ) {
-                        if( StartupMaxReconnectAttempts != 0 ) {
-                            reconnectAttempts = StartupMaxReconnectAttempts;
-                        }
-                    }
-                    if( reconnectAttempts == 0 ) {
-                        reconnectAttempts = MaxReconnectAttempts;
-                    }
-        
-					if(reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts)
-					{
-						Tracer.ErrorFormat("Failed to connect to transport after {0} attempt(s)", connectFailures);
-						connectionFailure = Failure;
-						this.Exception(this, connectionFailure);
-						return false;
-					}
-				}
-			}
-
-			if(!disposed)
-			{
-				Tracer.DebugFormat("Waiting {0}ms before attempting connection.", ReconnectDelay);
-				lock(sleepMutex)
-				{
-					try
-					{
-						Thread.Sleep(ReconnectDelay);
-					}
-					catch(ThreadInterruptedException)
-					{
-					}
-				}
-
-				if(UseExponentialBackOff)
-				{
-					// Exponential increment of reconnect delay.
-					ReconnectDelay *= ReconnectDelayExponent;
-					if(ReconnectDelay > MaxReconnectDelay)
-					{
-						ReconnectDelay = MaxReconnectDelay;
-					}
-				}
-			}
-			return !disposed;
-		}
-
-		/// <summary>
-		/// This class is a helper for the asynchronous connect option
-		/// </summary>
-		public class Connector
-		{
-			/// <summary>
-			/// callback to properly set chosen transport
-			/// </summary>
-			readonly SetTransport _setTransport;
-
-			/// <summary>
-			/// Uri to try connecting to
-			/// </summary>
-			readonly Uri _uri;
-
-			/// <summary>
-			/// Failover transport issuing the connection attempt
-			/// </summary>
-			private readonly FailoverTransport _transport;
-
-			/// <summary>
-			/// Initializes a new instance of the <see cref="Connector"/> class.
-			/// </summary>
-			/// <param name="setTransport">The set transport.</param>
-			/// <param name="uri">The URI.</param>
-			/// <param name="transport">The transport.</param>
-			public Connector(SetTransport setTransport, Uri uri, FailoverTransport transport)
-			{
-				_uri = uri;
-				_setTransport = setTransport;
-				_transport = transport;
-			}
-
-			/// <summary>
-			/// Does the connect.
-			/// </summary>
-			public void DoConnect()
-			{
-				try
-				{
-					TransportFactory.AsyncCompositeConnect(_uri, _setTransport);
-				}
-				catch(Exception e)
-				{
-					_transport.Failure = e;
-					Tracer.DebugFormat("Connect fail to: {0}, reason: {1}", _uri, e.Message);
-				}
-
-			}
-		}
-
-		private bool BuildBackups()
-		{
-			lock(backupMutex)
-			{
-				if(!disposed && Backup && backups.Count < BackupPoolSize)
-				{
-					List<Uri> connectList = ConnectList;
-					foreach(BackupTransport bt in backups)
-					{
-						if(bt.Disposed)
-						{
-							backups.Remove(bt);
-						}
-					}
-
-					foreach(Uri uri in connectList)
-					{
-						if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
-						{
-							try
-							{
-								BackupTransport bt = new BackupTransport(this)
-								{
-									Uri = uri
-								};
-
-								if(!backups.Contains(bt))
-								{
-									ITransport t = TransportFactory.CompositeConnect(uri);
-									t.Command = bt.OnCommand;
-									t.Exception = bt.OnException;
-									t.Start();
-									bt.Transport = t;
-									backups.Add(bt);
-								}
-							}
-							catch(Exception e)
-							{
-								Tracer.DebugFormat("Failed to build backup: {0}", e.Message);
-							}
-						}
-
-						if(backups.Count == BackupPoolSize)
-						{
-							break;
-						}
-					}
-				}
-			}
-
-			return false;
-		}
-
-		public void ConnectionInterruptProcessingComplete(ConnectionId connectionId)
-		{
-			lock(reconnectMutex)
-			{
-				Tracer.Debug("Connection Interrupt Processing is complete for ConnectionId: " + connectionId);
-				stateTracker.ConnectionInterruptProcessingComplete(this, connectionId);
-			}
-		}
-
-		/// <summary>
-		/// Applies a new list of broker URIs.  URIs recorded by the previous update
-		/// that are absent from <paramref name="updatedURIs"/> are removed from the
-		/// connect list; URIs that were not recorded before are added through
-		/// <c>Add</c>, which also triggers a reconnect.
-		/// </summary>
-		/// <param name="rebalance">Forwarded to <c>Add</c>.</param>
-		/// <param name="updatedURIs">The new URIs; null entries are ignored and a
-		/// null or empty array makes the call a no-op.</param>
-		public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
-		{
-			if(!IsUpdateURIsSupported)
-			{
-				return;
-			}
-
-			if(updatedURIs == null || updatedURIs.Length == 0)
-			{
-				return;
-			}
-
-			// After the loop below "stale" holds only the previously reported URIs
-			// that are no longer present.
-			List<Uri> stale = new List<Uri>(this.updated);
-			List<Uri> fresh = new List<Uri>();
-
-			// De-duplicate the incoming URIs.
-			Dictionary<Uri, bool> distinct = new Dictionary<Uri, bool>();
-			foreach(Uri candidate in updatedURIs)
-			{
-				if(candidate != null)
-				{
-					distinct[candidate] = true;
-				}
-			}
-
-			foreach(Uri candidate in distinct.Keys)
-			{
-				if(!stale.Remove(candidate))
-				{
-					fresh.Add(candidate);
-				}
-			}
-
-			lock(reconnectMutex)
-			{
-				this.updated.Clear();
-				this.updated.AddRange(fresh);
-
-				foreach(Uri gone in stale)
-				{
-					this.uris.Remove(gone);
-				}
-
-				this.Add(rebalance, fresh.ToArray());
-			}
-		}
-
-		/// <summary>
-		/// Processes a ConnectionControl command: when it names a non-blank
-		/// reconnect target and reconnect is supported, a reconnect to that URI is
-		/// requested; the command's connected-broker list is then handed to
-		/// <c>ProcessNewTransports</c>.
-		/// </summary>
-		/// <param name="control">The control command received from the broker.</param>
-		public void HandleConnectionControl(ConnectionControl control)
-		{
-			string reconnectStr = control.ReconnectTo;
-			if(reconnectStr != null)
-			{
-				reconnectStr = reconnectStr.Trim();
-			}
-
-			bool hasTarget = reconnectStr != null && reconnectStr.Length > 0;
-			if(hasTarget)
-			{
-				try
-				{
-					Uri target = new Uri(reconnectStr);
-					if(IsReconnectSupported)
-					{
-						Reconnect(target);
-						Tracer.Info("Reconnected to: " + target.OriginalString);
-					}
-				}
-				catch(Exception e)
-				{
-					Tracer.ErrorFormat("Failed to handle ConnectionControl reconnect to {0}: {1}", reconnectStr, e);
-				}
-			}
-
-			ProcessNewTransports(control.RebalanceConnection, control.ConnectedBrokers);
-		}
-
-		/// <summary>
-		/// Parses a comma separated list of broker addresses and passes the URIs
-		/// that could be parsed to <c>UpdateURIs</c>.  Unparseable entries and
-		/// failures of the update are logged and otherwise ignored.
-		/// </summary>
-		/// <param name="rebalance">Forwarded to <c>UpdateURIs</c>.</param>
-		/// <param name="newTransports">Comma separated broker addresses; may be null.</param>
-		private void ProcessNewTransports(bool rebalance, String newTransports)
-		{
-			if(newTransports == null)
-			{
-				return;
-			}
-
-			newTransports = newTransports.Trim();
-			if(newTransports.Length == 0 || !IsUpdateURIsSupported)
-			{
-				return;
-			}
-
-			List<Uri> parsed = new List<Uri>();
-			foreach(String address in newTransports.Split(','))
-			{
-				try
-				{
-					parsed.Add(new Uri(address));
-				}
-				catch
-				{
-					Tracer.Error("Failed to parse broker address: " + address);
-				}
-			}
-
-			if(parsed.Count == 0)
-			{
-				return;
-			}
-
-			try
-			{
-				UpdateURIs(rebalance, parsed.ToArray());
-			}
-			catch
-			{
-				Tracer.Error("Failed to update transport URI's from: " + newTransports);
-			}
-		}
-
-		/// <summary>
-		/// Disposes this transport via <c>Dispose(true)</c> and then suppresses
-		/// finalization of this instance.
-		/// </summary>
-		public void Dispose()
-		{
-			Dispose(true);
-			GC.SuppressFinalize(this);
-		}
-
-		/// <summary>
-		/// Stops the transport and marks it as disposed.  Errors raised while
-		/// stopping are logged rather than propagated, so neither
-		/// <c>Dispose()</c> nor a caller passing <c>false</c> sees an exception,
-		/// and the disposed flag is set even when stopping fails.
-		/// </summary>
-		/// <param name="disposing">Not consulted: the same shutdown is performed
-		/// whichever value is passed.</param>
-		public void Dispose(bool disposing)
-		{
-			try
-			{
-				this.Stop();
-			}
-			catch(Exception e)
-			{
-				Tracer.DebugFormat("Error while stopping FailoverTransport during dispose: {0}", e.Message);
-			}
-			finally
-			{
-				disposed = true;
-			}
-		}
-
-		/// <summary>
-		/// Orders FailoverTransport instances by their numeric id.
-		/// </summary>
-		/// <param name="o">The object to compare with.</param>
-		/// <returns>A negative value, zero or a positive value when this instance's
-		/// id is lower than, equal to or higher than the other instance's id;
-		/// a positive value when <paramref name="o"/> is null.</returns>
-		/// <exception cref="ArgumentException">When <paramref name="o"/> is not a
-		/// FailoverTransport.</exception>
-		public int CompareTo(Object o)
-		{
-			// Per the IComparable contract every instance sorts after null.
-			if(o == null)
-			{
-				return 1;
-			}
-
-			FailoverTransport other = o as FailoverTransport;
-			if(other == null)
-			{
-				throw new ArgumentException("Object must be of type FailoverTransport.", "o");
-			}
-
-			return this.id.CompareTo(other.id);
-		}
-
-		/// <summary>
-		/// Returns the URI of the currently connected transport, or
-		/// "unconnected" when no URI is recorded.
-		/// </summary>
-		public override String ToString()
-		{
-			// Read the property once: it can be cleared by failure handling
-			// between a null check and a second read.
-			Uri current = ConnectedTransportURI;
-			return current == null ? "unconnected" : current.ToString();
-		}
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.ActiveMQ.State;
+using Apache.NMS.ActiveMQ.Threads;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Failover
+{
+	/// <summary>
+	/// A Transport that is made reliable by being able to fail over to another
+	/// transport when a transport failure is detected.
+	/// </summary>
+	public class FailoverTransport : ICompositeTransport, IComparable
+	{
+		private static int idCounter = 0;
+		private readonly int id;
+
+		private bool disposed;
+		private bool connected;
+		private readonly List<Uri> uris = new List<Uri>();
+		private readonly List<Uri> updated = new List<Uri>();
+		private CommandHandler commandHandler;
+		private ExceptionHandler exceptionHandler;
+		private InterruptedHandler interruptedHandler;
+		private ResumedHandler resumedHandler;
+
+		private readonly Mutex reconnectMutex = new Mutex();
+		private readonly Mutex backupMutex = new Mutex();
+		private readonly Mutex sleepMutex = new Mutex();
+		private readonly ConnectionStateTracker stateTracker = new ConnectionStateTracker();
+		private readonly Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
+
+		private Uri connectedTransportURI;
+		private Uri failedConnectTransportURI;
+		private readonly AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
+		private TaskRunner reconnectTask = null;
+		private bool started;
+
+		private int timeout = -1;
+		private int initialReconnectDelay = 10;
+		private int maxReconnectDelay = 1000 * 30;
+		private int backOffMultiplier = 2;
+		private bool useExponentialBackOff = true;
+		private bool randomize = true;
+		private bool initialized;
+        private int maxReconnectAttempts;
+        private int startupMaxReconnectAttempts;
+		private int connectFailures;
+		private int reconnectDelay = 10;
+		private int asyncTimeout = 45000;
+		private bool asyncConnect = false;
+		private Exception connectionFailure;
+		private bool firstConnection = true;
+		private bool backup = false;
+		private readonly List<BackupTransport> backups = new List<BackupTransport>();
+		private int backupPoolSize = 1;
+		private bool trackMessages = false;
+		private int maxCacheSize = 256;
+		private volatile Exception failure;
+		private readonly object mutex = new object();
+		private bool reconnectSupported = true;
+		private bool updateURIsSupported = true;
+
+		/// <summary>
+		/// Creates a transport with a unique id, enables transaction tracking on
+		/// the state tracker and creates the task runner that performs reconnects.
+		/// </summary>
+		public FailoverTransport()
+		{
+			// idCounter is shared by all instances; increment it atomically so two
+			// transports constructed concurrently cannot receive the same id.
+			// Increment returns the new value, so subtract one to keep ids 0-based.
+			id = Interlocked.Increment(ref idCounter) - 1;
+
+			stateTracker.TrackTransactions = true;
+			reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(
+				new FailoverTask(this), "ActiveMQ Failover Worker: " + this.GetHashCode().ToString());
+		}
+
+		/// <summary>
+		/// Finalizer; delegates to <c>Dispose(false)</c>.
+		/// </summary>
+		~FailoverTransport()
+		{
+			Dispose(false);
+		}
+
+		#region FailoverTask
+
+		/// <summary>
+		/// Task executed by the reconnect task runner.  Each iteration either
+		/// attempts a connect or, when no connect was attempted, builds backups.
+		/// </summary>
+		private class FailoverTask : Task
+		{
+			private readonly FailoverTransport parent;
+
+			public FailoverTask(FailoverTransport p)
+			{
+				this.parent = p;
+			}
+
+			/// <summary>
+			/// Runs one iteration of the failover work.
+			/// </summary>
+			/// <returns><c>true</c> when a connect was attempted (the task is also
+			/// woken so backups are built on the next pass); <c>false</c> when the
+			/// transport is not started or only backups were built.</returns>
+			public bool Iterate()
+			{
+				if(!parent.IsStarted)
+				{
+					return false;
+				}
+
+				bool reconnectAllowed = !parent.disposed && parent.connectionFailure == null;
+				bool connectAttempted = false;
+				bool runAgain = false;
+
+				try
+				{
+					parent.backupMutex.WaitOne();
+					if(reconnectAllowed && parent.ConnectedTransport == null)
+					{
+						runAgain = parent.DoConnect();
+						connectAttempted = true;
+					}
+				}
+				finally
+				{
+					parent.backupMutex.ReleaseMutex();
+				}
+
+				if(!connectAttempted)
+				{
+					parent.BuildBackups();
+					return runAgain;
+				}
+
+				// A connect was attempted: build backups on the next iteration.
+				try
+				{
+					parent.reconnectTask.Wakeup();
+				}
+				catch(ThreadInterruptedException)
+				{
+					Tracer.Debug("Reconnect task has been interrupted.");
+				}
+
+				return true;
+			}
+		}
+
+		#endregion
+
+		#region Property Accessors
+
+		/// <summary>
+		/// Handler that <c>OnCommand</c> forwards incoming commands to.
+		/// </summary>
+		public CommandHandler Command
+		{
+			get
+			{
+				return this.commandHandler;
+			}
+			set
+			{
+				this.commandHandler = value;
+			}
+		}
+
+		/// <summary>
+		/// Handler invoked by <c>DoConnect</c> once the configured number of
+		/// connect attempts has been exhausted.
+		/// </summary>
+		public ExceptionHandler Exception
+		{
+			get
+			{
+				return this.exceptionHandler;
+			}
+			set
+			{
+				this.exceptionHandler = value;
+			}
+		}
+
+		/// <summary>
+		/// Handler invoked by <c>HandleTransportFailure</c> with the transport
+		/// that failed.
+		/// </summary>
+		public InterruptedHandler Interrupted
+		{
+			get
+			{
+				return this.interruptedHandler;
+			}
+			set
+			{
+				this.interruptedHandler = value;
+			}
+		}
+
+		/// <summary>
+		/// Handler invoked by <c>DoConnect</c> with the transport that was
+		/// (re)connected.
+		/// </summary>
+		public ResumedHandler Resumed
+		{
+			get
+			{
+				return this.resumedHandler;
+			}
+			set
+			{
+				this.resumedHandler = value;
+			}
+		}
+
+		/// <summary>
+		/// Most recent connect error.  Writes are serialized on <c>mutex</c>;
+		/// reads go straight to the volatile backing field.
+		/// </summary>
+		internal Exception Failure
+		{
+			get
+			{
+				return this.failure;
+			}
+			set
+			{
+				lock(this.mutex)
+				{
+					this.failure = value;
+				}
+			}
+		}
+
+		/// <summary>
+		/// Milliseconds <c>Oneway</c> waits for a connected transport before
+		/// giving up; only enforced when greater than zero.
+		/// </summary>
+		public int Timeout
+		{
+			get
+			{
+				return timeout;
+			}
+			set
+			{
+				timeout = value;
+			}
+		}
+
+		/// <summary>
+		/// Value <c>ReconnectDelay</c> is reset to after a successful connect, and
+		/// before each attempt when exponential back-off is disabled.
+		/// </summary>
+		public int InitialReconnectDelay
+		{
+			get
+			{
+				return this.initialReconnectDelay;
+			}
+			set
+			{
+				this.initialReconnectDelay = value;
+			}
+		}
+
+		/// <summary>
+		/// Upper bound applied to <c>ReconnectDelay</c> when it grows through
+		/// exponential back-off.
+		/// </summary>
+		public int MaxReconnectDelay
+		{
+			get
+			{
+				return this.maxReconnectDelay;
+			}
+			set
+			{
+				this.maxReconnectDelay = value;
+			}
+		}
+
+		/// <summary>
+		/// Milliseconds <c>DoConnect</c> sleeps after a failed connect pass.
+		/// </summary>
+		public int ReconnectDelay
+		{
+			get
+			{
+				return this.reconnectDelay;
+			}
+			set
+			{
+				this.reconnectDelay = value;
+			}
+		}
+
+		/// <summary>
+		/// Factor <c>ReconnectDelay</c> is multiplied by after each failed pass
+		/// when exponential back-off is enabled.
+		/// </summary>
+		public int ReconnectDelayExponent
+		{
+			get
+			{
+				return this.backOffMultiplier;
+			}
+			set
+			{
+				this.backOffMultiplier = value;
+			}
+		}
+
+		/// <summary>
+		/// The currently connected transport, held in an atomic reference;
+		/// null while disconnected.
+		/// </summary>
+		public ITransport ConnectedTransport
+		{
+			get
+			{
+				return this.connectedTransport.Value;
+			}
+			set
+			{
+				this.connectedTransport.Value = value;
+			}
+		}
+
+		/// <summary>
+		/// URI recorded for the current connection; cleared by
+		/// <c>HandleTransportFailure</c>.
+		/// </summary>
+		public Uri ConnectedTransportURI
+		{
+			get
+			{
+				return this.connectedTransportURI;
+			}
+			set
+			{
+				this.connectedTransportURI = value;
+			}
+		}
+
+		/// <summary>
+		/// Number of failed connect passes after which <c>DoConnect</c> reports a
+		/// connection failure; a value of zero or less disables the limit.
+		/// </summary>
+		public int MaxReconnectAttempts
+		{
+			get
+			{
+				return this.maxReconnectAttempts;
+			}
+			set
+			{
+				this.maxReconnectAttempts = value;
+			}
+		}
+
+        /// <summary>
+        /// Attempt limit used in place of <c>MaxReconnectAttempts</c> until the
+        /// first connection has been made; ignored when zero.
+        /// </summary>
+        public int StartupMaxReconnectAttempts
+        {
+            get
+            {
+                return this.startupMaxReconnectAttempts;
+            }
+            set
+            {
+                this.startupMaxReconnectAttempts = value;
+            }
+        }
+
+		/// <summary>
+		/// Whether the connect list is shuffled before connection attempts.
+		/// </summary>
+		public bool Randomize
+		{
+			get
+			{
+				return this.randomize;
+			}
+			set
+			{
+				this.randomize = value;
+			}
+		}
+
+		/// <summary>
+		/// Whether backup transports are built and used for reconnects.
+		/// </summary>
+		public bool Backup
+		{
+			get
+			{
+				return this.backup;
+			}
+			set
+			{
+				this.backup = value;
+			}
+		}
+
+		/// <summary>
+		/// Number of backup transports <c>BuildBackups</c> tries to keep ready.
+		/// </summary>
+		public int BackupPoolSize
+		{
+			get
+			{
+				return this.backupPoolSize;
+			}
+			set
+			{
+				this.backupPoolSize = value;
+			}
+		}
+
+		/// <summary>
+		/// Value copied to the state tracker's <c>TrackMessages</c> setting when
+		/// the transport is started.
+		/// </summary>
+		public bool TrackMessages
+		{
+			get
+			{
+				return this.trackMessages;
+			}
+			set
+			{
+				this.trackMessages = value;
+			}
+		}
+
+		/// <summary>
+		/// Value copied to the state tracker's <c>MaxCacheSize</c> setting when
+		/// the transport is started.
+		/// </summary>
+		public int MaxCacheSize
+		{
+			get
+			{
+				return this.maxCacheSize;
+			}
+			set
+			{
+				this.maxCacheSize = value;
+			}
+		}
+
+		/// <summary>
+		/// Whether the reconnect delay grows by <c>ReconnectDelayExponent</c>
+		/// after each failed connect pass.
+		/// </summary>
+		public bool UseExponentialBackOff
+		{
+			get
+			{
+				return this.useExponentialBackOff;
+			}
+			set
+			{
+				this.useExponentialBackOff = value;
+			}
+		}
+
+        /// <summary>
+        /// Wire format of the connected transport, or null when no transport
+        /// is connected.
+        /// </summary>
+        public IWireFormat WireFormat
+        {
+            get
+            {
+                ITransport current = ConnectedTransport;
+                if(current == null)
+                {
+                    return null;
+                }
+
+                return current.WireFormat;
+            }
+        }
+
+		/// <summary>
+		/// Sets whether <c>DoConnect</c> connects to the candidate URIs on
+		/// separate threads instead of one after another.  Write-only.
+		/// </summary>
+		/// <value><c>true</c> to connect asynchronously; otherwise, <c>false</c>.</value>
+		public bool AsyncConnect
+		{
+			set
+			{
+				this.asyncConnect = value;
+			}
+		}
+
+		/// <summary>
+		/// Milliseconds <c>DoConnect</c> waits for an asynchronous connect to
+		/// deliver a transport before treating the pass as failed.
+		/// </summary>
+		/// <value>The async timeout in milliseconds.</value>
+		public int AsyncTimeout
+		{
+			get
+			{
+				return this.asyncTimeout;
+			}
+			set
+			{
+				this.asyncTimeout = value;
+			}
+		}
+
+        /// <summary>
+        /// The connection state tracker owned by this transport.
+        /// </summary>
+        public ConnectionStateTracker StateTracker
+        {
+            get
+            {
+                return stateTracker;
+            }
+        }
+
+		#endregion
+
+		/// <summary>
+		/// Always <c>true</c> for this transport.
+		/// </summary>
+		public bool IsFaultTolerant
+		{
+			get
+			{
+				return true;
+			}
+		}
+
+		/// <summary>
+		/// Whether the transport has been stopped or disposed.
+		/// </summary>
+		public bool IsDisposed
+		{
+			get
+			{
+				return this.disposed;
+			}
+		}
+
+		/// <summary>
+		/// Whether a connection is currently recorded as established.
+		/// </summary>
+		public bool IsConnected
+		{
+			get
+			{
+				return this.connected;
+			}
+		}
+
+		/// <summary>
+		/// Whether <c>Start</c> has been called without a subsequent <c>Stop</c>.
+		/// </summary>
+		public bool IsStarted
+		{
+			get
+			{
+				return this.started;
+			}
+		}
+
+		/// <summary>
+		/// Whether broker-requested reconnects are honoured.
+		/// </summary>
+		public bool IsReconnectSupported
+		{
+			get
+			{
+				return reconnectSupported;
+			}
+		}
+
+		/// <summary>
+		/// Whether broker-supplied URI list updates are honoured.
+		/// </summary>
+		public bool IsUpdateURIsSupported
+		{
+			get
+			{
+				return updateURIsSupported;
+			}
+		}
+
+		/// <summary>
+		/// Exception callback installed on the connected transport; routes the
+		/// error into <c>HandleTransportFailure</c>.  Errors raised by the failure
+		/// handling itself are logged and not propagated to the transport.
+		/// </summary>
+		/// <param name="sender">The transport reporting the error (unused).</param>
+		/// <param name="error">The error that occurred on the transport.</param>
+		public void OnException(ITransport sender, Exception error)
+		{
+			try
+			{
+				HandleTransportFailure(error);
+			}
+			catch(Exception e)
+			{
+				// Still best-effort, but leave a trace instead of discarding the error.
+				Tracer.ErrorFormat("Failed to handle transport failure: {0}", e);
+			}
+		}
+
+		/// <summary>
+		/// No-op command handler; installed on a transport that is being
+		/// discarded so that commands it still delivers are ignored.
+		/// </summary>
+		public void disposedOnCommand(ITransport sender, Command c)
+		{
+		}
+
+		/// <summary>
+		/// No-op exception handler; installed on a transport that is being
+		/// discarded so that errors it still reports are ignored.
+		/// </summary>
+		public void disposedOnException(ITransport sender, Exception e)
+		{
+		}
+
+		/// <summary>
+		/// Detaches and stops the currently connected transport after a failure,
+		/// resets the connection state, notifies the <c>Interrupted</c> handler and,
+		/// when the transport is started, wakes the reconnect task.  Does nothing
+		/// when no transport is connected.
+		/// </summary>
+		/// <param name="e">The error that caused the failure; may be null.</param>
+		public void HandleTransportFailure(Exception e)
+		{
+			ITransport transport = connectedTransport.GetAndSet(null);
+			if(transport == null)
+			{
+				return;
+			}
+
+			// Silence the failed transport before stopping it.
+			transport.Command = disposedOnCommand;
+			transport.Exception = disposedOnException;
+			try
+			{
+				transport.Stop();
+			}
+			catch(Exception ex)
+			{
+				// Errors while stopping an already failed transport are not fatal.
+				Tracer.DebugFormat("Error while stopping failed transport: {0}", ex.Message);
+			}
+
+			lock(reconnectMutex)
+			{
+				bool reconnectOk = false;
+				if(started)
+				{
+					// Pass the URI itself and guard the message so that a missing URI or
+					// a null error cannot throw here, before the state below is reset.
+					Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}",
+						ConnectedTransportURI, e != null ? e.Message : "unknown error");
+					reconnectOk = true;
+				}
+
+				initialized = false;
+				failedConnectTransportURI = ConnectedTransportURI;
+				ConnectedTransportURI = null;
+				connected = false;
+
+				if(this.Interrupted != null)
+				{
+					this.Interrupted(transport);
+				}
+
+				if(reconnectOk)
+				{
+					reconnectTask.Wakeup();
+				}
+			}
+		}
+
+		/// <summary>
+		/// Starts the transport.  Copies the cache settings to the state tracker,
+		/// then either restores state on an already connected transport or requests
+		/// a reconnect.  Calling it again while started only logs a message.
+		/// </summary>
+		public void Start()
+		{
+			lock(reconnectMutex)
+			{
+				if(started)
+				{
+					Tracer.Debug("FailoverTransport Already Started.");
+					return;
+				}
+
+				Tracer.Debug("FailoverTransport Started.");
+				started = true;
+
+				stateTracker.MaxCacheSize = MaxCacheSize;
+				stateTracker.TrackMessages = TrackMessages;
+
+				if(ConnectedTransport == null)
+				{
+					Tracer.Debug("FailoverTransport not connected, start is reconnecting.");
+					Reconnect(false);
+				}
+				else
+				{
+					Tracer.Debug("FailoverTransport already connected, start is restoring.");
+					stateTracker.DoRestore(ConnectedTransport);
+				}
+			}
+		}
+
+		/// <summary>
+		/// Stops the transport: marks it stopped and disposed, flags and clears all
+		/// backup transports, shuts down the reconnect task and stops the connected
+		/// transport, if any.  Calling it while not started only logs a message.
+		/// </summary>
+		public virtual void Stop()
+		{
+			ITransport transportToStop = null;
+
+			lock(reconnectMutex)
+			{
+				if(!started)
+				{
+					Tracer.Debug("FailoverTransport Already Stopped.");
+					return;
+				}
+
+				Tracer.Debug("FailoverTransport Stopped.");
+				started = false;
+				disposed = true;
+				connected = false;
+				foreach(BackupTransport t in backups)
+				{
+					t.Disposed = true;
+				}
+				backups.Clear();
+
+				// Detach the transport under the lock; it is stopped after the
+				// lock has been released.
+				if(ConnectedTransport != null)
+				{
+					transportToStop = connectedTransport.GetAndSet(null);
+				}
+			}
+
+			// NOTE(review): this acquires sleepMutex as a Mutex, whereas DoConnect
+			// sleeps inside lock(sleepMutex), which takes the object's Monitor.
+			// The two do not exclude each other, so this presumably does not wait
+			// for a sleeping reconnect pass - confirm the intent.
+			try
+			{
+				sleepMutex.WaitOne();
+			}
+			finally
+			{
+				sleepMutex.ReleaseMutex();
+			}
+
+			if(reconnectTask != null)
+			{
+				reconnectTask.Shutdown();
+			}
+
+			if(transportToStop != null)
+			{
+				transportToStop.Stop();
+			}
+		}
+
+		/// <summary>
+		/// Not supported by this transport.
+		/// </summary>
+		/// <exception cref="ApplicationException">Always thrown.</exception>
+		public FutureResponse AsyncRequest(Command command)
+		{
+			const string reason = "FailoverTransport does not implement AsyncRequest(Command)";
+			throw new ApplicationException(reason);
+		}
+
+		/// <summary>
+		/// Not supported by this transport.
+		/// </summary>
+		/// <exception cref="ApplicationException">Always thrown.</exception>
+		public Response Request(Command command)
+		{
+			const string reason = "FailoverTransport does not implement Request(Command)";
+			throw new ApplicationException(reason);
+		}
+
+		/// <summary>
+		/// Not supported by this transport.
+		/// </summary>
+		/// <exception cref="ApplicationException">Always thrown.</exception>
+		public Response Request(Command command, TimeSpan ts)
+		{
+			const string reason = "FailoverTransport does not implement Request(Command, TimeSpan)";
+			throw new ApplicationException(reason);
+		}
+
+		/// <summary>
+		/// Command callback installed on the connected transport.  For responses
+		/// the matching entry is removed from the request map and, when it is a
+		/// tracked command, its response callback is run.  ConnectionControl
+		/// commands are processed locally.  Every command (including null) is then
+		/// forwarded to the <c>Command</c> handler when one is set.
+		/// </summary>
+		/// <param name="sender">The transport that delivered the command.</param>
+		/// <param name="command">The received command; may be null.</param>
+		public void OnCommand(ITransport sender, Command command)
+		{
+			if(command != null)
+			{
+				if(command.IsResponse)
+				{
+					Command pending = null;
+					lock(((ICollection) requestMap).SyncRoot)
+					{
+						int correlationId = ((Response) command).CorrelationId;
+
+						// Single lookup instead of ContainsKey followed by the indexer.
+						if(requestMap.TryGetValue(correlationId, out pending))
+						{
+							requestMap.Remove(correlationId);
+						}
+					}
+
+					Tracked t = pending as Tracked;
+					if(t != null)
+					{
+						t.onResponses();
+					}
+				}
+
+				initialized = true;
+
+				if(command.IsConnectionControl)
+				{
+					ConnectionControl control = command as ConnectionControl;
+					if(control != null)
+					{
+						this.HandleConnectionControl(control);
+					}
+				}
+			}
+
+			// Guard the delegate like the Interrupted/Resumed handlers are guarded;
+			// no handler may have been installed yet.
+			CommandHandler handler = this.Command;
+			if(handler != null)
+			{
+				handler(sender, command);
+			}
+		}
+
+		/// <summary>
+		/// Sends a command over the connected transport, waiting for a reconnect
+		/// when no transport is available and retrying untracked commands after a
+		/// send failure.  While disconnected, ShutdownInfo commands are skipped and
+		/// RemoveInfo / MessageAck commands are answered with a simulated response
+		/// (when one is required) instead of being sent.
+		/// </summary>
+		/// <param name="command">The command to send.</param>
+		/// <exception cref="ArgumentNullException">When <paramref name="command"/>
+		/// is null.</exception>
+		/// <exception cref="Exception">When the wait for a transport ends because
+		/// the connection failed permanently or the timeout elapsed, the recorded
+		/// error is thrown (unless the transport has been disposed).</exception>
+		public void Oneway(Command command)
+		{
+			// A null command would raise a NullReferenceException inside the retry
+			// loop on every pass, each time tearing down the connection through
+			// HandleTransportFailure without ever terminating.
+			if(command == null)
+			{
+				throw new ArgumentNullException("command");
+			}
+
+			Exception error = null;
+
+			lock(reconnectMutex)
+			{
+				if(ConnectedTransport == null)
+				{
+					if(command.IsShutdownInfo)
+					{
+						// Skipping send of ShutdownInfo command when not connected.
+						return;
+					}
+					else if(command.IsRemoveInfo || command.IsMessageAck)
+					{
+						// Simulate response to RemoveInfo command or a MessageAck
+						// since it would be stale at this point.
+						if(command.ResponseRequired)
+						{
+							OnCommand(this, new Response() { CorrelationId = command.CommandId });
+						}
+						return;
+					}
+				}
+
+				// Keep trying until the message is sent.
+				for(int i = 0; !disposed; i++)
+				{
+					try
+					{
+						// Any Ack that was being sent when the connection dropped is now
+						// stale so we don't send it here as it would cause an unmatched ack
+						// on the broker side and probably prevent a consumer from getting
+						// any new messages.
+						if(command.IsMessageAck && i > 0)
+						{
+							Tracer.Debug("Inflight MessageAck being dropped as stale.");
+							if(command.ResponseRequired)
+							{
+								OnCommand(this, new Response() { CorrelationId = command.CommandId });
+							}
+							return;
+						}
+
+						// Wait for transport to be connected.
+						ITransport transport = ConnectedTransport;
+						// UTC so the elapsed time is unaffected by local clock adjustments.
+						DateTime start = DateTime.UtcNow;
+						bool timedout = false;
+						while(transport == null && !disposed && connectionFailure == null)
+						{
+							Tracer.Info("Waiting for transport to reconnect.");
+
+							int elapsed = (int) (DateTime.UtcNow - start).TotalMilliseconds;
+							if(this.timeout > 0 && elapsed > timeout)
+							{
+								timedout = true;
+								Tracer.DebugFormat("FailoverTransport.oneway - timed out after {0} mills", elapsed);
+								break;
+							}
+
+							// Release so that the reconnect task can run
+							try
+							{
+								// Wait for something
+								Monitor.Wait(reconnectMutex, 100);
+							}
+							catch(ThreadInterruptedException e)
+							{
+								Tracer.DebugFormat("Interrupted: {0}", e.Message);
+							}
+
+							transport = ConnectedTransport;
+						}
+
+						if(transport == null)
+						{
+							// Previous loop may have exited due to use being disposed.
+							if(disposed)
+							{
+								error = new IOException("Transport disposed.");
+							}
+							else if(connectionFailure != null)
+							{
+								error = connectionFailure;
+							}
+							else if(timedout)
+							{
+								error = new IOException("Failover oneway timed out after " + timeout + " milliseconds.");
+							}
+							else
+							{
+								error = new IOException("Unexpected failure.");
+							}
+							break;
+						}
+
+						// If it was a request and it was not being tracked by
+						// the state tracker, then hold it in the requestMap so
+						// that we can replay it later.
+						Tracked tracked = stateTracker.Track(command);
+						lock(((ICollection) requestMap).SyncRoot)
+						{
+							if(tracked != null && tracked.WaitingForResponse)
+							{
+								requestMap.Add(command.CommandId, tracked);
+							}
+							else if(tracked == null && command.ResponseRequired)
+							{
+								requestMap.Add(command.CommandId, command);
+							}
+						}
+
+						// Send the message.
+						try
+						{
+							transport.Oneway(command);
+							stateTracker.TrackBack(command);
+						}
+						catch(Exception)
+						{
+							// If the command was not tracked.. we will retry in
+							// this method
+							if(tracked == null)
+							{
+								// since we will retry in this method.. take it
+								// out of the request map so that it is not
+								// sent 2 times on recovery
+								if(command.ResponseRequired)
+								{
+									lock(((ICollection) requestMap).SyncRoot)
+									{
+										requestMap.Remove(command.CommandId);
+									}
+								}
+
+								// Rethrow so it is handled by the outer catch;
+								// a bare throw keeps the original stack trace.
+								throw;
+							}
+						}
+
+						return;
+					}
+					catch(Exception e)
+					{
+						Tracer.DebugFormat("Send Oneway attempt: {0} failed: Message = {1}", i, e.Message);
+						Tracer.DebugFormat("Failed Message Was: {0}", command);
+						HandleTransportFailure(e);
+					}
+				}
+			}
+
+			if(!disposed)
+			{
+				if(error != null)
+				{
+					throw error;
+				}
+			}
+		}
+
+		/// <summary>
+		/// Adds the given URIs to the connect list, skipping ones already present,
+		/// and then requests a reconnect.
+		/// </summary>
+		/// <param name="rebalance">Forwarded to <c>Reconnect(bool)</c>.</param>
+		/// <param name="u">The URIs to add.</param>
+		public void Add(bool rebalance, Uri[] u)
+		{
+			lock(uris)
+			{
+				foreach(Uri candidate in u)
+				{
+					if(uris.Contains(candidate))
+					{
+						continue;
+					}
+
+					uris.Add(candidate);
+				}
+			}
+
+			Reconnect(rebalance);
+		}
+
+		/// <summary>
+		/// Parses <paramref name="u"/> as a URI and adds it to the connect list.
+		/// Any failure is logged and not propagated.
+		/// </summary>
+		/// <param name="rebalance">Forwarded to <c>Add(bool, Uri[])</c>.</param>
+		/// <param name="u">Text form of the URI to add.</param>
+		public void Add(bool rebalance, String u)
+		{
+			try
+			{
+				Uri parsed = new Uri(u);
+				Uri[] single = new Uri[] { parsed };
+				Add(rebalance, single);
+			}
+			catch(Exception e)
+			{
+				Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);
+			}
+		}
+
+		/// <summary>
+		/// Removes the given URIs from the connect list and then requests a
+		/// reconnect.
+		/// </summary>
+		/// <param name="rebalance">Forwarded to <c>Reconnect(bool)</c>.</param>
+		/// <param name="u">The URIs to remove.</param>
+		public void Remove(bool rebalance, Uri[] u)
+		{
+			lock(uris)
+			{
+				foreach(Uri candidate in u)
+				{
+					uris.Remove(candidate);
+				}
+			}
+
+			Reconnect(rebalance);
+		}
+
+		/// <summary>
+		/// Parses <paramref name="u"/> as a URI and removes it from the connect
+		/// list.  Any failure is logged and not propagated.
+		/// </summary>
+		/// <param name="rebalance">Forwarded to <c>Remove(bool, Uri[])</c>.</param>
+		/// <param name="u">Text form of the URI to remove.</param>
+		public void Remove(bool rebalance, String u)
+		{
+			try
+			{
+				Uri parsed = new Uri(u);
+				Uri[] single = new Uri[] { parsed };
+				Remove(rebalance, single);
+			}
+			catch(Exception e)
+			{
+				Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, e.Message);
+			}
+		}
+
+		/// <summary>
+		/// Adds <paramref name="uri"/> to the connect list with rebalancing
+		/// enabled, which drops the current transport and triggers a reconnect
+		/// when the transport is started.
+		/// </summary>
+		/// <param name="uri">The URI to add before reconnecting.</param>
+		public void Reconnect(Uri uri)
+		{
+			Uri[] single = new Uri[] { uri };
+			Add(true, single);
+		}
+
+		/// <summary>
+		/// Wakes the reconnect task when the transport is started.  With
+		/// <paramref name="rebalance"/> set, the current transport is first
+		/// detached, silenced and stopped so that a new connection is made.
+		/// </summary>
+		/// <param name="rebalance">Whether to drop the current transport first.</param>
+		public void Reconnect(bool rebalance)
+		{
+			lock(reconnectMutex)
+			{
+				if(!started)
+				{
+					Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
+					return;
+				}
+
+				if(rebalance)
+				{
+					ITransport current = connectedTransport.GetAndSet(null);
+					if(current != null)
+					{
+						current.Command = disposedOnCommand;
+						current.Exception = disposedOnException;
+						try
+						{
+							current.Stop();
+						}
+						catch(Exception ex)
+						{
+							ex.GetType();   // Ignore errors but this lets us see the error during debugging
+						}
+					}
+				}
+
+				Tracer.Debug("Waking up reconnect task");
+				try
+				{
+					reconnectTask.Wakeup();
+				}
+				catch(ThreadInterruptedException)
+				{
+				}
+			}
+		}
+
+		/// <summary>
+		/// Builds the ordered list of URIs to try on the next connect pass: a
+		/// snapshot of the configured URIs, shuffled when <c>Randomize</c> is set,
+		/// with the URI of the most recently failed connection (if it is still
+		/// configured) moved to the end.
+		/// </summary>
+		private List<Uri> ConnectList
+		{
+			get
+			{
+				// Add/Remove mutate the list under lock(uris); copy it under the
+				// same lock so the snapshot cannot observe a half-applied change.
+				List<Uri> l;
+				lock(uris)
+				{
+					l = new List<Uri>(uris);
+				}
+
+				Uri lastFailed = failedConnectTransportURI;
+				bool removed = lastFailed != null && l.Remove(lastFailed);
+
+				if(Randomize)
+				{
+					// Fisher-Yates shuffle: every ordering is equally likely.  The
+					// default Random seed is used instead of the millisecond of the
+					// current second, which allowed only 1000 distinct orderings.
+					Random r = new Random();
+					for(int i = l.Count - 1; i > 0; i--)
+					{
+						int p = r.Next(i + 1);
+						Uri t = l[p];
+						l[p] = l[i];
+						l[i] = t;
+					}
+				}
+
+				if(removed)
+				{
+					l.Add(lastFailed);
+				}
+
+				return l;
+			}
+		}
+
+		/// <summary>
+		/// Starts the given transport, announces this client as fault tolerant,
+		/// replays the tracked connection state and then resends the commands held
+		/// in the request map.  Held MessageAcks are not resent; a response is
+		/// simulated for them instead.
+		/// </summary>
+		/// <param name="t">The freshly connected transport to restore onto.</param>
+		protected void RestoreTransport(ITransport t)
+		{
+			Tracer.Info("Restoring previous transport connection.");
+			t.Start();
+
+			// Send information to the broker - informing it we are a fault tolerant client
+			ConnectionControl announcement = new ConnectionControl() { FaultTolerant = true };
+			t.Oneway(announcement);
+			stateTracker.DoRestore(t);
+
+			Tracer.Info("Sending queued commands...");
+
+			// Work on a snapshot so the map is not held locked while sending.
+			Dictionary<int, Command> pending = null;
+			lock(((ICollection) requestMap).SyncRoot)
+			{
+				pending = new Dictionary<int, Command>(requestMap);
+			}
+
+			foreach(Command queued in pending.Values)
+			{
+				if(!queued.IsMessageAck)
+				{
+					t.Oneway(queued);
+					continue;
+				}
+
+				Tracer.Debug("Stored MessageAck being dropped as stale.");
+				OnCommand(this, new Response() { CorrelationId = queued.CommandId });
+			}
+		}
+
+		/// <summary>
+		/// Remote address of the connected transport, or null when no transport
+		/// is connected.
+		/// </summary>
+		public Uri RemoteAddress
+		{
+			get
+			{
+				// Read the reference once: failure handling can clear it between a
+				// null check and a second read.
+				ITransport transport = ConnectedTransport;
+				if(transport != null)
+				{
+					return transport.RemoteAddress;
+				}
+				return null;
+			}
+		}
+
+		/// <summary>
+		/// Returns this instance when its exact type equals
+		/// <paramref name="type"/>; otherwise delegates to the connected
+		/// transport, or returns null when no transport is connected.
+		/// </summary>
+		/// <param name="type">The transport type being looked for.</param>
+		public Object Narrow(Type type)
+		{
+			if(this.GetType().Equals(type))
+			{
+				return this;
+			}
+
+			// Read the reference once: failure handling can clear it between a
+			// null check and a second read.
+			ITransport transport = ConnectedTransport;
+			if(transport != null)
+			{
+				return transport.Narrow(type);
+			}
+
+			return null;
+		}
+
+		/// <summary>
+		/// Performs one connect pass.  A ready backup transport is used when
+		/// backups are enabled and available; otherwise the URIs of the connect
+		/// list are tried, either one after another or (with async connect) on
+		/// separate threads.  After an unsuccessful pass the method sleeps for
+		/// <c>ReconnectDelay</c> milliseconds and, with exponential back-off,
+		/// increases the delay up to <c>MaxReconnectDelay</c>.
+		/// </summary>
+		/// <returns><c>false</c> when nothing more is to be done by this pass
+		/// (already connected, disposed, a permanent failure is recorded, a
+		/// connection was made, or the attempt limit was reached); otherwise
+		/// <c>true</c> unless the transport was disposed in the meantime.</returns>
+		private bool DoConnect()
+		{
+			lock(reconnectMutex)
+			{
+				if(ConnectedTransport != null || disposed || connectionFailure != null)
+				{
+					return false;
+				}
+				else
+				{
+					List<Uri> connectList = ConnectList;
+					if(connectList.Count == 0)
+					{
+						Failure = new NMSConnectionException("No URIs available for connection.");
+					}
+					else
+					{
+						if(!UseExponentialBackOff)
+						{
+							ReconnectDelay = InitialReconnectDelay;
+						}
+
+						// First choice: promote an already connected backup transport.
+						try
+						{
+							backupMutex.WaitOne();
+							if(Backup && backups.Count != 0)
+							{
+								BackupTransport bt = backups[0];
+								backups.RemoveAt(0);
+								ITransport t = bt.Transport;
+								Uri uri = bt.Uri;
+								t.Command = OnCommand;
+								t.Exception = OnException;
+								try
+								{
+									if(started)
+									{
+										RestoreTransport(t);
+									}
+									ReconnectDelay = InitialReconnectDelay;
+									failedConnectTransportURI = null;
+									ConnectedTransportURI = uri;
+									ConnectedTransport = t;
+									connectFailures = 0;
+									connected = true;
+                                    Monitor.PulseAll(reconnectMutex);
+									if(this.Resumed != null)
+									{
+										this.Resumed(t);
+									}
+									Tracer.InfoFormat("Successfully reconnected to backup {0}", uri.ToString());
+									return false;
+								}
+								catch(Exception e)
+								{
+									// The backup could not be used; fall through to a
+									// regular connect pass below.
+									Tracer.DebugFormat("Backup transport failed: {0}", e.Message);
+								}
+							}
+						}
+						finally
+						{
+							backupMutex.ReleaseMutex();
+						}
+
+						ManualResetEvent allDone = new ManualResetEvent(false);
+						ITransport transport = null;
+						Uri chosenUri = null;
+						object syncLock = new object();
+
+						try
+						{
+							foreach(Uri uri in connectList)
+							{
+								if(ConnectedTransport != null || disposed)
+								{
+									break;
+								}
+
+								if(asyncConnect)
+								{
+									Tracer.DebugFormat("Attempting async connect to: {0}", uri);
+									// set connector up
+									// The callback keeps the first transport delivered to it;
+									// later deliveries do not replace it.
+									Connector connector = new Connector(
+										delegate(ITransport transportToUse, Uri uriToUse)
+										{
+											if(transport == null)
+											{
+												lock(syncLock)
+												{
+													if(transport == null)
+													{
+														//the transport has not yet been set asynchronously so set it
+														transport = transportToUse;
+														chosenUri = uriToUse;
+													}
+													//notify issuing thread to move on
+													allDone.Set();
+												}
+											}
+										}, uri, this);
+
+									// initiate a thread to try connecting to broker
+									Thread thread = new Thread(connector.DoConnect) {Name = uri.ToString()};
+								    thread.Start();
+								}
+								else
+								{
+									// synchronous connect
+									try
+									{
+										Tracer.DebugFormat("Attempting sync connect to: {0}", uri);
+										transport = TransportFactory.CompositeConnect(uri);
+										chosenUri = transport.RemoteAddress;
+										break;
+									}
+									catch(Exception e)
+									{
+										Failure = e;
+										Tracer.DebugFormat("Connect fail to: {0}, reason: {1}", uri, e.Message);
+									}
+								}
+							}
+
+							if(asyncConnect)
+							{
+								// now wait for transport to be populated, but timeout eventually
+								allDone.WaitOne(asyncTimeout, false);
+							}
+
+							if(transport != null)
+							{
+								transport.Command = OnCommand;
+								transport.Exception = OnException;
+								transport.Start();
+
+								if(started)
+								{
+									RestoreTransport(transport);
+								}
+
+								if(this.Resumed != null)
+								{
+									this.Resumed(transport);
+								}
+
+								Tracer.Debug("Connection established");
+								ReconnectDelay = InitialReconnectDelay;
+								ConnectedTransportURI = chosenUri;
+								ConnectedTransport = transport;
+								connectFailures = 0;
+								connected = true;
+								// Wake threads waiting on reconnectMutex (Oneway waits there
+								// for a transport to become available).
+                                Monitor.PulseAll(reconnectMutex);
+
+								if(firstConnection)
+								{
+									firstConnection = false;
+									Tracer.InfoFormat("Successfully connected to: {0}", chosenUri.ToString());
+								}
+								else
+								{
+									Tracer.InfoFormat("Successfully reconnected to: {0}", chosenUri.ToString());
+								}
+
+								return false;
+							}
+
+							if(asyncConnect)
+							{
+								Tracer.DebugFormat("Connect failed after waiting for asynchronous callback.");
+							}
+						}
+						catch(Exception e)
+						{
+							Failure = e;
+							Tracer.DebugFormat("Connect attempt failed.  Reason: {0}", e.Message);
+						}
+					}
+
+					// Select the attempt limit: the startup limit applies until the
+					// first connection has been made (when it is non-zero), otherwise
+					// the regular limit.  A limit of zero or less means no limit.
+                    int reconnectAttempts = 0;
+                    if( firstConnection ) {
+                        if( StartupMaxReconnectAttempts != 0 ) {
+                            reconnectAttempts = StartupMaxReconnectAttempts;
+                        }
+                    }
+                    if( reconnectAttempts == 0 ) {
+                        reconnectAttempts = MaxReconnectAttempts;
+                    }
+        
+					if(reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts)
+					{
+						Tracer.ErrorFormat("Failed to connect to transport after {0} attempt(s)", connectFailures);
+						connectionFailure = Failure;
+						this.Exception(this, connectionFailure);
+						return false;
+					}
+				}
+			}
+
+			if(!disposed)
+			{
+				Tracer.DebugFormat("Waiting {0}ms before attempting connection.", ReconnectDelay);
+				// NOTE(review): this takes the Monitor of the sleepMutex object, while
+				// Stop() acquires the same object as a Mutex via WaitOne(); the two
+				// do not exclude each other - confirm the intent.
+				lock(sleepMutex)
+				{
+					try
+					{
+						Thread.Sleep(ReconnectDelay);
+					}
+					catch(ThreadInterruptedException)
+					{
+					}
+				}
+
+				if(UseExponentialBackOff)
+				{
+					// Exponential increment of reconnect delay.
+					ReconnectDelay *= ReconnectDelayExponent;
+					if(ReconnectDelay > MaxReconnectDelay)
+					{
+						ReconnectDelay = MaxReconnectDelay;
+					}
+				}
+			}
+			return !disposed;
+		}
+
+		/// <summary>
+		/// Helper for the asynchronous connect option: bundles one URI with the
+		/// callback that receives the transport once it has been connected.
+		/// </summary>
+		public class Connector
+		{
+			/// <summary>
+			/// Failover transport issuing the connection attempt
+			/// </summary>
+			private readonly FailoverTransport _transport;
+
+			/// <summary>
+			/// Uri to try connecting to
+			/// </summary>
+			readonly Uri _uri;
+
+			/// <summary>
+			/// callback to properly set chosen transport
+			/// </summary>
+			readonly SetTransport _setTransport;
+
+			/// <summary>
+			/// Initializes a new instance of the <see cref="Connector"/> class.
+			/// </summary>
+			/// <param name="setTransport">Callback that receives the connected transport.</param>
+			/// <param name="uri">The URI to connect to.</param>
+			/// <param name="transport">The failover transport that records connect errors.</param>
+			public Connector(SetTransport setTransport, Uri uri, FailoverTransport transport)
+			{
+				this._setTransport = setTransport;
+				this._uri = uri;
+				this._transport = transport;
+			}
+
+			/// <summary>
+			/// Starts the asynchronous connect to the URI.  An error is recorded
+			/// as the failover transport's <c>Failure</c> and logged.
+			/// </summary>
+			public void DoConnect()
+			{
+				try
+				{
+					TransportFactory.AsyncCompositeConnect(this._uri, this._setTransport);
+				}
+				catch(Exception e)
+				{
+					this._transport.Failure = e;
+					Tracer.DebugFormat("Connect fail to: {0}, reason: {1}", this._uri, e.Message);
+				}
+			}
+		}
+
+		/// <summary>
+		/// Tops up the pool of pre-connected backup transports.  Backups that have
+		/// been flagged as disposed are pruned first, then URIs from the connect
+		/// list (other than the currently connected one) are connected and started
+		/// until the pool holds <see cref="BackupPoolSize"/> entries.
+		/// </summary>
+		/// <returns>Always <c>false</c>.</returns>
+		private bool BuildBackups()
+		{
+			// FailoverTask.Iterate() and DoConnect() guard the backup list with
+			// backupMutex.WaitOne()/ReleaseMutex().  A lock() statement on the same
+			// object would take its Monitor instead, which does not exclude those
+			// callers, so the Mutex itself is acquired here.
+			backupMutex.WaitOne();
+			try
+			{
+				if(disposed || !Backup)
+				{
+					return false;
+				}
+
+				// Prune dead entries without mutating the list while enumerating it
+				// (List<T> throws InvalidOperationException in that case).
+				backups.RemoveAll(delegate(BackupTransport candidate) { return candidate.Disposed; });
+
+				if(backups.Count >= BackupPoolSize)
+				{
+					return false;
+				}
+
+				foreach(Uri uri in ConnectList)
+				{
+					Uri connectedUri = ConnectedTransportURI;
+					if(connectedUri != null && !connectedUri.Equals(uri))
+					{
+						try
+						{
+							BackupTransport bt = new BackupTransport(this)
+							{
+								Uri = uri
+							};
+
+							if(!backups.Contains(bt))
+							{
+								ITransport t = TransportFactory.CompositeConnect(uri);
+								t.Command = bt.OnCommand;
+								t.Exception = bt.OnException;
+								t.Start();
+								bt.Transport = t;
+								backups.Add(bt);
+							}
+						}
+						catch(Exception e)
+						{
+							Tracer.DebugFormat("Failed to build backup: {0}", e.Message);
+						}
+					}
+
+					if(backups.Count >= BackupPoolSize)
+					{
+						break;
+					}
+				}
+			}
+			finally
+			{
+				backupMutex.ReleaseMutex();
+			}
+
+			return false;
+		}
+
+		/// <summary>
+		/// Traces that interrupt processing finished for the given connection and
+		/// forwards the notification to the state tracker, all under the reconnect lock.
+		/// </summary>
+		/// <param name="connectionId">Id of the connection whose interrupt processing completed.</param>
+		public void ConnectionInterruptProcessingComplete(ConnectionId connectionId)
+		{
+			lock(reconnectMutex)
+			{
+				string notice = "Connection Interrupt Processing is complete for ConnectionId: " + connectionId;
+				Tracer.Debug(notice);
+
+				stateTracker.ConnectionInterruptProcessingComplete(this, connectionId);
+			}
+		}
+
+		/// <summary>
+		/// Replaces the set of dynamically updated URIs. URIs from the previous
+		/// update that are absent from <paramref name="updatedURIs"/> are removed
+		/// from the uri list; URIs not seen in the previous update are recorded as
+		/// the new updated set and handed to Add. Does nothing when URI updates are
+		/// not supported or when the supplied array is null or empty.
+		/// </summary>
+		/// <param name="rebalance">Forwarded unchanged to Add.</param>
+		/// <param name="updatedURIs">Newly announced URIs; null entries and duplicates are ignored.</param>
+		public void UpdateURIs(bool rebalance, Uri[] updatedURIs)
+		{
+			if(!IsUpdateURIsSupported)
+			{
+				return;
+			}
+
+			// Starts as the previous update; whatever is left after matching is obsolete.
+			List<Uri> obsolete = new List<Uri>(this.updated);
+			List<Uri> fresh = new List<Uri>();
+
+			if(updatedURIs == null || updatedURIs.Length == 0)
+			{
+				return;
+			}
+
+			// The dictionary is used purely as a set to drop duplicate URIs.
+			Dictionary<Uri, bool> distinct = new Dictionary<Uri, bool>();
+			foreach(Uri candidate in updatedURIs)
+			{
+				if(candidate != null)
+				{
+					distinct[candidate] = true;
+				}
+			}
+
+			foreach(Uri candidate in distinct.Keys)
+			{
+				// Remove returns false when the URI was not part of the previous update.
+				if(!obsolete.Remove(candidate))
+				{
+					fresh.Add(candidate);
+				}
+			}
+
+			lock(reconnectMutex)
+			{
+				this.updated.Clear();
+				this.updated.AddRange(fresh);
+
+				foreach(Uri gone in obsolete)
+				{
+					this.uris.Remove(gone);
+				}
+
+				this.Add(rebalance, fresh.ToArray());
+			}
+		}
+
+		/// <summary>
+		/// Applies a ConnectionControl command: when it carries a non-blank
+		/// reconnect address and reconnecting is supported, reconnects to that
+		/// address; afterwards the connected-brokers list is always passed on for
+		/// processing. Failures while handling the reconnect address are logged and
+		/// not propagated.
+		/// </summary>
+		/// <param name="control">The command to apply.</param>
+		public void HandleConnectionControl(ConnectionControl control)
+		{
+			string target = control.ReconnectTo;
+
+			if(target != null)
+			{
+				target = target.Trim();
+			}
+
+			if(target != null && target.Length > 0)
+			{
+				try
+				{
+					// The address is parsed before the support check, so a malformed
+					// address is reported even when reconnecting is unsupported.
+					Uri reconnectUri = new Uri(target);
+
+					if(IsReconnectSupported)
+					{
+						Reconnect(reconnectUri);
+						Tracer.Info("Reconnected to: " + reconnectUri.OriginalString);
+					}
+				}
+				catch(Exception error)
+				{
+					Tracer.ErrorFormat("Failed to handle ConnectionControl reconnect to {0}: {1}", target, error);
+				}
+			}
+
+			ProcessNewTransports(control.RebalanceConnection, control.ConnectedBrokers);
+		}
+
+		/// <summary>
+		/// Parses a comma separated list of broker addresses and passes the
+		/// successfully parsed ones to UpdateURIs. Addresses that fail to parse are
+		/// logged and skipped; a failure inside UpdateURIs is logged and swallowed.
+		/// Nothing happens for a null or blank list or when URI updates are unsupported.
+		/// </summary>
+		/// <param name="rebalance">Forwarded unchanged to UpdateURIs.</param>
+		/// <param name="newTransports">Comma separated broker addresses; may be null.</param>
+		private void ProcessNewTransports(bool rebalance, String newTransports)
+		{
+			if(newTransports == null)
+			{
+				return;
+			}
+
+			newTransports = newTransports.Trim();
+
+			if(newTransports.Length == 0 || !IsUpdateURIsSupported)
+			{
+				return;
+			}
+
+			List<Uri> parsed = new List<Uri>();
+
+			foreach(String address in newTransports.Split(','))
+			{
+				try
+				{
+					parsed.Add(new Uri(address));
+				}
+				catch
+				{
+					// One bad address must not discard the rest of the list.
+					Tracer.Error("Failed to parse broker address: " + address);
+				}
+			}
+
+			if(parsed.Count == 0)
+			{
+				return;
+			}
+
+			try
+			{
+				UpdateURIs(rebalance, parsed.ToArray());
+			}
+			catch
+			{
+				Tracer.Error("Failed to update transport URI's from: " + newTransports);
+			}
+		}
+
+		/// <summary>
+		/// Disposes this transport via Dispose(true) and then tells the garbage
+		/// collector that finalization is no longer required for this instance.
+		/// </summary>
+		public void Dispose()
+		{
+			this.Dispose(true);
+
+			GC.SuppressFinalize(this);
+		}
+
+		/// <summary>
+		/// Stops the transport and then marks it as disposed.
+		/// </summary>
+		/// <param name="disposing">
+		/// Currently has no effect: the same work is done whether it is true or false.
+		/// </param>
+		public void Dispose(bool disposing)
+		{
+			// Stop first; the disposed flag is only set once Stop has returned.
+			Stop();
+
+			disposed = true;
+		}
+
+		/// <summary>
+		/// Orders failover transports by their id.
+		/// </summary>
+		/// <param name="o">The object to compare with; must be a FailoverTransport.</param>
+		/// <returns>
+		/// A negative value, zero, or a positive value when this transport's id is
+		/// less than, equal to, or greater than the other transport's id.
+		/// </returns>
+		/// <exception cref="ArgumentException">
+		/// <paramref name="o"/> is null or is not a FailoverTransport.
+		/// </exception>
+		public int CompareTo(Object o)
+		{
+			// A single cast replaces the former "is" test followed by "as".
+			FailoverTransport other = o as FailoverTransport;
+
+			if(other == null)
+			{
+				throw new ArgumentException("Object must be of type FailoverTransport.", "o");
+			}
+
+			// CompareTo instead of subtraction: a difference of two ids can overflow
+			// and report the wrong sign; only the sign matters to callers.
+			return this.id.CompareTo(other.id);
+		}
+
+		/// <summary>
+		/// Returns the text form of the currently connected transport URI, or
+		/// "unconnected" when there is none.
+		/// </summary>
+		public override String ToString()
+		{
+			if(ConnectedTransportURI == null)
+			{
+				return "unconnected";
+			}
+
+			return ConnectedTransportURI.ToString();
+		}
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/InactivityMonitor.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/InactivityMonitor.cs?rev=1196799&r1=1196798&r2=1196799&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/InactivityMonitor.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/Transport/InactivityMonitor.cs Wed Nov  2 21:08:05 2011
@@ -262,9 +262,9 @@ namespace Apache.NMS.ActiveMQ.Transport
 		public override void Oneway(Command command)
 		{
 			// Disable inactivity monitoring while processing a command.
-			//synchronize this method - its not synchronized
-			//further down the transport stack and gets called by more
-			//than one thread  by this class
+			// synchronize this method - its not synchronized
+			// further down the transport stack and gets called by more
+			// than one thread  by this class
 			lock(inWrite)
 			{
 				inWrite.Value = true;
@@ -330,10 +330,10 @@ namespace Apache.NMS.ActiveMQ.Transport
 					Math.Min(
 						localWireFormatInfo.MaxInactivityDuration,
 						remoteWireFormatInfo.MaxInactivityDuration);
-				initialDelayTime =
-					Math.Min(
-						localWireFormatInfo.MaxInactivityDurationInitialDelay,
-						remoteWireFormatInfo.MaxInactivityDurationInitialDelay);
+				initialDelayTime = remoteWireFormatInfo.MaxInactivityDurationInitialDelay > 0 ?
+					Math.Min(localWireFormatInfo.MaxInactivityDurationInitialDelay,
+						     remoteWireFormatInfo.MaxInactivityDurationInitialDelay) :
+                    localWireFormatInfo.MaxInactivityDurationInitialDelay;
 
 				if(readCheckTime > 0)
 				{



Mime
View raw message