Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 45871 invoked from network); 27 Jul 2010 15:59:47 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 27 Jul 2010 15:59:47 -0000 Received: (qmail 48933 invoked by uid 500); 27 Jul 2010 15:59:47 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 48869 invoked by uid 500); 27 Jul 2010 15:59:47 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 48862 invoked by uid 99); 27 Jul 2010 15:59:46 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Jul 2010 15:59:46 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Jul 2010 15:59:45 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 71B83238899C; Tue, 27 Jul 2010 15:58:53 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r979759 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport: ./ Failover/ Mock/ Tcp/ Date: Tue, 27 Jul 2010 15:58:53 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100727155853.71B83238899C@eris.apache.org> Author: tabish Date: Tue Jul 27 15:58:52 2010 New Revision: 979759 URL: http://svn.apache.org/viewvc?rev=979759&view=rev Log: https://issues.apache.org/activemq/browse/AMQNET-266 * ITransport.cs: * TransportFilter.cs: * Tcp/TcpTransport.cs: * Mock/MockTransport.cs: * ICompositeTransport.cs: * Failover/BackupTransport.cs: * 
Failover/FailoverTransport.cs: * Failover/FailoverTransportFactory.cs: Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs?rev=979759&r1=979758&r2=979759&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/BackupTransport.cs Tue Jul 27 15:58:52 2010 @@ -41,7 +41,7 @@ namespace Apache.NMS.ActiveMQ.Transport. 
this.disposed = true; if(failoverTransport != null) { - this.failoverTransport.Reconnect(); + this.failoverTransport.Reconnect(false); } } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=979759&r1=979758&r2=979759&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Tue Jul 27 15:58:52 2010 @@ -38,6 +38,7 @@ namespace Apache.NMS.ActiveMQ.Transport. private bool disposed; private bool connected; private List<Uri> uris = new List<Uri>(); + private List<Uri> updated = new List<Uri>(); private CommandHandler commandHandler; private ExceptionHandler exceptionHandler; private InterruptedHandler interruptedHandler; @@ -76,7 +77,9 @@ namespace Apache.NMS.ActiveMQ.Transport. private int maxCacheSize = 256; private volatile Exception failure; private readonly object mutex = new object(); - + private bool reconnectSupported = true; + private bool updateURIsSupported = true; + public FailoverTransport() { id = idCounter++; @@ -303,7 +306,17 @@ namespace Apache.NMS.ActiveMQ.Transport. { get { return started; } } - + + public bool IsReconnectSupported + { + get{ return this.reconnectSupported; } + } + + public bool IsUpdateURIsSupported + { + get{ return this.updateURIsSupported; } + } + /// /// /// @@ -399,7 +412,7 @@ namespace Apache.NMS.ActiveMQ.Transport. } else { - Reconnect(); + Reconnect(false); } } } @@ -499,22 +512,13 @@ namespace Apache.NMS.ActiveMQ.Transport. 
if(!initialized) { - if(command.IsBrokerInfo) - { - BrokerInfo info = (BrokerInfo) command; - BrokerInfo[] peers = info.PeerBrokerInfos; - if(peers != null) - { - for(int i = 0; i < peers.Length; i++) - { - String brokerString = peers[i].BrokerURL; - Add(brokerString); - } - } - - initialized = true; - } + initialized = true; } + + if(command.IsConnectionControl) + { + this.HandleConnectionControl(command as ConnectionControl); + } } this.Command(sender, command); @@ -669,7 +673,7 @@ namespace Apache.NMS.ActiveMQ.Transport. } } - public void Add(Uri[] u) + public void Add(bool rebalance, Uri[] u) { lock(uris) { @@ -682,10 +686,10 @@ namespace Apache.NMS.ActiveMQ.Transport. } } - Reconnect(); + Reconnect(rebalance); } - public void Remove(Uri[] u) + public void Remove(bool rebalance, Uri[] u) { lock(uris) { @@ -695,10 +699,10 @@ namespace Apache.NMS.ActiveMQ.Transport. } } - Reconnect(); + Reconnect(rebalance); } - public void Add(String u) + public void Add(bool rebalance, String u) { try { @@ -708,10 +712,9 @@ namespace Apache.NMS.ActiveMQ.Transport. if(!uris.Contains(uri)) { uris.Add(uri); + Reconnect(rebalance); } - } - - Reconnect(); + } } catch(Exception e) { @@ -721,10 +724,10 @@ namespace Apache.NMS.ActiveMQ.Transport. public void Reconnect(Uri uri) { - Add(new Uri[] { uri }); + Add(true, new Uri[] { uri }); } - public void Reconnect() + public void Reconnect(bool rebalance) { lock(reconnectMutex) { @@ -1161,7 +1164,118 @@ namespace Apache.NMS.ActiveMQ.Transport. 
} } - public void Dispose() + + public void UpdateURIs(bool rebalance, Uri[] updatedURIs) + { + if(IsUpdateURIsSupported) + { + List<Uri> copy = new List<Uri>(this.updated); + List<Uri> added = new List<Uri>(); + + if(updatedURIs != null && updatedURIs.Length > 0) + { + HashSet<Uri> uriSet = new HashSet<Uri>(); + for(int i = 0; i < updatedURIs.Length; i++) + { + Uri uri = updatedURIs[i]; + if(uri != null) + { + uriSet.Add(uri); + } + } + + foreach(Uri uri in uriSet) + { + if(copy.Remove(uri) == false) + { + uriSet.Add(uri); + } + } + + lock(reconnectMutex) + { + this.updated.Clear(); + this.updated.AddRange(added); + + foreach(Uri uri in copy) + { + this.uris.Remove(uri); + } + + this.Add(rebalance, added.ToArray()); + } + } + } + } + + public void HandleConnectionControl(ConnectionControl control) + { + string reconnectStr = control.ReconnectTo; + + if(reconnectStr != null) + { + reconnectStr = reconnectStr.Trim(); + if(reconnectStr.Length > 0) + { + try + { + Uri uri = new Uri(reconnectStr); + if(IsReconnectSupported) + { + Reconnect(uri); + Tracer.Info("Reconnected to: " + uri.OriginalString); + } + } + catch(Exception e) + { + Tracer.ErrorFormat("Failed to handle ConnectionControl reconnect to {0}: {1}", reconnectStr, e); + } + } + } + + ProcessNewTransports(control.RebalanceConnection, control.ConnectedBrokers); + } + + private void ProcessNewTransports(bool rebalance, String newTransports) + { + if(newTransports != null) + { + newTransports = newTransports.Trim(); + + if(newTransports.Length > 0 && IsUpdateURIsSupported) + { + List<Uri> list = new List<Uri>(); + String[] tokens = newTransports.Split(new Char []{','}); + + foreach(String str in tokens) + { + try + { + Uri uri = new Uri(str); + list.Add(uri); + } + catch + { + Tracer.Error("Failed to parse broker address: " + str); + } + } + + if(list.Count != 0) + { + try + { + UpdateURIs(rebalance, list.ToArray()); + } + catch + { + Tracer.Error("Failed to update transport URI's from: " + newTransports); + } + } + } + } + } + + public void 
Dispose() { Dispose(true); GC.SuppressFinalize(this); Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs?rev=979759&r1=979758&r2=979759&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs Tue Jul 27 15:58:52 2010 @@ -54,7 +54,7 @@ namespace Apache.NMS.ActiveMQ.Transport. { StringDictionary options = compositData.Parameters; FailoverTransport transport = CreateTransport(options); - transport.Add(compositData.Components); + transport.Add(false, compositData.Components); return transport; } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs?rev=979759&r1=979758&r2=979759&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ICompositeTransport.cs Tue Jul 27 15:58:52 2010 @@ -21,8 +21,29 @@ namespace Apache.NMS.ActiveMQ.Transport { public interface ICompositeTransport : ITransport { - void Add(Uri[] uris); - void Remove(Uri[] uris); + /// + /// Adds a new set of Uris to the list of Uris that this Transport can connect to. + /// + /// + /// A + /// Should the current connection be broken and a new one created. 
+ /// + /// + /// A + /// + void Add(bool rebalance, Uri[] uris); + + /// + /// Remove the given Uris from this Transports list of known Uris. + /// + /// + /// A + /// Should the current connection be broken and a new one created. + /// + /// + /// A + /// + void Remove(bool rebalance, Uri[] uris); } } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs?rev=979759&r1=979758&r2=979759&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/ITransport.cs Tue Jul 27 15:58:52 2010 @@ -156,6 +156,36 @@ namespace Apache.NMS.ActiveMQ.Transport get; } + /// + /// Returns true if this Transport supports reconnections. + /// + bool IsReconnectSupported + { + get; + } + + /// + /// Returns true if this Transport can accept updated lists of connection Uri's. + /// + bool IsUpdateURIsSupported + { + get; + } + + /// + /// Updates the Uri's that this Transport is aware of and will use to + /// connect itself to. If the rebalance option is true this method will + /// terminate any current connection and reconnect to another available + /// Uri. 
+ /// + /// + /// A + /// + /// + /// A + /// + void UpdateURIs(bool rebalance, Uri[] updatedURIs); + } } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs?rev=979759&r1=979758&r2=979759&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Mock/MockTransport.cs Tue Jul 27 15:58:52 2010 @@ -372,6 +372,21 @@ namespace Apache.NMS.ActiveMQ.Transport. get{ return new Uri("mock://mock"); } } + public bool IsReconnectSupported + { + get{ return false; } + } + + public bool IsUpdateURIsSupported + { + get{ return false; } + } + + public void UpdateURIs(bool rebalance, Uri[] updatedURIs) + { + throw new IOException(); + } + #endregion } } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=979759&r1=979758&r2=979759&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs Tue Jul 27 15:58:52 2010 @@ -365,6 +365,20 @@ namespace Apache.NMS.ActiveMQ.Transport. 
return null; } + public bool IsReconnectSupported + { + get{ return false; } + } + + public bool IsUpdateURIsSupported + { + get{ return false; } + } + + public void UpdateURIs(bool rebalance, Uri[] updatedURIs) + { + throw new IOException(); + } } } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs?rev=979759&r1=979758&r2=979759&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/TransportFilter.cs Tue Jul 27 15:58:52 2010 @@ -217,6 +217,22 @@ namespace Apache.NMS.ActiveMQ.Transport { get{ return next.RemoteAddress; } } + + public bool IsReconnectSupported + { + get{ return next.IsReconnectSupported; } + } + + public bool IsUpdateURIsSupported + { + get{ return next.IsUpdateURIsSupported; } + } + + public void UpdateURIs(bool rebalance, Uri[] updatedURIs) + { + next.UpdateURIs(rebalance, updatedURIs); + } + } }