activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jgo...@apache.org
Subject svn commit: r776509 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src: main/csharp/Transport/ main/csharp/Transport/Failover/ test/csharp/
Date Wed, 20 May 2009 00:40:54 GMT
Author: jgomes
Date: Wed May 20 00:40:53 2009
New Revision: 776509

URL: http://svn.apache.org/viewvc?rev=776509&view=rev
Log:
Refactored the transport factory so that it no longer caches sub-transport factories.  Removing the cache solves a race condition that occurs when multiple threads are creating multiple connection factories.  It also solves an isolation problem: the failover transport requires multiple transport factories in order to support separate transport settings per failover connection.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs?rev=776509&r1=776508&r2=776509&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs Wed May 20 00:40:53 2009
@@ -16,7 +16,6 @@
  */
 
 using System;
-
 using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ.Transport.Failover

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=776509&r1=776508&r2=776509&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Wed May 20 00:40:53 2009
@@ -30,1011 +30,1011 @@
 
 namespace Apache.NMS.ActiveMQ.Transport.Failover
 {
-    /// <summary>
-    /// A Transport that is made reliable by being able to fail over to another
-    /// transport when a transport failure is detected.
-    /// </summary>
-    public class FailoverTransport : ICompositeTransport, IComparable
-    {
-
-        private static int _idCounter = 0;
-        private int _id;
-
-        private bool disposed;
-        private bool connected;
-        private List<Uri> uris = new List<Uri>();
-        private CommandHandler _commandHandler;
-        private ExceptionHandler _exceptionHandler;
-
-        private Mutex reconnectMutex = new Mutex();
-        private Mutex backupMutex = new Mutex();
-        private Mutex sleepMutex = new Mutex();
-        private ConnectionStateTracker stateTracker = new ConnectionStateTracker();
-        private Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
-
-        private Uri connectedTransportURI;
-        private Uri failedConnectTransportURI;
-        private AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
-        private TaskRunner reconnectTask = null;
-        private bool started;
-
-        private int _initialReconnectDelay = 10;
-        private int _maxReconnectDelay = 1000 * 30;
-        private int _backOffMultiplier = 2;
-        private bool _useExponentialBackOff = true;
-        private bool _randomize = true;
-        private bool initialized;
-        private int _maxReconnectAttempts;
-        private int connectFailures;
-        private int _reconnectDelay = 10;
-        private Exception connectionFailure;
-        private bool firstConnection = true;
-        //optionally always have a backup created
-        private bool _backup = false;
-        private List<BackupTransport> backups = new List<BackupTransport>();
-        private int _backupPoolSize = 1;
-        private bool _trackMessages = false;
-        private int _maxCacheSize = 256;
-        private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
-
-        public TimeSpan RequestTimeout
-        {
-            get { return requestTimeout; }
-            set { requestTimeout = value; }
-        }
-
-        private class FailoverTask : Task
-        {
-            private FailoverTransport parent;
-
-            public FailoverTask(FailoverTransport p)
-            {
-                parent = p;
-            }
-
-            public bool iterate()
-            {
-                bool result = false;
-                bool buildBackup = true;
-                bool doReconnect = !parent.disposed;
-                try
-                {
-                    parent.backupMutex.WaitOne();
-                    if(parent.ConnectedTransport == null && doReconnect)
-                    {
-                        result = parent.doReconnect();
-                        buildBackup = false;
-                    }
-                }
-                finally
-                {
-                    parent.backupMutex.ReleaseMutex();
-                }
-
-                if(buildBackup)
-                {
-                    parent.buildBackups();
-                }
-                else
-                {
-                    //build backups on the next iteration
-                    result = true;
-                    try
-                    {
-                        parent.reconnectTask.wakeup();
-                    }
-                    catch(ThreadInterruptedException)
-                    {
-                        Tracer.Debug("Reconnect task has been interrupted.");
-                    }
-                }
-                return result;
-            }
-        }
-
-        public FailoverTransport()
-        {
-            _id = _idCounter++;
-
-            stateTracker.TrackTransactions = true;
-        }
-
-        ~FailoverTransport()
-        {
-            Dispose(false);
-        }
-
-        public void onCommand(ITransport sender, Command command)
-        {
-            if(command != null)
-            {
-                if(command.IsResponse)
-                {
-                    Object oo = null;
-                    lock(requestMap)
-                    {
-                        int v = ((Response) command).CorrelationId;
-                        try
-                        {
-                            oo = requestMap[v];
-                            requestMap.Remove(v);
-                        }
-                        catch
-                        {
-                        }
-                    }
-
-                    Tracked t = oo as Tracked;
-                    if(t != null)
-                    {
-                        t.onResponses();
-                    }
-                }
-
-                if(!initialized)
-                {
-                    if(command.IsBrokerInfo)
-                    {
-                        BrokerInfo info = (BrokerInfo) command;
-                        BrokerInfo[] peers = info.PeerBrokerInfos;
-                        if(peers != null)
-                        {
-                            for(int i = 0; i < peers.Length; i++)
-                            {
-                                String brokerString = peers[i].BrokerURL;
-                                add(brokerString);
-                            }
-                        }
-
-                        initialized = true;
-                    }
-                }
-            }
-
-            this.Command(sender, command);
-        }
-
-        public void onException(ITransport sender, Exception error)
-        {
-            try
-            {
-                handleTransportFailure(error);
-            }
-            catch(Exception e)
-            {
-                e.GetType();
-                // What to do here?
-            }
-        }
-
-        public void disposedOnCommand(ITransport sender, Command c)
-        {
-        }
-
-        public void disposedOnException(ITransport sender, Exception e)
-        {
-        }
-
-        public void handleTransportFailure(Exception e)
-        {
-            ITransport transport = connectedTransport.GetAndSet(null);
-            if(transport != null)
-            {
-                transport.Command = new CommandHandler(disposedOnCommand);
-                transport.Exception = new ExceptionHandler(disposedOnException);
-                try
-                {
-                    transport.Stop();
-                }
-                catch(Exception ex)
-                {
-                    ex.GetType();	// Ignore errors but this lets us see the error during debugging
-                }
-
-                try
-                {
-                    reconnectMutex.WaitOne();
-                    bool reconnectOk = false;
-                    if(started)
-                    {
-                        Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}", ConnectedTransportURI, e.Message);
-                        reconnectOk = true;
-                    }
-
-                    initialized = false;
-                    failedConnectTransportURI = ConnectedTransportURI;
-                    ConnectedTransportURI = null;
-                    connected = false;
-                    if(reconnectOk)
-                    {
-                        reconnectTask.wakeup();
-                    }
-                }
-                finally
-                {
-                    reconnectMutex.ReleaseMutex();
-                }
-            }
-        }
-
-        public void Start()
-        {
-            try
-            {
-                reconnectMutex.WaitOne();
-                Tracer.Debug("Started.");
-                if(started)
-                {
-                    return;
-                }
-                started = true;
-                stateTracker.MaxCacheSize = MaxCacheSize;
-                stateTracker.TrackMessages = TrackMessages;
-                if(ConnectedTransport != null)
-                {
-                    stateTracker.DoRestore(ConnectedTransport);
-                }
-                else
-                {
-                    Reconnect();
-                }
-            }
-            finally
-            {
-                reconnectMutex.ReleaseMutex();
-            }
-        }
-
-        public virtual void Stop()
-        {
-            ITransport transportToStop = null;
-            try
-            {
-                reconnectMutex.WaitOne();
-                Tracer.Debug("Stopped.");
-                if(!started)
-                {
-                    return;
-                }
-
-                started = false;
-                disposed = true;
-                connected = false;
-                foreach(BackupTransport t in backups)
-                {
-                    t.Disposed = true;
-                }
-                backups.Clear();
-
-                if(ConnectedTransport != null)
-                {
-                    transportToStop = connectedTransport.GetAndSet(null);
-                }
-            }
-            finally
-            {
-                reconnectMutex.ReleaseMutex();
-            }
-
-            try
-            {
-                sleepMutex.WaitOne();
-            }
-            finally
-            {
-                sleepMutex.ReleaseMutex();
-            }
-
-            reconnectTask.shutdown();
-            if(transportToStop != null)
-            {
-                transportToStop.Stop();
-            }
-        }
-
-        public int InitialReconnectDelay
-        {
-            get { return _initialReconnectDelay; }
-            set { _initialReconnectDelay = value; }
-        }
-
-        public int MaxReconnectDelay
-        {
-            get { return _maxReconnectDelay; }
-            set { _maxReconnectDelay = value; }
-        }
-
-        public int ReconnectDelay
-        {
-            get { return _reconnectDelay; }
-            set { _reconnectDelay = value; }
-        }
-
-        public int ReconnectDelayExponent
-        {
-            get { return _backOffMultiplier; }
-            set { _backOffMultiplier = value; }
-        }
-
-        public ITransport ConnectedTransport
-        {
-            get { return connectedTransport.Value; }
-            set { connectedTransport.Value = value; }
-        }
-
-        public Uri ConnectedTransportURI
-        {
-            get { return connectedTransportURI; }
-            set { connectedTransportURI = value; }
-        }
-
-        public int MaxReconnectAttempts
-        {
-            get { return _maxReconnectAttempts; }
-            set { _maxReconnectAttempts = value; }
-        }
-
-        public bool Randomize
-        {
-            get { return _randomize; }
-            set { _randomize = value; }
-        }
-
-        public bool Backup
-        {
-            get { return _backup; }
-            set { _backup = value; }
-        }
-
-        public int BackupPoolSize
-        {
-            get { return _backupPoolSize; }
-            set { _backupPoolSize = value; }
-        }
-
-        public bool TrackMessages
-        {
-            get { return _trackMessages; }
-            set { _trackMessages = value; }
-        }
-
-        public int MaxCacheSize
-        {
-            get { return _maxCacheSize; }
-            set { _maxCacheSize = value; }
-        }
-
-        /// <summary>
-        /// </summary>
-        /// <param name="command"></param>
-        /// <returns>Returns true if the command is one sent when a connection is being closed.</returns>
-        private bool IsShutdownCommand(Command command)
-        {
-            return (command != null && (command.IsShutdownInfo || command is RemoveInfo));
-        }
-
-        public void Oneway(Command command)
-        {
-            Exception error = null;
-            try
-            {
-                reconnectMutex.WaitOne();
-
-                if(IsShutdownCommand(command) && ConnectedTransport == null)
-                {
-                    if(command.IsShutdownInfo)
-                    {
-                        // Skipping send of ShutdownInfo command when not connected.
-                        return;
-                    }
-
-                    if(command is RemoveInfo)
-                    {
-                        // Simulate response to RemoveInfo command
-                        Response response = new Response();
-                        response.CorrelationId = command.CommandId;
-                        onCommand(this, response);
-                        return;
-                    }
-                }
-                // Keep trying until the message is sent.
-                for(int i = 0; !disposed; i++)
-                {
-                    try
-                    {
-                        // Wait for transport to be connected.
-                        ITransport transport = ConnectedTransport;
-                        while(transport == null && !disposed
-                            && connectionFailure == null
-                            // && !Thread.CurrentThread.isInterrupted()
-                            )
-                        {
-                            Tracer.Info("Waiting for transport to reconnect.");
-                            try
-                            {
-                                // Release so that the reconnect task can run
-                                reconnectMutex.ReleaseMutex();
-                                try
-                                {
-                                    // Wait for something
-                                    Thread.Sleep(1000);
-                                }
-                                catch(ThreadInterruptedException e)
-                                {
-                                    Tracer.DebugFormat("Interupted: {0}", e.Message);
-                                }
-                            }
-                            finally
-                            {
-                                reconnectMutex.WaitOne();
-                            }
-
-                            transport = ConnectedTransport;
-                        }
-
-                        if(transport == null)
-                        {
-                            // Previous loop may have exited due to use being
-                            // disposed.
-                            if(disposed)
-                            {
-                                error = new IOException("Transport disposed.");
-                            }
-                            else if(connectionFailure != null)
-                            {
-                                error = connectionFailure;
-                            }
-                            else
-                            {
-                                error = new IOException("Unexpected failure.");
-                            }
-                            break;
-                        }
-
-                        // If it was a request and it was not being tracked by
-                        // the state tracker,
-                        // then hold it in the requestMap so that we can replay
-                        // it later.
-                        Tracked tracked = stateTracker.track(command);
-                        lock(requestMap)
-                        {
-                            if(tracked != null && tracked.WaitingForResponse)
-                            {
-                                requestMap.Add(command.CommandId, tracked);
-                            }
-                            else if(tracked == null && command.ResponseRequired)
-                            {
-                                requestMap.Add(command.CommandId, command);
-                            }
-                        }
-
-                        // Send the message.
-                        try
-                        {
-                            transport.Oneway(command);
-                            stateTracker.trackBack(command);
-                        }
-                        catch(Exception e)
-                        {
-
-                            // If the command was not tracked.. we will retry in
-                            // this method
-                            if(tracked == null)
-                            {
-
-                                // since we will retry in this method.. take it
-                                // out of the request
-                                // map so that it is not sent 2 times on
-                                // recovery
-                                if(command.ResponseRequired)
-                                {
-                                    requestMap.Remove(command.CommandId);
-                                }
-
-                                // Rethrow the exception so it will handled by
-                                // the outer catch
-                                throw e;
-                            }
-
-                        }
-
-                        return;
-
-                    }
-                    catch(Exception e)
-                    {
-                        Tracer.DebugFormat("Send Oneway attempt: {0} failed.", i);
-                        handleTransportFailure(e);
-                    }
-                }
-            }
-            finally
-            {
-                reconnectMutex.ReleaseMutex();
-            }
-
-            if(!disposed)
-            {
-                if(error != null)
-                {
-                    throw error;
-                }
-            }
-        }
-
-        /*
-        public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) {
-        throw new AssertionError("Unsupported Method");
-        }
-        */
-
-        public Object Request(Object command)
-        {
-            throw new ApplicationException("FailoverTransport does not support Request(Object)");
-        }
-
-        public Object Request(Object command, int timeout)
-        {
-            throw new ApplicationException("FailoverTransport does not support Request(Object, Int)");
-        }
-
-        public void add(Uri[] u)
-        {
-            lock(uris)
-            {
-                for(int i = 0; i < u.Length; i++)
-                {
-                    if(!uris.Contains(u[i]))
-                    {
-                        uris.Add(u[i]);
-                    }
-                }
-            }
-
-            Reconnect();
-        }
-
-        public void remove(Uri[] u)
-        {
-            lock(uris)
-            {
-                for(int i = 0; i < u.Length; i++)
-                {
-                    uris.Remove(u[i]);
-                }
-            }
-
-            Reconnect();
-        }
-
-        public void add(String u)
-        {
-            try
-            {
-                Uri uri = new Uri(u);
-                lock(uris)
-                {
-                    if(!uris.Contains(uri))
-                    {
-                        uris.Add(uri);
-                    }
-                }
-
-                Reconnect();
-            }
-            catch(Exception e)
-            {
-                Tracer.ErrorFormat("Failed to parse URI: {0} because {1}", u, e.Message);
-            }
-        }
-
-        public void Reconnect()
-        {
-            try
-            {
-                reconnectMutex.WaitOne();
-
-                if(started)
-                {
-                    if(reconnectTask == null)
-                    {
-                        Tracer.Debug("Creating reconnect task");
-                        reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(new FailoverTask(this),
-                                            "ActiveMQ Failover Worker: " + this.GetHashCode().ToString());
-                    }
-
-                    Tracer.Debug("Waking up reconnect task");
-                    try
-                    {
-                        reconnectTask.wakeup();
-                    }
-                    catch(ThreadInterruptedException)
-                    {
-                    }
-                }
-                else
-                {
-                    Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
-                }
-            }
-            finally
-            {
-                reconnectMutex.ReleaseMutex();
-            }
-        }
-
-        private List<Uri> ConnectList
-        {
-            get
-            {
-                List<Uri> l = new List<Uri>(uris);
-                bool removed = false;
-                if(failedConnectTransportURI != null)
-                {
-                    removed = l.Remove(failedConnectTransportURI);
-                }
-
-                if(Randomize)
-                {
-                    // Randomly, reorder the list by random swapping
-                    Random r = new Random(DateTime.Now.Millisecond);
-                    for(int i = 0; i < l.Count; i++)
-                    {
-                        int p = r.Next(l.Count);
-                        Uri t = l[p];
-                        l[p] = l[i];
-                        l[i] = t;
-                    }
-                }
-
-                if(removed)
-                {
-                    l.Add(failedConnectTransportURI);
-                }
-
-                return l;
-            }
-        }
-
-        protected void restoreTransport(ITransport t)
-        {
-            t.Start();
-            //send information to the broker - informing it we are an ft client
-            ConnectionControl cc = new ConnectionControl();
-            cc.FaultTolerant = true;
-            t.Oneway(cc);
-            stateTracker.DoRestore(t);
-            Dictionary<int, Command> tmpMap = null;
-            lock(requestMap)
-            {
-                tmpMap = new Dictionary<int, Command>(requestMap);
-            }
-
-            foreach(Command command in tmpMap.Values)
-            {
-                t.Oneway(command);
-            }
-        }
-
-        public bool UseExponentialBackOff
-        {
-            get { return _useExponentialBackOff; }
-            set { _useExponentialBackOff = value; }
-        }
-
-        public override String ToString()
-        {
-            return ConnectedTransportURI == null ? "unconnected" : ConnectedTransportURI.ToString();
-        }
-
-        public String RemoteAddress
-        {
-            get
-            {
-                FailoverTransport transport = ConnectedTransport as FailoverTransport;
-                if(transport != null)
-                {
-                    return transport.RemoteAddress;
-                }
-                return null;
-            }
-        }
-
-        public bool IsFaultTolerant
-        {
-            get { return true; }
-        }
-
-        bool doReconnect()
-        {
-            Exception failure = null;
-            try
-            {
-                reconnectMutex.WaitOne();
-
-                if(disposed || connectionFailure != null)
-                {
-                }
-
-                if(ConnectedTransport != null || disposed || connectionFailure != null)
-                {
-                    return false;
-                }
-                else
-                {
-                    List<Uri> connectList = ConnectList;
-                    if(connectList.Count == 0)
-                    {
-                        failure = new IOException("No uris available to connect to.");
-                    }
-                    else
-                    {
-                        if(!UseExponentialBackOff)
-                        {
-                            ReconnectDelay = InitialReconnectDelay;
-                        }
-                        try
-                        {
-                            backupMutex.WaitOne();
-                            if(Backup && backups.Count != 0)
-                            {
-                                BackupTransport bt = backups[0];
-                                backups.RemoveAt(0);
-                                ITransport t = bt.Transport;
-                                Uri uri = bt.Uri;
-                                t.Command = new CommandHandler(onCommand);
-                                t.Exception = new ExceptionHandler(onException);
-                                try
-                                {
-                                    if(started)
-                                    {
-                                        restoreTransport(t);
-                                    }
-                                    ReconnectDelay = InitialReconnectDelay;
-                                    failedConnectTransportURI = null;
-                                    ConnectedTransportURI = uri;
-                                    ConnectedTransport = t;
-                                    connectFailures = 0;
-                                    Tracer.InfoFormat("Successfully reconnected to backup {0}", uri);
-                                    return false;
-                                }
-                                catch(Exception e)
-                                {
-                                    e.GetType();
-                                    Tracer.Debug("Backup transport failed");
-                                }
-                            }
-                        }
-                        finally
-                        {
-                            backupMutex.ReleaseMutex();
-                        }
-
-                        foreach(Uri uri in connectList)
-                        {
-                            if(ConnectedTransport != null || disposed)
-                            {
-                                break;
-                            }
-
-                            try
-                            {
-                                Tracer.DebugFormat("Attempting connect to: {0}", uri);
-                                ITransport t = TransportFactory.CompositeConnect(uri);
-                                t.Command = new CommandHandler(onCommand);
-                                t.Exception = new ExceptionHandler(onException);
-                                t.Start();
-
-                                if(started)
-                                {
-                                    restoreTransport(t);
-                                }
-
-                                Tracer.Debug("Connection established");
-                                ReconnectDelay = InitialReconnectDelay;
-                                ConnectedTransportURI = uri;
-                                ConnectedTransport = t;
-                                connectFailures = 0;
-
-                                if(firstConnection)
-                                {
-                                    firstConnection = false;
-                                    Tracer.InfoFormat("Successfully connected to: {0}", uri);
-                                }
-                                else
-                                {
-                                    Tracer.InfoFormat("Successfully reconnected to: {0}", uri);
-                                }
-
-                                connected = true;
-                                return false;
-                            }
-                            catch(Exception e)
-                            {
-                                failure = e;
-                                Tracer.DebugFormat("Connect fail to: {0}, reason: {1}", uri, e.Message);
-                            }
-                        }
-                    }
-                }
-
-                if(MaxReconnectAttempts > 0 && ++connectFailures >= MaxReconnectAttempts)
-                {
-                    Tracer.ErrorFormat("Failed to connect to transport after: {0} attempt(s)", connectFailures);
-                    connectionFailure = failure;
-                    onException(this, connectionFailure);
-                    return false;
-                }
-            }
-            finally
-            {
-                reconnectMutex.ReleaseMutex();
-            }
-
-            if(!disposed)
-            {
-
-                Tracer.DebugFormat("Waiting {0}ms before attempting connection.", ReconnectDelay);
-                try
-                {
-                    sleepMutex.WaitOne();
-                    try
-                    {
-                        Thread.Sleep(ReconnectDelay);
-                    }
-                    catch(ThreadInterruptedException)
-                    {
-                    }
-                }
-                finally
-                {
-                    sleepMutex.ReleaseMutex();
-                }
-
-                if(UseExponentialBackOff)
-                {
-                    // Exponential increment of reconnect delay.
-                    ReconnectDelay *= ReconnectDelayExponent;
-                    if(ReconnectDelay > MaxReconnectDelay)
-                    {
-                        ReconnectDelay = MaxReconnectDelay;
-                    }
-                }
-            }
-            return !disposed;
-        }
-
-
-        bool buildBackups()
-        {
-            try
-            {
-                backupMutex.WaitOne();
-                if(!disposed && Backup && backups.Count < BackupPoolSize)
-                {
-                    List<Uri> connectList = ConnectList;
-                    foreach(BackupTransport bt in backups)
-                    {
-                        if(bt.Disposed)
-                        {
-                            backups.Remove(bt);
-                        }
-                    }
-
-                    foreach(Uri uri in connectList)
-                    {
-                        if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
-                        {
-                            try
-                            {
-                                BackupTransport bt = new BackupTransport(this);
-                                bt.Uri = uri;
-                                if(!backups.Contains(bt))
-                                {
-                                    ITransport t = TransportFactory.CompositeConnect(uri);
-                                    t.Command = new CommandHandler(bt.onCommand);
-                                    t.Exception = new ExceptionHandler(bt.onException);
-                                    t.Start();
-                                    bt.Transport = t;
-                                    backups.Add(bt);
-                                }
-                            }
-                            catch(Exception e)
-                            {
-                                Tracer.DebugFormat("Failed to build backup: {0}", e.Message);
-                            }
-                        }
-
-                        if(backups.Count < BackupPoolSize)
-                        {
-                            break;
-                        }
-                    }
-                }
-            }
-            finally
-            {
-                backupMutex.ReleaseMutex();
-            }
-
-            return false;
-        }
-
-        public bool IsDisposed
-        {
-            get { return disposed; }
-        }
-
-        public bool Connected
-        {
-            get { return connected; }
-        }
-
-        public void Reconnect(Uri uri)
-        {
-            add(new Uri[] { uri });
-        }
-
-        public FutureResponse AsyncRequest(Command command)
-        {
-            throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");
-        }
-
-        public Response Request(Command command)
-        {
-            throw new ApplicationException("FailoverTransport does not implement Request(Command)");
-        }
-
-        public Response Request(Command command, TimeSpan ts)
-        {
-            throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");
-        }
-
-        public CommandHandler Command
-        {
-            get { return _commandHandler; }
-            set { _commandHandler = value; }
-        }
-
-        public ExceptionHandler Exception
-        {
-            get { return _exceptionHandler; }
-            set { _exceptionHandler = value; }
-        }
-
-        public bool IsStarted
-        {
-            get { return started; }
-        }
-
-        public void Dispose()
-        {
-            Dispose(true);
-            GC.SuppressFinalize(this);
-        }
-
-        public void Dispose(bool disposing)
-        {
-            if(disposing)
-            {
-                // get rid of unmanaged stuff
-            }
-
-            disposed = true;
-        }
-
-        public int CompareTo(Object o)
-        {
-            if(o is FailoverTransport)
-            {
-                FailoverTransport oo = o as FailoverTransport;
-
-                return this._id - oo._id;
-            }
-            else
-            {
-                throw new ArgumentException();
-            }
-        }
-    }
+	/// <summary>
+	/// A Transport that is made reliable by being able to fail over to another
+	/// transport when a transport failure is detected.
+	/// </summary>
+	public class FailoverTransport : ICompositeTransport, IComparable
+	{
+
+		private static int _idCounter = 0;
+		private int _id;
+
+		private bool disposed;
+		private bool connected;
+		private List<Uri> uris = new List<Uri>();
+		private CommandHandler _commandHandler;
+		private ExceptionHandler _exceptionHandler;
+
+		private Mutex reconnectMutex = new Mutex();
+		private Mutex backupMutex = new Mutex();
+		private Mutex sleepMutex = new Mutex();
+		private ConnectionStateTracker stateTracker = new ConnectionStateTracker();
+		private Dictionary<int, Command> requestMap = new Dictionary<int, Command>();
+
+		private Uri connectedTransportURI;
+		private Uri failedConnectTransportURI;
+		private AtomicReference<ITransport> connectedTransport = new AtomicReference<ITransport>(null);
+		private TaskRunner reconnectTask = null;
+		private bool started;
+
+		private int _initialReconnectDelay = 10;
+		private int _maxReconnectDelay = 1000 * 30;
+		private int _backOffMultiplier = 2;
+		private bool _useExponentialBackOff = true;
+		private bool _randomize = true;
+		private bool initialized;
+		private int _maxReconnectAttempts;
+		private int connectFailures;
+		private int _reconnectDelay = 10;
+		private Exception connectionFailure;
+		private bool firstConnection = true;
+		//optionally always have a backup created
+		private bool _backup = false;
+		private List<BackupTransport> backups = new List<BackupTransport>();
+		private int _backupPoolSize = 1;
+		private bool _trackMessages = false;
+		private int _maxCacheSize = 256;
+		private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
+
+		public TimeSpan RequestTimeout
+		{
+			get { return requestTimeout; }
+			set { requestTimeout = value; }
+		}
+
+		/// <summary>
+		/// Task run by the reconnect task runner on behalf of a <see cref="FailoverTransport"/>.
+		/// Each pass either attempts a reconnect or tops up the backup pool.
+		/// </summary>
+		private class FailoverTask : Task
+		{
+			private FailoverTransport parent;
+
+			public FailoverTask(FailoverTransport p)
+			{
+				parent = p;
+			}
+
+			/// <summary>
+			/// Runs one pass. If the parent is not disposed and has no connected transport,
+			/// a reconnect is attempted while holding the parent's backup mutex; otherwise
+			/// the parent's backups are built.
+			/// </summary>
+			/// <returns>
+			/// true when a reconnect was attempted in this pass, false when the pass only
+			/// built backups.
+			/// </returns>
+			public bool iterate()
+			{
+				bool result = false;
+				bool buildBackup = true;
+				// Snapshot of the disposed flag taken before the mutex is acquired
+				bool doReconnect = !parent.disposed;
+				try
+				{
+					parent.backupMutex.WaitOne();
+					if(parent.ConnectedTransport == null && doReconnect)
+					{
+						result = parent.doReconnect();
+						buildBackup = false;
+					}
+				}
+				finally
+				{
+					parent.backupMutex.ReleaseMutex();
+				}
+
+				if(buildBackup)
+				{
+					parent.buildBackups();
+				}
+				else
+				{
+					//build backups on the next iteration
+					// NOTE(review): this overwrites the value returned by doReconnect() above,
+					// so that return value never reaches the caller - confirm this is intended.
+					result = true;
+					try
+					{
+						parent.reconnectTask.wakeup();
+					}
+					catch(ThreadInterruptedException)
+					{
+						Tracer.Debug("Reconnect task has been interrupted.");
+					}
+				}
+				return result;
+			}
+		}
+
+		/// <summary>
+		/// Creates a failover transport, assigns it the next instance id (used by
+		/// <see cref="CompareTo"/>) and enables transaction tracking on the state tracker.
+		/// </summary>
+		public FailoverTransport()
+		{
+			// _idCounter is shared by all instances; increment it atomically so that
+			// transports constructed on different threads never receive the same id.
+			// Subtracting one keeps the first id at 0, as before.
+			_id = Interlocked.Increment(ref _idCounter) - 1;
+
+			stateTracker.TrackTransactions = true;
+		}
+
+		~FailoverTransport()
+		{
+			Dispose(false);
+		}
+
+		/// <summary>
+		/// Handles a command received from the underlying transport. Responses clear the
+		/// matching entry in the request map (notifying it when it is a tracked command),
+		/// the first BrokerInfo seen adds the advertised peer broker URLs, and the command
+		/// is then forwarded to the registered <see cref="Command"/> handler.
+		/// </summary>
+		/// <param name="sender">The transport that delivered the command.</param>
+		/// <param name="command">The received command; may be null.</param>
+		public void onCommand(ITransport sender, Command command)
+		{
+			if(command != null)
+			{
+				if(command.IsResponse)
+				{
+					Command pending = null;
+					lock(requestMap)
+					{
+						int correlationId = ((Response) command).CorrelationId;
+						// TryGetValue instead of indexer + catch-all: an unknown correlation
+						// id is an expected case, not an exceptional one.
+						if(requestMap.TryGetValue(correlationId, out pending))
+						{
+							requestMap.Remove(correlationId);
+						}
+					}
+
+					Tracked t = pending as Tracked;
+					if(t != null)
+					{
+						t.onResponses();
+					}
+				}
+
+				if(!initialized)
+				{
+					if(command.IsBrokerInfo)
+					{
+						BrokerInfo info = (BrokerInfo) command;
+						BrokerInfo[] peers = info.PeerBrokerInfos;
+						if(peers != null)
+						{
+							for(int i = 0; i < peers.Length; i++)
+							{
+								String brokerString = peers[i].BrokerURL;
+								add(brokerString);
+							}
+						}
+
+						initialized = true;
+					}
+				}
+			}
+
+			// Read the delegate once and skip the call when no handler is registered,
+			// rather than failing with a NullReferenceException.
+			CommandHandler handler = this.Command;
+			if(handler != null)
+			{
+				handler(sender, command);
+			}
+		}
+
+		/// <summary>
+		/// Exception callback installed on the underlying transports. Delegates to
+		/// <see cref="handleTransportFailure"/>; a failure inside that handling is logged
+		/// and not propagated to the transport that raised the callback.
+		/// </summary>
+		/// <param name="sender">The transport reporting the error.</param>
+		/// <param name="error">The reported error.</param>
+		public void onException(ITransport sender, Exception error)
+		{
+			try
+			{
+				handleTransportFailure(error);
+			}
+			catch(Exception e)
+			{
+				// Still best-effort, but leave a trace instead of discarding the error silently.
+				Tracer.ErrorFormat("Failed to handle transport failure: {0}", e.Message);
+			}
+		}
+
+		/// <summary>
+		/// No-op command handler. handleTransportFailure installs it on a transport that is
+		/// being discarded, so commands that transport still delivers are ignored.
+		/// </summary>
+		public void disposedOnCommand(ITransport sender, Command c)
+		{
+		}
+
+		/// <summary>
+		/// No-op exception handler. handleTransportFailure installs it on a transport that is
+		/// being discarded, so errors that transport still reports are ignored.
+		/// </summary>
+		public void disposedOnException(ITransport sender, Exception e)
+		{
+		}
+
+		/// <summary>
+		/// Detaches and stops the currently connected transport (if any), records its URI
+		/// as the last failed one, clears the connection state and, when this transport is
+		/// started, triggers a reconnect.
+		/// </summary>
+		/// <param name="e">The error that caused the failure; may be null.</param>
+		public void handleTransportFailure(Exception e)
+		{
+			// Atomically take ownership of the connected transport so that concurrent
+			// failure reports handle it only once.
+			ITransport transport = connectedTransport.GetAndSet(null);
+			if(transport != null)
+			{
+				transport.Command = new CommandHandler(disposedOnCommand);
+				transport.Exception = new ExceptionHandler(disposedOnException);
+				try
+				{
+					transport.Stop();
+				}
+				catch(Exception ex)
+				{
+					ex.GetType();	// Ignore errors but this lets us see the error during debugging
+				}
+
+				try
+				{
+					reconnectMutex.WaitOne();
+					bool reconnectOk = false;
+					if(started)
+					{
+						// Pass the URI itself (the formatter tolerates null) and guard the
+						// error, so that logging can never abort the failure handling.
+						Tracer.WarnFormat("Transport failed to {0}, attempting to automatically reconnect due to: {1}", ConnectedTransportURI, (e != null) ? e.Message : "unknown failure");
+						reconnectOk = true;
+					}
+
+					initialized = false;
+					failedConnectTransportURI = ConnectedTransportURI;
+					ConnectedTransportURI = null;
+					connected = false;
+					if(reconnectOk)
+					{
+						// Reconnect() creates the reconnect task when it does not exist yet
+						// and then wakes it; calling reconnectTask.wakeup() directly would
+						// fail if the task was never created.
+						Reconnect();
+					}
+				}
+				finally
+				{
+					reconnectMutex.ReleaseMutex();
+				}
+			}
+		}
+
+		/// <summary>
+		/// Starts the transport. On the first call the state tracker is configured from
+		/// MaxCacheSize and TrackMessages; state is then restored onto an already connected
+		/// transport, or a reconnect is triggered when there is none. Later calls only log.
+		/// </summary>
+		public void Start()
+		{
+			try
+			{
+				reconnectMutex.WaitOne();
+				Tracer.Debug("Started.");
+				if(!started)
+				{
+					started = true;
+					stateTracker.MaxCacheSize = MaxCacheSize;
+					stateTracker.TrackMessages = TrackMessages;
+
+					if(ConnectedTransport == null)
+					{
+						Reconnect();
+					}
+					else
+					{
+						stateTracker.DoRestore(ConnectedTransport);
+					}
+				}
+			}
+			finally
+			{
+				reconnectMutex.ReleaseMutex();
+			}
+		}
+
+		/// <summary>
+		/// Stops the transport: marks it stopped and disposed, flags all backups as disposed
+		/// and clears the pool, waits for any reconnect sleep in progress to finish, shuts
+		/// down the reconnect task and finally stops the connected transport. Does nothing
+		/// (apart from logging) when the transport is not started.
+		/// </summary>
+		public virtual void Stop()
+		{
+			ITransport transportToStop = null;
+			try
+			{
+				reconnectMutex.WaitOne();
+				Tracer.Debug("Stopped.");
+				if(!started)
+				{
+					return;
+				}
+
+				started = false;
+				disposed = true;
+				connected = false;
+				foreach(BackupTransport t in backups)
+				{
+					t.Disposed = true;
+				}
+				backups.Clear();
+
+				// Take the connected transport (null when there is none); it is stopped
+				// below, after the mutex has been released.
+				transportToStop = connectedTransport.GetAndSet(null);
+			}
+			finally
+			{
+				reconnectMutex.ReleaseMutex();
+			}
+
+			// doReconnect() holds sleepMutex while it sleeps between attempts; acquiring
+			// and releasing it here waits for such a sleep to end.
+			try
+			{
+				sleepMutex.WaitOne();
+			}
+			finally
+			{
+				sleepMutex.ReleaseMutex();
+			}
+
+			// The reconnect task is only created by Reconnect(); Start() skips that call
+			// when a transport is already connected, so the task may not exist.
+			if(reconnectTask != null)
+			{
+				reconnectTask.shutdown();
+			}
+
+			if(transportToStop != null)
+			{
+				transportToStop.Stop();
+			}
+		}
+
+		public int InitialReconnectDelay
+		{
+			get { return _initialReconnectDelay; }
+			set { _initialReconnectDelay = value; }
+		}
+
+		public int MaxReconnectDelay
+		{
+			get { return _maxReconnectDelay; }
+			set { _maxReconnectDelay = value; }
+		}
+
+		public int ReconnectDelay
+		{
+			get { return _reconnectDelay; }
+			set { _reconnectDelay = value; }
+		}
+
+		public int ReconnectDelayExponent
+		{
+			get { return _backOffMultiplier; }
+			set { _backOffMultiplier = value; }
+		}
+
+		public ITransport ConnectedTransport
+		{
+			get { return connectedTransport.Value; }
+			set { connectedTransport.Value = value; }
+		}
+
+		public Uri ConnectedTransportURI
+		{
+			get { return connectedTransportURI; }
+			set { connectedTransportURI = value; }
+		}
+
+		public int MaxReconnectAttempts
+		{
+			get { return _maxReconnectAttempts; }
+			set { _maxReconnectAttempts = value; }
+		}
+
+		public bool Randomize
+		{
+			get { return _randomize; }
+			set { _randomize = value; }
+		}
+
+		public bool Backup
+		{
+			get { return _backup; }
+			set { _backup = value; }
+		}
+
+		public int BackupPoolSize
+		{
+			get { return _backupPoolSize; }
+			set { _backupPoolSize = value; }
+		}
+
+		public bool TrackMessages
+		{
+			get { return _trackMessages; }
+			set { _trackMessages = value; }
+		}
+
+		public int MaxCacheSize
+		{
+			get { return _maxCacheSize; }
+			set { _maxCacheSize = value; }
+		}
+
+		/// <summary>
+		/// Tests whether a command is one sent while a connection is being closed:
+		/// either a command flagged as IsShutdownInfo or a RemoveInfo.
+		/// </summary>
+		/// <param name="command">The command to inspect; null yields false.</param>
+		/// <returns>Returns true if the command is one sent when a connection is being closed.</returns>
+		private bool IsShutdownCommand(Command command)
+		{
+			return (command != null && (command.IsShutdownInfo || command is RemoveInfo));
+		}
+
+		/// <summary>
+		/// Sends a command through the connected transport, waiting for a reconnect when
+		/// there is no connection and retrying after a transport failure until the command
+		/// has been handed over or this transport is disposed.
+		/// </summary>
+		/// <remarks>
+		/// Shutdown-type commands are not sent while disconnected: a ShutdownInfo is dropped
+		/// and a RemoveInfo is answered locally with a simulated Response. Commands that
+		/// expect a response are kept in the request map so they can be replayed by
+		/// restoreTransport after a failover.
+		/// </remarks>
+		/// <param name="command">The command to send.</param>
+		/// <exception cref="IOException">
+		/// Thrown (unless this transport has been disposed) when the wait for a connection
+		/// ended without a transport; a recorded connection failure is thrown as-is.
+		/// </exception>
+		public void Oneway(Command command)
+		{
+			Exception error = null;
+			try
+			{
+				reconnectMutex.WaitOne();
+
+				if(IsShutdownCommand(command) && ConnectedTransport == null)
+				{
+					if(command.IsShutdownInfo)
+					{
+						// Skipping send of ShutdownInfo command when not connected.
+						return;
+					}
+
+					if(command is RemoveInfo)
+					{
+						// Simulate response to RemoveInfo command
+						Response response = new Response();
+						response.CorrelationId = command.CommandId;
+						onCommand(this, response);
+						return;
+					}
+				}
+				// Keep trying until the message is sent.
+				for(int i = 0; !disposed; i++)
+				{
+					try
+					{
+						// Wait for transport to be connected.
+						ITransport transport = ConnectedTransport;
+						while(transport == null && !disposed
+							&& connectionFailure == null
+							// && !Thread.CurrentThread.isInterrupted()
+							)
+						{
+							Tracer.Info("Waiting for transport to reconnect.");
+							try
+							{
+								// Release so that the reconnect task can run
+								reconnectMutex.ReleaseMutex();
+								try
+								{
+									// Wait for something
+									Thread.Sleep(1000);
+								}
+								catch(ThreadInterruptedException e)
+								{
+									Tracer.DebugFormat("Interupted: {0}", e.Message);
+								}
+							}
+							finally
+							{
+								// Re-acquire before touching shared state again; the outer
+								// finally releases it exactly once.
+								reconnectMutex.WaitOne();
+							}
+
+							transport = ConnectedTransport;
+						}
+
+						if(transport == null)
+						{
+							// Previous loop may have exited due to us being
+							// disposed.
+							if(disposed)
+							{
+								error = new IOException("Transport disposed.");
+							}
+							else if(connectionFailure != null)
+							{
+								error = connectionFailure;
+							}
+							else
+							{
+								error = new IOException("Unexpected failure.");
+							}
+							break;
+						}
+
+						// If it was a request and it was not being tracked by
+						// the state tracker,
+						// then hold it in the requestMap so that we can replay
+						// it later.
+						Tracked tracked = stateTracker.track(command);
+						lock(requestMap)
+						{
+							if(tracked != null && tracked.WaitingForResponse)
+							{
+								requestMap.Add(command.CommandId, tracked);
+							}
+							else if(tracked == null && command.ResponseRequired)
+							{
+								requestMap.Add(command.CommandId, command);
+							}
+						}
+
+						// Send the message.
+						try
+						{
+							transport.Oneway(command);
+							stateTracker.trackBack(command);
+						}
+						catch(Exception)
+						{
+
+							// If the command was not tracked.. we will retry in
+							// this method
+							if(tracked == null)
+							{
+
+								// since we will retry in this method.. take it
+								// out of the request
+								// map so that it is not sent 2 times on
+								// recovery
+								if(command.ResponseRequired)
+								{
+									// Every other access to requestMap is made under its
+									// lock (onCommand removes entries from the transport's
+									// thread), so this removal must be as well.
+									lock(requestMap)
+									{
+										requestMap.Remove(command.CommandId);
+									}
+								}
+
+								// Rethrow the exception so it will handled by
+								// the outer catch ("throw;" keeps the original
+								// stack trace, "throw e;" would reset it)
+								throw;
+							}
+
+							// A tracked command is replayed by the state tracker on
+							// reconnect, so the send error is deliberately dropped here.
+						}
+
+						return;
+
+					}
+					catch(Exception e)
+					{
+						Tracer.DebugFormat("Send Oneway attempt: {0} failed.", i);
+						handleTransportFailure(e);
+					}
+				}
+			}
+			finally
+			{
+				reconnectMutex.ReleaseMutex();
+			}
+
+			if(!disposed)
+			{
+				if(error != null)
+				{
+					throw error;
+				}
+			}
+		}
+
+		/*
+		public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) {
+		throw new AssertionError("Unsupported Method");
+		}
+		*/
+
+		public Object Request(Object command)
+		{
+			throw new ApplicationException("FailoverTransport does not support Request(Object)");
+		}
+
+		public Object Request(Object command, int timeout)
+		{
+			throw new ApplicationException("FailoverTransport does not support Request(Object, Int)");
+		}
+
+		/// <summary>
+		/// Registers broker URIs, skipping any that are already known, and then
+		/// triggers a reconnect.
+		/// </summary>
+		/// <param name="u">The URIs to register.</param>
+		public void add(Uri[] u)
+		{
+			lock(uris)
+			{
+				foreach(Uri candidate in u)
+				{
+					if(uris.Contains(candidate))
+					{
+						continue;
+					}
+
+					uris.Add(candidate);
+				}
+			}
+
+			Reconnect();
+		}
+
+		/// <summary>
+		/// Unregisters broker URIs and then triggers a reconnect.
+		/// </summary>
+		/// <param name="u">The URIs to unregister.</param>
+		public void remove(Uri[] u)
+		{
+			lock(uris)
+			{
+				foreach(Uri stale in u)
+				{
+					uris.Remove(stale);
+				}
+			}
+
+			Reconnect();
+		}
+
+		/// <summary>
+		/// Parses a broker URI string, registers it when not yet known and triggers a
+		/// reconnect. Any exception raised along the way is logged, not propagated.
+		/// </summary>
+		/// <param name="u">The URI string to register.</param>
+		public void add(String u)
+		{
+			try
+			{
+				Uri parsed = new Uri(u);
+				lock(uris)
+				{
+					bool alreadyKnown = uris.Contains(parsed);
+					if(!alreadyKnown)
+					{
+						uris.Add(parsed);
+					}
+				}
+
+				Reconnect();
+			}
+			catch(Exception ex)
+			{
+				Tracer.ErrorFormat("Failed to parse URI '{0}': {1}", u, ex.Message);
+			}
+		}
+
+		/// <summary>
+		/// Wakes the reconnect task, creating it on first use. When the transport has not
+		/// been started yet this only logs; Start() triggers the connection later.
+		/// </summary>
+		public void Reconnect()
+		{
+			try
+			{
+				reconnectMutex.WaitOne();
+
+				if(!started)
+				{
+					Tracer.Debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
+					return;
+				}
+
+				if(reconnectTask == null)
+				{
+					Tracer.Debug("Creating reconnect task");
+					String workerName = "ActiveMQ Failover Worker: " + this.GetHashCode().ToString();
+					reconnectTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(new FailoverTask(this), workerName);
+				}
+
+				Tracer.Debug("Waking up reconnect task");
+				try
+				{
+					reconnectTask.wakeup();
+				}
+				catch(ThreadInterruptedException)
+				{
+					// An interrupted wake-up is ignored.
+				}
+			}
+			finally
+			{
+				reconnectMutex.ReleaseMutex();
+			}
+		}
+
+		/// <summary>
+		/// Builds the list of URIs to try, in order. The list is a copy of the registered
+		/// URIs, shuffled when Randomize is set, with the URI of the last failed connection
+		/// (if registered) moved to the end so that it is tried last.
+		/// </summary>
+		private List<Uri> ConnectList
+		{
+			get
+			{
+				List<Uri> l;
+				// add()/remove() mutate uris under this lock, so the copy must be taken
+				// under it as well to avoid enumerating a list that is being modified.
+				lock(uris)
+				{
+					l = new List<Uri>(uris);
+				}
+
+				// Read the field once so the URI that is removed is the one re-added below.
+				Uri failedUri = failedConnectTransportURI;
+				bool removed = false;
+				if(failedUri != null)
+				{
+					removed = l.Remove(failedUri);
+				}
+
+				if(Randomize)
+				{
+					// Fisher-Yates shuffle: swapping every slot with a slot drawn from the
+					// whole list does not produce all orderings with equal probability.
+					// The default Random seed is used because DateTime.Now.Millisecond
+					// only offers 1000 distinct seeds.
+					Random r = new Random();
+					for(int i = l.Count - 1; i > 0; i--)
+					{
+						int p = r.Next(i + 1);
+						Uri t = l[p];
+						l[p] = l[i];
+						l[i] = t;
+					}
+				}
+
+				if(removed)
+				{
+					l.Add(failedUri);
+				}
+
+				return l;
+			}
+		}
+
+		/// <summary>
+		/// Prepares a freshly obtained transport for use: starts it, announces this client
+		/// as fault tolerant, restores the tracked connection state and re-sends every
+		/// command still held in the request map.
+		/// </summary>
+		/// <param name="t">The transport to restore state onto.</param>
+		protected void restoreTransport(ITransport t)
+		{
+			t.Start();
+
+			// Tell the broker that this is a fault tolerant client
+			ConnectionControl control = new ConnectionControl();
+			control.FaultTolerant = true;
+			t.Oneway(control);
+
+			stateTracker.DoRestore(t);
+
+			// Snapshot the outstanding requests under the lock, replay them outside of it
+			List<Command> outstanding;
+			lock(requestMap)
+			{
+				outstanding = new List<Command>(requestMap.Values);
+			}
+
+			foreach(Command pending in outstanding)
+			{
+				t.Oneway(pending);
+			}
+		}
+
+		public bool UseExponentialBackOff
+		{
+			get { return _useExponentialBackOff; }
+			set { _useExponentialBackOff = value; }
+		}
+
+		public override String ToString()
+		{
+			return ConnectedTransportURI == null ? "unconnected" : ConnectedTransportURI.ToString();
+		}
+
+		/// <summary>
+		/// The remote address of the connected transport when that transport is itself a
+		/// FailoverTransport; null otherwise.
+		/// </summary>
+		public String RemoteAddress
+		{
+			get
+			{
+				// NOTE(review): the transports assigned to ConnectedTransport in this class
+				// come from TransportFactory.CompositeConnect or from a BackupTransport, so
+				// this cast presumably never succeeds and the getter always returns null -
+				// confirm whether the address of the underlying transport was intended.
+				FailoverTransport transport = ConnectedTransport as FailoverTransport;
+				if(transport != null)
+				{
+					return transport.RemoteAddress;
+				}
+				return null;
+			}
+		}
+
+		public bool IsFaultTolerant
+		{
+			get { return true; }
+		}
+
+		/// <summary>
+		/// Makes one reconnect attempt: first by promoting a pooled backup transport (when
+		/// Backup is enabled and one is available), then by connecting to each URI of
+		/// ConnectList in turn. When nothing could be connected the method sleeps for
+		/// ReconnectDelay and, with UseExponentialBackOff, grows the delay up to
+		/// MaxReconnectDelay.
+		/// </summary>
+		/// <returns>
+		/// false when no attempt was made (already connected, disposed, or a connection
+		/// failure is recorded), when a connection was established, or when
+		/// MaxReconnectAttempts was reached; otherwise true unless this transport was
+		/// disposed in the meantime.
+		/// </returns>
+		bool doReconnect()
+		{
+			Exception failure = null;
+			try
+			{
+				reconnectMutex.WaitOne();
+
+				if(ConnectedTransport != null || disposed || connectionFailure != null)
+				{
+					return false;
+				}
+
+				List<Uri> connectList = ConnectList;
+				if(connectList.Count == 0)
+				{
+					failure = new IOException("No URIs available for connection.");
+				}
+				else
+				{
+					if(!UseExponentialBackOff)
+					{
+						ReconnectDelay = InitialReconnectDelay;
+					}
+					try
+					{
+						backupMutex.WaitOne();
+						if(Backup && backups.Count != 0)
+						{
+							BackupTransport bt = backups[0];
+							backups.RemoveAt(0);
+							ITransport t = bt.Transport;
+							Uri uri = bt.Uri;
+							t.Command = new CommandHandler(onCommand);
+							t.Exception = new ExceptionHandler(onException);
+							try
+							{
+								if(started)
+								{
+									restoreTransport(t);
+								}
+								ReconnectDelay = InitialReconnectDelay;
+								failedConnectTransportURI = null;
+								ConnectedTransportURI = uri;
+								ConnectedTransport = t;
+								connectFailures = 0;
+								// Keep Connected in step with the direct-connect path below.
+								connected = true;
+								Tracer.InfoFormat("Successfully reconnected to backup {0}", uri.ToString());
+								return false;
+							}
+							catch(Exception e)
+							{
+								e.GetType();
+								Tracer.Debug("Backup transport failed");
+								stopFailedTransport(t);
+							}
+						}
+					}
+					finally
+					{
+						backupMutex.ReleaseMutex();
+					}
+
+					foreach(Uri uri in connectList)
+					{
+						if(ConnectedTransport != null || disposed)
+						{
+							break;
+						}
+
+						// Declared outside the try so a half-set-up transport can be
+						// cleaned up in the catch block.
+						ITransport t = null;
+						try
+						{
+							Tracer.DebugFormat("Attempting connect to: {0}", uri.ToString());
+							t = TransportFactory.CompositeConnect(uri);
+							t.Command = new CommandHandler(onCommand);
+							t.Exception = new ExceptionHandler(onException);
+							t.Start();
+
+							if(started)
+							{
+								restoreTransport(t);
+							}
+
+							Tracer.Debug("Connection established");
+							ReconnectDelay = InitialReconnectDelay;
+							ConnectedTransportURI = uri;
+							ConnectedTransport = t;
+							connectFailures = 0;
+
+							if(firstConnection)
+							{
+								firstConnection = false;
+								Tracer.InfoFormat("Successfully connected to: {0}", uri.ToString());
+							}
+							else
+							{
+								Tracer.InfoFormat("Successfully reconnected to: {0}", uri.ToString());
+							}
+
+							connected = true;
+							return false;
+						}
+						catch(Exception e)
+						{
+							failure = e;
+							Tracer.DebugFormat("Connect fail to '{0}': {1}", uri.ToString(), e.Message);
+							stopFailedTransport(t);
+						}
+					}
+				}
+
+				if(MaxReconnectAttempts > 0 && ++connectFailures >= MaxReconnectAttempts)
+				{
+					Tracer.ErrorFormat("Failed to connect to transport after {0} attempt(s)", connectFailures);
+					connectionFailure = failure;
+					onException(this, connectionFailure);
+					return false;
+				}
+			}
+			finally
+			{
+				reconnectMutex.ReleaseMutex();
+			}
+
+			if(!disposed)
+			{
+
+				Tracer.DebugFormat("Waiting {0}ms before attempting connection.", ReconnectDelay);
+				try
+				{
+					// Stop() acquires sleepMutex to wait for this sleep to end.
+					sleepMutex.WaitOne();
+					try
+					{
+						Thread.Sleep(ReconnectDelay);
+					}
+					catch(ThreadInterruptedException)
+					{
+					}
+				}
+				finally
+				{
+					sleepMutex.ReleaseMutex();
+				}
+
+				if(UseExponentialBackOff)
+				{
+					// Exponential increment of reconnect delay.
+					ReconnectDelay *= ReconnectDelayExponent;
+					if(ReconnectDelay > MaxReconnectDelay)
+					{
+						ReconnectDelay = MaxReconnectDelay;
+					}
+				}
+			}
+			return !disposed;
+		}
+
+		/// <summary>
+		/// Detaches this transport's handlers from a transport whose connect or restore
+		/// failed and stops it, so a late callback from it cannot tear down a transport
+		/// connected afterwards and its resources are not left open.
+		/// </summary>
+		/// <param name="transport">The failed transport; null is ignored.</param>
+		private void stopFailedTransport(ITransport transport)
+		{
+			if(transport == null)
+			{
+				return;
+			}
+
+			transport.Command = new CommandHandler(disposedOnCommand);
+			transport.Exception = new ExceptionHandler(disposedOnException);
+			try
+			{
+				transport.Stop();
+			}
+			catch(Exception ex)
+			{
+				Tracer.DebugFormat("Error stopping failed transport: {0}", ex.Message);
+			}
+		}
+
+
+		/// <summary>
+		/// Tops up the pool of backup transports. Disposed backups are dropped first; then,
+		/// while the pool holds fewer than BackupPoolSize entries, a started transport is
+		/// created for each candidate URI other than the currently connected one.
+		/// Does nothing when disposed or when Backup is off.
+		/// </summary>
+		/// <returns>Always false.</returns>
+		bool buildBackups()
+		{
+			try
+			{
+				backupMutex.WaitOne();
+				if(!disposed && Backup)
+				{
+					// Drop disposed backups before checking the pool size, so they do not
+					// count towards it. Walk backwards by index: removing from a List
+					// inside a foreach over it throws InvalidOperationException.
+					for(int i = backups.Count - 1; i >= 0; i--)
+					{
+						if(backups[i].Disposed)
+						{
+							backups.RemoveAt(i);
+						}
+					}
+
+					if(backups.Count < BackupPoolSize)
+					{
+						List<Uri> connectList = ConnectList;
+						foreach(Uri uri in connectList)
+						{
+							if(ConnectedTransportURI != null && !ConnectedTransportURI.Equals(uri))
+							{
+								ITransport t = null;
+								try
+								{
+									BackupTransport bt = new BackupTransport(this);
+									bt.Uri = uri;
+									if(!backups.Contains(bt))
+									{
+										t = TransportFactory.CompositeConnect(uri);
+										t.Command = new CommandHandler(bt.onCommand);
+										t.Exception = new ExceptionHandler(bt.onException);
+										t.Start();
+										bt.Transport = t;
+										backups.Add(bt);
+									}
+								}
+								catch(Exception e)
+								{
+									Tracer.DebugFormat("Failed to build backup: {0}", e.Message);
+									// Do not leave a half-started transport behind.
+									if(t != null)
+									{
+										try
+										{
+											t.Stop();
+										}
+										catch(Exception stopError)
+										{
+											Tracer.DebugFormat("Error stopping failed backup: {0}", stopError.Message);
+										}
+									}
+								}
+							}
+
+							// Stop once the pool is full. The previous "<" test broke out
+							// after the first URI whenever the pool was NOT yet full.
+							if(backups.Count >= BackupPoolSize)
+							{
+								break;
+							}
+						}
+					}
+				}
+			}
+			finally
+			{
+				backupMutex.ReleaseMutex();
+			}
+
+			return false;
+		}
+
+		public bool IsDisposed
+		{
+			get { return disposed; }
+		}
+
+		public bool Connected
+		{
+			get { return connected; }
+		}
+
+		public void Reconnect(Uri uri)
+		{
+			add(new Uri[] { uri });
+		}
+
+		public FutureResponse AsyncRequest(Command command)
+		{
+			throw new ApplicationException("FailoverTransport does not implement AsyncRequest(Command)");
+		}
+
+		public Response Request(Command command)
+		{
+			throw new ApplicationException("FailoverTransport does not implement Request(Command)");
+		}
+
+		public Response Request(Command command, TimeSpan ts)
+		{
+			throw new ApplicationException("FailoverTransport does not implement Request(Command, TimeSpan)");
+		}
+
+		public CommandHandler Command
+		{
+			get { return _commandHandler; }
+			set { _commandHandler = value; }
+		}
+
+		public ExceptionHandler Exception
+		{
+			get { return _exceptionHandler; }
+			set { _exceptionHandler = value; }
+		}
+
+		public bool IsStarted
+		{
+			get { return started; }
+		}
+
+		public void Dispose()
+		{
+			Dispose(true);
+			GC.SuppressFinalize(this);
+		}
+
+		/// <summary>
+		/// Marks this transport as disposed. Called with true from Dispose() and with
+		/// false from the finalizer.
+		/// </summary>
+		/// <param name="disposing">
+		/// Currently has no effect: the branch that tests it is empty.
+		/// </param>
+		public void Dispose(bool disposing)
+		{
+			if(disposing)
+			{
+				// get rid of unmanaged stuff
+				// NOTE(review): nothing is released here; the connected transport, the
+				// backups and the reconnect task are only shut down by Stop() - confirm
+				// callers always stop the transport before disposing it.
+			}
+
+			disposed = true;
+		}
+
+		/// <summary>
+		/// Orders failover transports by their instance id.
+		/// </summary>
+		/// <param name="o">The object to compare with; must be a FailoverTransport.</param>
+		/// <returns>The difference between this instance's id and the other's.</returns>
+		/// <exception cref="ArgumentException">
+		/// Thrown when <paramref name="o"/> is null or not a FailoverTransport.
+		/// </exception>
+		public int CompareTo(Object o)
+		{
+			FailoverTransport other = o as FailoverTransport;
+			if(other == null)
+			{
+				throw new ArgumentException();
+			}
+
+			return this._id - other._id;
+		}
+	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs?rev=776509&r1=776508&r2=776509&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs Wed May 20 00:40:53 2009
@@ -17,26 +17,20 @@
 
 using System;
 using System.Collections.Specialized;
-
 using Apache.NMS.Util;
 
 namespace Apache.NMS.ActiveMQ.Transport.Failover
 {
 	public class FailoverTransportFactory : ITransportFactory
 	{
-		private ITransport wrapTransport(ITransport transport)
+		private ITransport doConnect(Uri location)
 		{
+			ITransport transport = CreateTransport(URISupport.parseComposite(location));
 			transport = new MutexTransport(transport);
 			transport = new ResponseCorrelator(transport);
 			return transport;
 		}
 
-		private ITransport doConnect(Uri location)
-		{
-			ITransport transport = CreateTransport(URISupport.parseComposite(location));
-			return wrapTransport(transport);
-		}
-
 		public ITransport CompositeConnect(Uri location)
 		{
 			return CreateTransport(URISupport.parseComposite(location));

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs?rev=776509&r1=776508&r2=776509&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFactory.cs Wed May 20 00:40:53 2009
@@ -26,15 +26,9 @@
 {
 	public class TransportFactory
 	{
-		private static readonly Dictionary<String, ITransportFactory> factoryCache;
 		public static event ExceptionListener OnException;
 
-		static TransportFactory()
-		{
-			TransportFactory.factoryCache = new Dictionary<string, ITransportFactory>();
-		}
-
-		private static void HandleException(Exception ex)
+		public static void HandleException(Exception ex)
 		{
 			if(TransportFactory.OnException != null)
 			{
@@ -42,35 +36,6 @@
 			}
 		}
 
-		private static ITransportFactory AddTransportFactory(string scheme)
-		{
-			ITransportFactory factory;
-
-			switch(scheme)
-			{
-				case "tcp":
-					factory = new TcpTransportFactory();
-					break;
-				case "discovery":
-					factory = new DiscoveryTransportFactory();
-					DiscoveryTransportFactory.OnException += TransportFactory.HandleException;
-					break;
-				case "failover":
-					factory = new FailoverTransportFactory();
-					break;
-				default:
-					throw new ApplicationException("The transport " + scheme + " is not supported.");
-			}
-
-			if(null == factory)
-			{
-				throw new ApplicationException("Unable to create a transport.");
-			}
-
-			TransportFactory.factoryCache.Add(scheme, factory);
-			return factory;
-		}
-
 		/// <summary>
 		/// Creates a normal transport. 
 		/// </summary>
@@ -78,42 +43,65 @@
 		/// <returns>the transport</returns>
 		public static ITransport CreateTransport(Uri location)
 		{
-			ITransportFactory tf = TransportFactory.findTransportFactory(location);
+			ITransportFactory tf = TransportFactory.CreateTransportFactory(location);
 			return tf.CreateTransport(location);
 		}
 
 		public static ITransport CompositeConnect(Uri location)
 		{
-			ITransportFactory tf = TransportFactory.findTransportFactory(location);
+			ITransportFactory tf = TransportFactory.CreateTransportFactory(location);
 			return tf.CompositeConnect(location);
 		}
 
 		/// <summary>
-		/// Find the transport factory for the scheme.  We will cache the transport
-		/// factory in a lookup table.  If we do not support the transport protocol,
-		/// an ApplicationException will be thrown.
+		/// Create a transport factory for the scheme.  If we do not support the transport protocol,
+		/// an NMSConnectionException will be thrown.
 		/// </summary>
 		/// <param name="location"></param>
 		/// <returns></returns>
-		private static ITransportFactory findTransportFactory(Uri location)
+		private static ITransportFactory CreateTransportFactory(Uri location)
 		{
 			string scheme = location.Scheme;
 
-			if(null == scheme)
+			if(null == scheme || 0 == scheme.Length)
 			{
-				throw new IOException("Transport not scheme specified: [" + location + "]");
+				throw new NMSConnectionException(String.Format("Transport scheme invalid: [{0}]", location.ToString()));
 			}
 
-			ITransportFactory tf;
+			ITransportFactory factory = null;
+
+			try
+			{
+				switch(scheme.ToLower())
+				{
+					case "tcp":
+						factory = new TcpTransportFactory();
+						break;
+					case "discovery":
+						factory = new DiscoveryTransportFactory();
+						break;
+					case "failover":
+						factory = new FailoverTransportFactory();
+						break;
+					default:
+						throw new NMSConnectionException(String.Format("The transport {0} is not supported.", scheme));
+				}
+			}
+			catch(NMSConnectionException)
+			{
+				throw;
+			}
+			catch
+			{
+				throw new NMSConnectionException("Error creating transport.");
+			}
 
-			scheme = scheme.ToLower();
-			if(!TransportFactory.factoryCache.TryGetValue(scheme, out tf))
+			if(null == factory)
 			{
-			    // missing in the cache - go add request it if it exists
-			    tf = TransportFactory.AddTransportFactory(scheme);
+				throw new NMSConnectionException("Unable to create a transport.");
 			}
 
-			return tf;
+			return factory;
 		}
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs?rev=776509&r1=776508&r2=776509&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NMSConnectionFactoryTest.cs Wed May 20 00:40:53 2009
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 
+using System;
 using NUnit.Framework;
 using NUnit.Framework.Extensions;
 
@@ -24,11 +25,19 @@
 	public class NMSConnectionFactoryTest
 	{
 		[RowTest]
-		[Row("tcp://localhost:61616")]
-		[Row("stomp://localhost:61613")]
-		[Row("activemq:tcp://localhost:61616")]
-		[Row("activemq:failover://localhost:61616")]
-		[Row("activemq:failover://(tcp://localhost:61616,tcp://localhost:61616)")]
+		[Row("tcp://activemqhost:61616")]
+		[Row("activemq:tcp://activemqhost:61616")]
+		[Row("activemq:multicast://activemqhost:6155")]
+		[Row("activemq:failover://activemqhost:61616")]
+		[Row("activemq:failover://(tcp://activemqhost:61616,tcp://activemqhost:61616)")]
+
+		[Row("ftp://activemqhost:61616", ExpectedException = typeof(NMSConnectionException))]
+		[Row("http://activemqhost:61616", ExpectedException = typeof(NMSConnectionException))]
+		[Row("discovery://activemqhost:6155", ExpectedException = typeof(NMSConnectionException))]
+		[Row("sms://activemqhost:61616", ExpectedException = typeof(NMSConnectionException))]
+
+		[Row("(tcp://activemqhost:61616,tcp://activemqhost:61616)", ExpectedException = typeof(UriFormatException))]
+		[Row("tcp://activemqhost:61616,tcp://activemqhost:61616", ExpectedException = typeof(UriFormatException))]
 		public void TestURI(string connectionURI)
 		{
 			NMSConnectionFactory factory = new NMSConnectionFactory(connectionURI);



Mime
View raw message