activemq-commits mailing list archives

From tab...@apache.org
Subject svn commit: r1529284 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x: ./ src/main/csharp/Transport/ src/main/csharp/Transport/Discovery/ src/main/csharp/Transport/Discovery/Multicast/ src/main/csharp/Transport/Discovery/http/ src/main...
Date Fri, 04 Oct 2013 20:33:19 GMT
Author: tabish
Date: Fri Oct  4 20:33:18 2013
New Revision: 1529284

URL: http://svn.apache.org/r1529284
Log:
merge fix for:

https://issues.apache.org/jira/browse/AMQNET-445

Clean up the transport code a bit.
Improve the Discovery transport to use an underlying FailoverTransport to manage connect / reconnect.
Added a more clearly defined discovery agent interface, plus an attribute for tagging discovery agent factories so that they can be discovered automatically.
Cleaned up and improved the Multicast agent.
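
For readers unfamiliar with the attribute-tagging approach mentioned in the log, here is a minimal, self-contained sketch of how a factory tagged with an attribute might be located by reflection from a URI scheme. All names in it (AgentFactoryAttribute, IAgentFactory, MulticastAgentFactory, AgentFactoryLocator) are hypothetical stand-ins for illustration; they are not the types added by this commit, and the real lookup in DiscoveryAgentFactory may work differently.

```csharp
using System;
using System.Reflection;

// Hypothetical attribute: tags a factory class with the URI scheme it serves.
[AttributeUsage(AttributeTargets.Class)]
public sealed class AgentFactoryAttribute : Attribute
{
    public string Scheme { get; }
    public AgentFactoryAttribute(string scheme) { Scheme = scheme; }
}

// Hypothetical factory contract, reduced to one method for the sketch.
public interface IAgentFactory
{
    string Describe(Uri uri);
}

[AgentFactory("multicast")]
public sealed class MulticastAgentFactory : IAgentFactory
{
    public string Describe(Uri uri)
    {
        return "multicast agent for " + uri.Host + ":" + uri.Port;
    }
}

public static class AgentFactoryLocator
{
    // Scans the executing assembly for a class tagged with the requested scheme.
    public static IAgentFactory Find(string scheme)
    {
        foreach (Type type in Assembly.GetExecutingAssembly().GetTypes())
        {
            var tag = (AgentFactoryAttribute)Attribute.GetCustomAttribute(
                type, typeof(AgentFactoryAttribute));

            if (tag != null && tag.Scheme == scheme &&
                typeof(IAgentFactory).IsAssignableFrom(type))
            {
                return (IAgentFactory)Activator.CreateInstance(type);
            }
        }
        throw new NotSupportedException("No agent factory registered for scheme: " + scheme);
    }
}

public static class Program
{
    public static void Main()
    {
        // Same address as DEFAULT_DISCOVERY_URI_STRING in MulticastDiscoveryAgent.
        Uri uri = new Uri("multicast://239.255.2.3:6155");
        IAgentFactory factory = AgentFactoryLocator.Find(uri.Scheme);
        Console.WriteLine(factory.Describe(uri));
    }
}
```

The point of tagging by scheme is that adding a new kind of agent only needs a new tagged class, with no change to the code that performs the lookup.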

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/ActiveMQDiscoveryAgentFactoryAttribute.cs
      - copied unchanged from r1529281, activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/ActiveMQDiscoveryAgentFactoryAttribute.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/DiscoveryAgentFactory.cs
      - copied unchanged from r1529281, activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveryAgentFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/DiscoveryTransport.cs
      - copied unchanged from r1529281, activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveryTransport.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/IDiscoveryAgent.cs
      - copied unchanged from r1529281, activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/IDiscoveryAgent.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/IDiscoveryAgentFactory.cs
      - copied unchanged from r1529281, activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/IDiscoveryAgentFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgentFactory.cs
      - copied unchanged from r1529281, activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgentFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/http/
      - copied from r1529281, activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/http/
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Util/ISuspendable.cs
      - copied unchanged from r1529281, activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/ISuspendable.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Util/ServiceStopper.cs
      - copied unchanged from r1529281, activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/ServiceStopper.cs
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/   (props changed)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/ITransportFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Mock/MockTransportFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/TransportFactory.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/vs2008-activemq.csproj   (contents, props changed)

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/
------------------------------------------------------------------------------
  Merged /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk:r1529024-1529281

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs?rev=1529284&r1=1529283&r2=1529284&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/DiscoveryTransportFactory.cs Fri Oct  4 20:33:18 2013
@@ -1,133 +1,53 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Threading;
-using Apache.NMS.ActiveMQ.Transport.Discovery.Multicast;
-using Apache.NMS.ActiveMQ.Transport.Tcp;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.ActiveMQ.Transport.Discovery
-{
-    [ActiveMQTransportFactory("discovery")]
-	public class DiscoveryTransportFactory : ITransportFactory
-	{
-		private const int TIMEOUT_IN_SECONDS = 20;
-
-		private static Uri discoveredUri;
-		private static readonly MulticastDiscoveryAgent agent;
-		private static string currentServiceName;
-		private static readonly object uriLock = new object();
-		private static readonly AutoResetEvent discoveredUriEvent = new AutoResetEvent(false);
-		private static event ExceptionListener OnException;
-
-		static DiscoveryTransportFactory()
-		{
-			DiscoveryTransportFactory.OnException += TransportFactory.HandleException;
-			agent = new MulticastDiscoveryAgent();
-			agent.OnNewServiceFound += agent_OnNewServiceFound;
-			agent.OnServiceRemoved += agent_OnServiceRemoved;
-		}
-
-		public DiscoveryTransportFactory()
-		{
-			lock(uriLock)
-			{
-				currentServiceName = String.Empty;
-			}
-		}
-
-		public static Uri DiscoveredUri
-		{
-			get { lock(uriLock) { return discoveredUri; } }
-			set { lock(uriLock) { discoveredUri = value; } }
-		}
-
-		private static void agent_OnNewServiceFound(string brokerName, string serviceName)
-		{
-			lock(uriLock)
-			{
-				if(discoveredUri == null)
-				{
-					currentServiceName = serviceName;
-					discoveredUri = new Uri(currentServiceName);
-				}
-			}
-
-			// This will end the wait in the CreateTransport method.
-			discoveredUriEvent.Set();
-		}
-
-		private static void agent_OnServiceRemoved(string brokerName, string serviceName)
-		{
-			lock(uriLock)
-			{
-				if(serviceName == currentServiceName)
-				{
-					DiscoveredUri = null;
-					DiscoveryTransportFactory.OnException(new Exception("Broker connection is no longer valid."));
-				}
-			}
-		}
-
-		#region Overloaded ITransportFactory Members
-
-		public ITransport CreateTransport(Uri location)
-		{
-			URISupport.CompositeData cd = URISupport.ParseComposite(location);
-
-			if(cd.Components.Length > 0)
-			{
-				agent.DiscoveryURI = cd.Components[0];
-			}
-
-			if(!agent.IsStarted)
-			{
-				agent.Start();
-			}
-
-			Uri hostUri = DiscoveredUri;
-
-			if(null == hostUri)
-			{
-				// If a new broker is found the agent will fire an event which will result in discoveredUri being set.
-				discoveredUriEvent.WaitOne(TIMEOUT_IN_SECONDS * 1000, true);
-				hostUri = DiscoveredUri;
-				if(null == hostUri)
-				{
-					throw new NMSConnectionException(String.Format("Unable to find a connection to {0} before the timeout period expired.", location.ToString()));
-				}
-			}
-
-			TcpTransportFactory tcpTransFactory = new TcpTransportFactory();
-			return tcpTransFactory.CompositeConnect(new Uri(hostUri + location.Query));
-		}
-
-		public ITransport CompositeConnect(Uri location)
-		{
-			return CreateTransport(location);
-		}
-
-
-		public ITransport CompositeConnect(Uri location, SetTransport setTransport)
-		{
-			return CreateTransport(location);
-		}
-
-		#endregion
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+using Apache.NMS.ActiveMQ.Transport.Failover;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery
+{
+    [ActiveMQTransportFactory("discovery")]
+	public class DiscoveryTransportFactory : FailoverTransportFactory
+	{
+        public override ITransport CreateTransport(URISupport.CompositeData compositData)
+        {
+            StringDictionary options = compositData.Parameters;
+            FailoverTransport failoverTransport = CreateTransport(options);
+            return CreateTransport(failoverTransport, compositData, options);
+        }
+
+        /// <summary>
+        /// Factory method for creating a DiscoveryTransport.  The Discovery Transport wraps the
+        /// given ICompositeTransport and will add and remove Transport URIs as they are discovered.
+        /// </summary>
+        public static DiscoveryTransport CreateTransport(ICompositeTransport compositeTransport, URISupport.CompositeData compositeData, StringDictionary options)
+        {
+            DiscoveryTransport transport = new DiscoveryTransport(compositeTransport);
+
+            URISupport.SetProperties(transport, options, "transport.");
+            transport.Properties = options;
+            
+            Uri discoveryAgentURI = compositeData.Components[0];
+            IDiscoveryAgent discoveryAgent = DiscoveryAgentFactory.CreateAgent(discoveryAgentURI);
+            transport.DiscoveryAgent = discoveryAgent;
+
+            return transport;
+        }
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs?rev=1529284&r1=1529283&r2=1529284&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs Fri Oct  4 20:33:18 2013
@@ -20,150 +20,356 @@ using System.Collections.Generic;
 using System.Net;
 using System.Net.Sockets;
 using System.Threading;
+using Apache.NMS.Util;
+using Apache.NMS.ActiveMQ.Threads;
 using Apache.NMS.ActiveMQ.Transport.Tcp;
+using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ.Transport.Discovery.Multicast
 {
 	internal delegate void NewBrokerServiceFound(string brokerName, string serviceName);
 	internal delegate void BrokerServiceRemoved(string brokerName, string serviceName);
 
-	internal class MulticastDiscoveryAgent : IDisposable
+	internal class MulticastDiscoveryAgent : IDiscoveryAgent, IDisposable
 	{
-		public const int MAX_SOCKET_CONNECTION_RETRY_ATTEMPS = 3;
-		public const int DEFAULT_BACKOFF_MILLISECONDS = 100;
-		public const int BACKOFF_MULTIPLIER = 2;
-		public const string DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
-    	public const string DEFAULT_HOST_STR = "default"; 
-    	public const string DEFAULT_HOST_IP = "239.255.2.3";
-    	public const int DEFAULT_PORT = 6155;
+        public const string DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
+        public const string DEFAULT_HOST_STR = "default"; 
+        public const string DEFAULT_HOST_IP = "239.255.2.3";
+        public const int DEFAULT_PORT = 6155;
+
+		public const int DEFAULT_INITIAL_RECONNECT_DELAY = 1000 * 5;
+        public const int DEFAULT_BACKOFF_MULTIPLIER = 2;
+        public const int DEFAULT_MAX_RECONNECT_DELAY = 1000 * 30;
 
 		private const string TYPE_SUFFIX = "ActiveMQ-4.";
 		private const string ALIVE = "alive";
 		private const string DEAD = "dead";
 		private const char DELIMITER = '%';
 		private const int BUFF_SIZE = 8192;
-		private const string DEFAULT_GROUP = "default";
-		private const int EXPIRATION_OFFSET_IN_SECONDS = 2;
-		private const int WORKER_KILL_TIME_SECONDS = 10;
-		private const int SOCKET_TIMEOUT_MILLISECONDS = 500;
-
-		private string group;
-		private readonly object stopstartSemaphore = new object();
-		private bool isStarted = false;
-		private Uri discoveryUri;
+		private const int HEARTBEAT_MISS_BEFORE_DEATH = 10;
+		private const int DEFAULT_IDLE_TIME = 500;
+        private const string DEFAULT_GROUP = "default";
+        private const int WORKER_KILL_TIME_SECONDS = 1000;
+
+        private const int MAX_SOCKET_CONNECTION_RETRY_ATTEMPS = 3;
+        private const int SOCKET_CONNECTION_BACKOFF_TIME = 500;
+
+        private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
+        private long maxReconnectDelay = DEFAULT_MAX_RECONNECT_DELAY;
+        private long backOffMultiplier = DEFAULT_BACKOFF_MULTIPLIER;
+        private bool useExponentialBackOff;
+        private int maxReconnectAttempts;
+
+        private int timeToLive = 1;
+        private string group = DEFAULT_GROUP;
+        private bool loopBackMode;
+        private Dictionary<String, RemoteBrokerData> brokersByService = 
+            new Dictionary<String, RemoteBrokerData>();
+        private readonly object servicesLock = new object();
+        private String selfService;
+        private long keepAliveInterval = DEFAULT_IDLE_TIME;
+        private string mcInterface;
+        private string mcNetworkInterface;
+        private string mcJoinNetworkInterface;
+        private DateTime lastAdvertizeTime;
+        private bool reportAdvertizeFailed = true;
+        private readonly Atomic<bool> started = new Atomic<bool>(false);
+
+        private Uri discoveryUri;
 		private Socket multicastSocket;
 		private IPEndPoint endPoint;
 		private Thread worker;
+        private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
 
-		private event NewBrokerServiceFound onNewServiceFound;
-		private event BrokerServiceRemoved onServiceRemoved;
+        private ServiceAddHandler serviceAddHandler;
+        private ServiceRemoveHandler serviceRemoveHandler;
 
-		/// <summary>
-		/// Indexed by service name
-		/// </summary>
-		private readonly Dictionary<string, RemoteBrokerData> remoteBrokers;
+        #region Property Setters and Getters
 
-		public MulticastDiscoveryAgent()
-		{
-			group = DEFAULT_GROUP;
-			remoteBrokers = new Dictionary<string, RemoteBrokerData>();
-		}
+        public bool LoopBackMode
+        {
+            get { return this.loopBackMode; }
+            set { this.loopBackMode = value; }
+        }
+
+        public int TimeToLive
+        {
+            get { return this.timeToLive; }
+            set { this.timeToLive = value; }
+        }
+
+        public long KeepAliveInterval
+        {
+            get { return this.keepAliveInterval; }
+            set { this.keepAliveInterval = value; }
+        }
+
+        public string Interface
+        {
+            get { return this.mcInterface; }
+            set { this.mcInterface = value; }
+        }
+
+        public string NetworkInterface
+        {
+            get { return this.mcNetworkInterface; }
+            set { this.mcNetworkInterface = value; }
+        }
+
+        public string JoinNetworkInterface
+        {
+            get { return this.mcJoinNetworkInterface; }
+            set { this.mcJoinNetworkInterface = value; }
+        }
+
+        public string Type
+        {
+            get { return this.group + "." + TYPE_SUFFIX; }
+        }
+
+        public long BackOffMultiplier
+        {
+            get { return this.backOffMultiplier; }
+            set { this.backOffMultiplier = value; }
+        }
+
+        public long InitialReconnectDelay
+        {
+            get { return this.initialReconnectDelay; }
+            set { this.initialReconnectDelay = value; }
+        }
+
+        public int MaxReconnectAttempts
+        {
+            get { return this.maxReconnectAttempts; }
+            set { this.maxReconnectAttempts = value; }
+        }
+
+        public long MaxReconnectDelay
+        {
+            get { return this.maxReconnectDelay; }
+            set { this.maxReconnectDelay = value; }
+        }
+
+        public bool UseExponentialBackOff
+        {
+            get { return this.useExponentialBackOff; }
+            set { this.useExponentialBackOff = value; }
+        }
+
+        public string Group
+        {
+            get { return this.group; }
+            set { this.group = value; }
+        }
+
+        public ServiceAddHandler ServiceAdd
+        {
+            get { return serviceAddHandler; }
+            set { this.serviceAddHandler = value; }
+        }
+
+        public ServiceRemoveHandler ServiceRemove
+        {
+            get { return serviceRemoveHandler; }
+            set { this.serviceRemoveHandler = value; }
+        }
+
+        public Uri DiscoveryURI
+        {
+            get { return discoveryUri; }
+            set { discoveryUri = value; }
+        }
+
+        public bool IsStarted
+        {
+            get { return started.Value; }
+        }
+
+        #endregion
+
+        public override String ToString()
+        {
+            return "MulticastDiscoveryAgent-" + (selfService != null ? "advertise:" + selfService : "");
+        }
+
+        public void RegisterService(String name)
+        {
+            this.selfService = name;
+            if (started.Value)
+            {
+                DoAdvertizeSelf();
+            }
+        }
+
+        public void ServiceFailed(DiscoveryEvent failedEvent)
+        {
+            RemoteBrokerData data = brokersByService[failedEvent.ServiceName];
+            if (data != null && data.MarkFailed()) 
+            {
+                FireServiceRemoveEvent(data);
+            }
+        }
 
 		public void Start()
 		{
-			lock(stopstartSemaphore)
-			{
-				if (discoveryUri == null || discoveryUri.Host.Equals(DEFAULT_HOST_STR))
-				{
-					discoveryUri = new Uri(DEFAULT_DISCOVERY_URI_STRING);
-				}
-				
-				if(multicastSocket == null)
-				{
-					int numFailedAttempts = 0;
-					int backoffTime = DEFAULT_BACKOFF_MILLISECONDS;
-
-					Tracer.Info("Connecting to multicast discovery socket.");
-					while(!TryToConnectSocket())
-					{
-						numFailedAttempts++;
-						if(numFailedAttempts > MAX_SOCKET_CONNECTION_RETRY_ATTEMPS)
-						{
-							throw new ApplicationException(
-								"Could not open the socket in order to discover advertising brokers.");
-						}
-
-						Thread.Sleep(backoffTime);
-						backoffTime *= BACKOFF_MULTIPLIER;
-					}
-				}
+            if (started.CompareAndSet(false, true)) {           
+                            
+                if (String.IsNullOrEmpty(group)) 
+                {
+                    throw new IOException("You must specify a group to discover");
+                }
+                String type = Type;
+                if (!type.EndsWith(".")) 
+                {
+                    Tracer.Warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
+                    type += ".";
+                }
+                
+                if (discoveryUri == null) 
+                {
+                    discoveryUri = new Uri(DEFAULT_DISCOVERY_URI_STRING);
+                }
+
+                if (Tracer.IsDebugEnabled) 
+                {
+                    Tracer.Debug("start - discoveryURI = " + discoveryUri);                              
+                }
+
+                String targetHost = discoveryUri.Host;
+                int targetPort = discoveryUri.Port;
+                     
+                if (DEFAULT_HOST_STR.Equals(targetHost)) 
+                {
+                    targetHost = DEFAULT_HOST_IP;                     
+                }
+
+                if (targetPort < 0) 
+                {
+                    targetPort = DEFAULT_PORT;              
+                }
+                  
+                if (Tracer.IsDebugEnabled) 
+                {
+                    Tracer.DebugFormat("start - myHost = {0}", targetHost); 
+                    Tracer.DebugFormat("start - myPort = {0}", targetPort);    
+                    Tracer.DebugFormat("start - group  = {0}", group);                    
+                    Tracer.DebugFormat("start - interface  = {0}", mcInterface);
+                    Tracer.DebugFormat("start - network interface  = {0}", mcNetworkInterface);
+                    Tracer.DebugFormat("start - join network interface  = {0}", mcJoinNetworkInterface);
+                } 
+
+                int numFailedAttempts = 0;
+                int backoffTime = SOCKET_CONNECTION_BACKOFF_TIME;
+
+                Tracer.Info("Connecting to multicast discovery socket.");
+                while (!TryToConnectSocket(targetHost, targetPort))
+                {
+                    numFailedAttempts++;
+                    if (numFailedAttempts > MAX_SOCKET_CONNECTION_RETRY_ATTEMPS)
+                    {
+                        throw new ApplicationException(
+                            "Could not open the socket in order to discover advertising brokers.");
+                    }
+
+                    Thread.Sleep(backoffTime);
+                    backoffTime = (int)(backoffTime * BackOffMultiplier);
+                }
+
+                if(worker == null)
+                {
+                    Tracer.Info("Starting multicast discovery agent worker thread");
+                    worker = new Thread(new ThreadStart(DiscoveryAgentRun));
+                    worker.IsBackground = true;
+                    worker.Start();
+                }
 
-				if(worker == null)
-				{
-					Tracer.Info("Starting multicast discovery agent worker thread");
-					worker = new Thread(new ThreadStart(worker_DoWork));
-					worker.IsBackground = true;
-					worker.Start();
-					isStarted = true;
-				}
-			}
+                DoAdvertizeSelf();
+            }
 		}
 
 		public void Stop()
 		{
-			Thread localThread = null;
-
-			lock(stopstartSemaphore)
+            // Changing the isStarted flag will signal the thread that it needs to shut down.
+            if (started.CompareAndSet(true, false))
 			{
 				Tracer.Info("Stopping multicast discovery agent worker thread");
-				localThread = worker;
-				worker = null;
-				// Changing the isStarted flag will signal the thread that it needs to shut down.
-				isStarted = false;
+                if (multicastSocket != null)
+                {
+                    multicastSocket.Close();
+                }
+                if(worker != null)
+                {
+                    // wait for the worker to stop.
+                    if(!worker.Join(WORKER_KILL_TIME_SECONDS))
+                    {
+                        Tracer.Info("!! Timeout waiting for multicast discovery agent localThread to stop");
+                        worker.Abort();
+                    }
+                    worker = null;
+                    Tracer.Debug("Multicast discovery agent worker thread stopped");
+                }
+                executor.Shutdown();
+                if (!executor.AwaitTermination(TimeSpan.FromMinutes(1)))
+                {
+                    Tracer.DebugFormat("Failed to properly shutdown agent executor {0}", this);
+                }
 			}
-
-			if(localThread != null)
-			{
-				// wait for the worker to stop.
-				if(!localThread.Join(WORKER_KILL_TIME_SECONDS))
-				{
-					Tracer.Info("!! Timeout waiting for multicast discovery agent localThread to stop");
-					localThread.Abort();
-				}
-			}
-
-			Tracer.Info("Multicast discovery agent worker thread joined");
 		}
 
-		private bool TryToConnectSocket()
+		private bool TryToConnectSocket(string targetHost, int targetPort)
 		{
 			bool hasSucceeded = false;
 
 			try
 			{
 				multicastSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
-				endPoint = new IPEndPoint(IPAddress.Any, discoveryUri.Port);
+				endPoint = new IPEndPoint(IPAddress.Any, targetPort);
 
-				//We have to allow reuse in the multicast socket. Otherwise, we would be unable to use multiple clients on the same machine.
+				// We have to allow reuse in the multicast socket. Otherwise, we would be unable to
+                // use multiple clients on the same machine.
 				multicastSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, 1);
 				multicastSocket.Bind(endPoint);
 
 				IPAddress ipaddress;
 
-				if(!TcpTransportFactory.TryParseIPAddress(discoveryUri.Host, out ipaddress))
+				if(!TcpTransportFactory.TryParseIPAddress(targetHost, out ipaddress))
 				{
-					ipaddress = TcpTransportFactory.GetIPAddress(discoveryUri.Host, AddressFamily.InterNetwork);
+					ipaddress = TcpTransportFactory.GetIPAddress(targetHost, AddressFamily.InterNetwork);
 					if(null == ipaddress)
 					{
 						throw new NMSConnectionException("Invalid host address.");
 					}
 				}
 
-				multicastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership,
-												 new MulticastOption(ipaddress, IPAddress.Any));
-#if !NETCF
-				multicastSocket.ReceiveTimeout = SOCKET_TIMEOUT_MILLISECONDS;
-#endif
+                if (LoopBackMode)
+                {
+                    multicastSocket.MulticastLoopback = true;
+                }
+                if (TimeToLive != 0)
+                {
+				    multicastSocket.SetSocketOption(SocketOptionLevel.IP, 
+                                                    SocketOptionName.MulticastTimeToLive, timeToLive);
+                }
+                if (!String.IsNullOrEmpty(mcJoinNetworkInterface))
+                {
+                    // TODO figure out how to set this.
+                    throw new NotSupportedException("McJoinNetworkInterface not yet implemented.");
+                }
+                else 
+                {
+                    multicastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership,
+                                                     new MulticastOption(ipaddress, IPAddress.Any));
+                }
+
+                if (!String.IsNullOrEmpty(mcNetworkInterface))
+                {
+                    // TODO figure out how to set this.
+                    throw new NotSupportedException("McNetworkInterface not yet implemented.");
+                }
+
+				multicastSocket.ReceiveTimeout = (int)keepAliveInterval;
+
 				hasSucceeded = true;
 			}
 			catch(SocketException)
@@ -173,15 +379,16 @@ namespace Apache.NMS.ActiveMQ.Transport.
 			return hasSucceeded;
 		}
 
-		private void worker_DoWork()
+		private void DiscoveryAgentRun()
 		{
 			Thread.CurrentThread.Name = "Discovery Agent Thread.";
 			byte[] buffer = new byte[BUFF_SIZE];
 			string receivedInfoRaw;
 			string receivedInfo;
 
-			while(isStarted)
+			while (started.Value)
 			{
+                DoTimeKeepingServices();
 				try
 				{
 					int numBytes = multicastSocket.Receive(buffer);
@@ -197,177 +404,334 @@ namespace Apache.NMS.ActiveMQ.Transport.
 						receivedInfo = receivedInfoRaw;
 					}
 
-					ProcessBrokerMessage(receivedInfo);
+					ProcessServiceAdvertisement(receivedInfo);
 				}
 				catch(SocketException)
 				{
 					// There was no multicast message sent before the timeout expired...Let us try again.
 				}
 
-				//We need to clear the buffer.
+				// We need to clear the buffer.
 				buffer[0] = 0x0;
-				ExpireOldServices();
 			}
 		}
 
-		private void ProcessBrokerMessage(string message)
+		private void ProcessServiceAdvertisement(string message)
 		{
 			string payload;
 			string brokerName;
 			string serviceName;
 
-			if(message.StartsWith(MulticastType))
+			if (message.StartsWith(Type))
 			{
-				payload = message.Substring(MulticastType.Length);
+				payload = message.Substring(Type.Length);
 				brokerName = GetBrokerName(payload);
 				serviceName = GetServiceName(payload);
 
-				if(payload.StartsWith(ALIVE))
+				if (payload.StartsWith(ALIVE))
 				{
-					ProcessAliveBrokerMessage(brokerName, serviceName);
+					ProcessAlive(brokerName, serviceName);
 				}
-				else if(payload.StartsWith(DEAD))
+				else if (payload.StartsWith(DEAD))
 				{
-					ProcessDeadBrokerMessage(brokerName, serviceName);
+					ProcessDead(serviceName);
 				}
 				else
 				{
-					//Malformed Payload
+					// Malformed Payload
 				}
 			}
 		}
 
-		private void ProcessDeadBrokerMessage(string brokerName, string serviceName)
-		{
-			if(remoteBrokers.ContainsKey(serviceName))
-			{
-				remoteBrokers.Remove(serviceName);
-				if(onServiceRemoved != null)
-				{
-					onServiceRemoved(brokerName, serviceName);
-				}
-			}
-		}
-
-		private void ProcessAliveBrokerMessage(string brokerName, string serviceName)
-		{
-			if(remoteBrokers.ContainsKey(serviceName))
-			{
-				remoteBrokers[serviceName].UpdateHeartBeat();
-			}
-			else
-			{
-				remoteBrokers.Add(serviceName, new RemoteBrokerData(brokerName, serviceName));
-
-				if(onNewServiceFound != null)
-				{
-					onNewServiceFound(brokerName, serviceName);
-				}
-			}
-		}
+        private void DoTimeKeepingServices() 
+        {
+            if (started.Value)
+            {
+                DateTime currentTime = DateTime.Now;
+                if (currentTime < lastAdvertizeTime || 
+                    ((currentTime - TimeSpan.FromMilliseconds(keepAliveInterval)) > lastAdvertizeTime))
+                {
+                    DoAdvertizeSelf();
+                    lastAdvertizeTime = currentTime;
+                }
+                DoExpireOldServices();
+            }
+        }
+
+        private void DoAdvertizeSelf()
+        {
+            if (!String.IsNullOrEmpty(selfService)) 
+            {
+                String payload = Type;
+                payload += started.Value ? ALIVE : DEAD;
+                payload += DELIMITER + "localhost" + DELIMITER;
+                payload += selfService;
+                try 
+                {
+                    byte[] data = System.Text.Encoding.UTF8.GetBytes(payload);
+                    multicastSocket.Send(data);
+                } 
+                catch (Exception e) 
+                {
+                    // If a send fails, chances are all subsequent sends will fail
+                    // too. No need to keep reporting the
+                    // same error over and over.
+                    if (reportAdvertizeFailed) 
+                    {
+                        reportAdvertizeFailed = false;
+                        Tracer.ErrorFormat("Failed to advertise our service: {0} cause: {1}", payload, e.Message);
+                    }
+                }
+            }
+        }
+
+        private void ProcessAlive(string brokerName, string service)
+        {
+            if (selfService == null || !service.Equals(selfService)) 
+            {
+                RemoteBrokerData remoteBroker = null;
+                lock (servicesLock)
+                {
+                    brokersByService.TryGetValue(service, out remoteBroker);
+                }
+                if (remoteBroker == null) 
+                {
+                    remoteBroker = new RemoteBrokerData(this, brokerName, service);
+                    brokersByService.Add(service, remoteBroker);      
+                    FireServiceAddEvent(remoteBroker);
+                    DoAdvertizeSelf();
+                } 
+                else 
+                {
+                    remoteBroker.UpdateHeartBeat();
+                    if (remoteBroker.IsTimeForRecovery()) 
+                    {
+                        FireServiceAddEvent(remoteBroker);
+                    }
+                }
+            }
+        }
+
+        private void ProcessDead(string service) 
+        {
+            if (!service.Equals(selfService)) 
+            {
+                RemoteBrokerData remoteBroker = null;
+                lock (servicesLock)
+                {
+                    brokersByService.TryGetValue(service, out remoteBroker);
+                    if (remoteBroker != null)
+                    {
+                        brokersByService.Remove(service);
+                    }
+                }
+                if (remoteBroker != null && !remoteBroker.Failed) 
+                {
+                    FireServiceRemoveEvent(remoteBroker);
+                }
+            }
+        }
+
+        private void DoExpireOldServices() 
+        {
+            DateTime expireTime = DateTime.Now - TimeSpan.FromMilliseconds(keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH); 
+
+            RemoteBrokerData[] services = null;
+            lock (servicesLock)
+            {
+                services = new RemoteBrokerData[this.brokersByService.Count];
+                this.brokersByService.Values.CopyTo(services, 0);
+            }
+
+            foreach(RemoteBrokerData service in services)
+            {
+                if (service.LastHeartBeat < expireTime) 
+                {
+                    ProcessDead(service.ServiceName);
+                }
+            }
+        }
 
 		private static string GetBrokerName(string payload)
 		{
 			string[] results = payload.Split(DELIMITER);
-			return results[1];
+            if (results.Length >= 2)
+            {
+			    return results[1];
+            }
+            return null;
 		}
 
 		private static string GetServiceName(string payload)
 		{
 			string[] results = payload.Split(DELIMITER);
-			return results[2];
-		}
-
-		private void ExpireOldServices()
-		{
-			DateTime expireTime;
-			List<RemoteBrokerData> deadServices = new List<RemoteBrokerData>();
-
-			foreach(KeyValuePair<string, RemoteBrokerData> brokerService in remoteBrokers)
-			{
-				expireTime = brokerService.Value.lastHeartBeat.AddSeconds(EXPIRATION_OFFSET_IN_SECONDS);
-				if(DateTime.Now > expireTime)
-				{
-					deadServices.Add(brokerService.Value);
-				}
-			}
-
-			// Remove all of the dead services
-			for(int i = 0; i < deadServices.Count; i++)
-			{
-				ProcessDeadBrokerMessage(deadServices[i].brokerName, deadServices[i].serviceName);
-			}
-		}
-
-		#region Properties
-
-		/// <summary>
-		/// This property indicates whether or not async send is enabled.
-		/// </summary>
-		public Uri DiscoveryURI
-		{
-			get { return discoveryUri; }
-			set { discoveryUri = value; }
-		}
-
-		public bool IsStarted
-		{
-			get { return isStarted; }
-		}
-
-		public string Group
-		{
-			get { return group; }
-			set { group = value; }
-		}
-
-		#endregion
-
-		internal string MulticastType
-		{
-			get { return group + "." + TYPE_SUFFIX; }
-		}
-
-		internal event NewBrokerServiceFound OnNewServiceFound
-		{
-			add { onNewServiceFound += value; }
-			remove { onNewServiceFound -= value; }
-		}
-
-		internal event BrokerServiceRemoved OnServiceRemoved
-		{
-			add { onServiceRemoved += value; }
-			remove { onServiceRemoved += value; }
+            if (results.Length >= 3)
+            {
+                return results[2];
+            }
+            return null;
 		}
 
 		public void Dispose()
 		{
-			if(isStarted)
+			if(started.Value)
 			{
 				Stop();
 			}
-
-			multicastSocket.Shutdown(SocketShutdown.Both);
-			multicastSocket = null;
 		}
 
-		internal class RemoteBrokerData
-		{
-			internal string brokerName;
-			internal string serviceName;
+        private void FireServiceRemoveEvent(RemoteBrokerData data) 
+        {
+            if (serviceRemoveHandler != null && started.Value) 
+            {
+                // Have the listener process the event async so that
+                // he does not block this thread since we are doing time sensitive
+                // processing of events.
+                executor.QueueUserWorkItem(ServiceRemoveCallback, data);
+            }
+        }
+
+        private void ServiceRemoveCallback(object data)
+        {
+            RemoteBrokerData serviceData = data as RemoteBrokerData;
+            this.serviceRemoveHandler(serviceData);
+        }
+
+        private void FireServiceAddEvent(RemoteBrokerData data) 
+        {
+            if (serviceAddHandler != null && started.Value) 
+            {
+                // Have the listener process the event async so that
+                // he does not block this thread since we are doing time sensitive
+                // processing of events.
+                executor.QueueUserWorkItem(ServiceAddCallback, data);
+            }
+        }
+
+        private void ServiceAddCallback(object data)
+        {
+            RemoteBrokerData serviceData = data as RemoteBrokerData;
+            this.serviceAddHandler(serviceData);
+        }
+
+		internal class RemoteBrokerData : DiscoveryEvent
+		{
+            internal DateTime recoveryTime = DateTime.MinValue;
+            internal int failureCount;
+            internal bool failed;
 			internal DateTime lastHeartBeat;
 
-			internal RemoteBrokerData(string brokerName, string serviceName)
+            private readonly object syncRoot = new object();
+            private readonly MulticastDiscoveryAgent parent;
+
+			internal RemoteBrokerData(MulticastDiscoveryAgent parent, string brokerName, string serviceName) : base()
 			{
-				this.brokerName = brokerName;
-				this.serviceName = serviceName;
+                this.parent = parent;
+				this.BrokerName = brokerName;
+				this.ServiceName = serviceName;
 				this.lastHeartBeat = DateTime.Now;
 			}
 
+            internal bool Failed
+            {
+                get { return this.failed; }
+            }
+
+            internal DateTime LastHeartBeat
+            {
+                get 
+                { 
+                    lock(syncRoot)
+                    {
+                        return this.lastHeartBeat; 
+                    }
+                }
+            }
+
 			internal void UpdateHeartBeat()
 			{
-				this.lastHeartBeat = DateTime.Now;
-			}
+                lock (syncRoot)
+                {
+                    this.lastHeartBeat = DateTime.Now;
+
+                    // Consider that the broker recovery has succeeded if it has not
+                    // failed in 60 seconds.
+                    if (!failed && failureCount > 0 && 
+                        (lastHeartBeat - recoveryTime) > TimeSpan.FromMilliseconds(1000 * 60)) {
+
+                        Tracer.DebugFormat("I now think that the {0} service has recovered.", ServiceName);
+
+                        failureCount = 0;
+                        recoveryTime = DateTime.MinValue;
+                    }
+                }
+			}
+
+            internal bool MarkFailed()
+            {
+                lock (syncRoot)
+                {
+                    if (!failed) 
+                    {
+                        failed = true;
+                        failureCount++;
+
+                        long reconnectDelay;
+                        if (!parent.UseExponentialBackOff) 
+                        {
+                            reconnectDelay = parent.InitialReconnectDelay;
+                        } 
+                        else 
+                        {
+                            reconnectDelay = (long)Math.Pow(parent.BackOffMultiplier, failureCount);
+                            reconnectDelay = Math.Min(reconnectDelay, parent.MaxReconnectDelay);
+                        }
+
+                        Tracer.DebugFormat("Remote failure of {0} while still receiving multicast advertisements.  " +
+                                           "Advertising events will be suppressed for {1} ms, the current " +
+                                           "failure count is: {2}", ServiceName, reconnectDelay, failureCount);
+
+                        recoveryTime = DateTime.Now + TimeSpan.FromMilliseconds(reconnectDelay);
+                        return true;
+                    }
+                }
+                return false;            
+            }
+
+            /// <summary>
+            /// Returns true if this Broker has been marked as failed and it is now time to
+            /// start a recovery attempt.
+            /// </summary>
+            public bool IsTimeForRecovery() 
+            {
+                lock (syncRoot)
+                {
+                    if (!failed) 
+                    {
+                        return false;
+                    }
+
+                    int maxReconnectAttempts = parent.MaxReconnectAttempts;
+
+                    // Are we done trying to recover this guy?
+                    if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) 
+                    {
+                        Tracer.DebugFormat("Max reconnect attempts of the {0} service has been reached.", ServiceName);
+                        return false;
+                    }
+
+                    // Is it not yet time?
+                    if (DateTime.Now < recoveryTime) 
+                    {
+                        return false;
+                    }
+
+                    Tracer.DebugFormat("Resuming event advertisement of the {0} service.", ServiceName);
+
+                    failed = false;
+                    return true;
+                }
+            }
 		}
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs?rev=1529284&r1=1529283&r2=1529284&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Failover/FailoverTransportFactory.cs Fri Oct  4 20:33:18 2013
@@ -1,69 +1,68 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Collections.Specialized;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.ActiveMQ.Transport.Failover
-{
-    [ActiveMQTransportFactory("failover")]
-	public class FailoverTransportFactory : ITransportFactory
-	{
-		private ITransport doConnect(Uri location)
-		{
-			ITransport transport = CreateTransport(URISupport.ParseComposite(location));
-			transport = new MutexTransport(transport);
-			transport = new ResponseCorrelator(transport);
-			return transport;
-		}
-
-		public ITransport CompositeConnect(Uri location)
-		{
-			return CreateTransport(URISupport.ParseComposite(location));
-		}
-
-		public ITransport CompositeConnect(Uri location, SetTransport setTransport)
-		{
-			throw new NMSConnectionException("Asynchronous composite connection not supported with Failover transport.");
-		}
-
-		public ITransport CreateTransport(Uri location)
-		{
-			return doConnect(location);
-		}
-
-		/// <summary>
-		/// </summary>
-		/// <param name="compositData"></param>
-		/// <returns></returns>
-		public ITransport CreateTransport(URISupport.CompositeData compositData)
-		{
-			StringDictionary options = compositData.Parameters;
-			FailoverTransport transport = CreateTransport(options);
-			transport.Add(false, compositData.Components);
-			return transport;
-		}
-
-		public FailoverTransport CreateTransport(StringDictionary parameters)
-		{
-			FailoverTransport transport = new FailoverTransport();
-			URISupport.SetProperties(transport, parameters, "transport.");
-			return transport;
-		}
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Failover
+{
+    [ActiveMQTransportFactory("failover")]
+	public class FailoverTransportFactory : ITransportFactory
+	{
+		private ITransport doConnect(Uri location)
+		{
+			ITransport transport = CreateTransport(URISupport.ParseComposite(location));
+			transport = new MutexTransport(transport);
+			transport = new ResponseCorrelator(transport);
+			return transport;
+		}
+
+		public ITransport CompositeConnect(Uri location)
+		{
+			return CreateTransport(URISupport.ParseComposite(location));
+		}
+
+		public ITransport CreateTransport(Uri location)
+		{
+			return doConnect(location);
+		}
+
+		/// <summary>
+        /// Virtual transport create method which can be overridden by subclasses to provide
+        /// an alternate FailoverTransport implementation.  All transport creation methods in
+        /// this factory call through this method to create the ITransport instance so this
+        /// is the only method that needs to be overridden.
+		/// </summary>
+		/// <param name="compositData"></param>
+		/// <returns></returns>
+		public virtual ITransport CreateTransport(URISupport.CompositeData compositData)
+		{
+			StringDictionary options = compositData.Parameters;
+			FailoverTransport transport = CreateTransport(options);
+			transport.Add(false, compositData.Components);
+			return transport;
+		}
+
+		protected FailoverTransport CreateTransport(StringDictionary parameters)
+		{
+			FailoverTransport transport = new FailoverTransport();
+			URISupport.SetProperties(transport, parameters, "transport.");
+			return transport;
+		}
+	}
+}

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/ITransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/ITransportFactory.cs?rev=1529284&r1=1529283&r2=1529284&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/ITransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/ITransportFactory.cs Fri Oct  4 20:33:18 2013
@@ -25,6 +25,5 @@ namespace Apache.NMS.ActiveMQ.Transport
 	{
 		ITransport CreateTransport(Uri location);
 		ITransport CompositeConnect(Uri location);
-		ITransport CompositeConnect(Uri location, SetTransport setTransport);
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Mock/MockTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Mock/MockTransportFactory.cs?rev=1529284&r1=1529283&r2=1529284&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Mock/MockTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Mock/MockTransportFactory.cs Fri Oct  4 20:33:18 2013
@@ -137,10 +137,5 @@ namespace Apache.NMS.ActiveMQ.Transport.
 
 			return transport;
 		}
-
-		public ITransport CompositeConnect(Uri location, SetTransport setTransport)
-		{
-			throw new NMSConnectionException("Asynchronous composite connection not supported with Mock transport.");
-		}
 	}
 }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=1529284&r1=1529283&r2=1529284&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs Fri Oct  4 20:33:18 2013
@@ -107,11 +107,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
 
         public ITransport CompositeConnect(Uri location)
         {
-            return CompositeConnect(location, null);
-        }
-
-        public ITransport CompositeConnect(Uri location, SetTransport setTransport)
-        {
             // Extract query parameters from broker Uri
             StringDictionary map = URISupport.ParseQuery(location.Query);
 
@@ -165,10 +160,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
             }
 
             transport = new WireFormatNegotiator(transport, wireformat);
-            if(setTransport != null)
-            {
-                setTransport(transport, location);
-            }
 
             return transport;
         }

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/TransportFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/TransportFactory.cs?rev=1529284&r1=1529283&r2=1529284&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/TransportFactory.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/Transport/TransportFactory.cs Fri Oct  4 20:33:18 2013
@@ -19,10 +19,6 @@ using System;
 using System.Reflection;
 using System.Collections.Generic;
 using Apache.NMS.ActiveMQ.Util;
-using Apache.NMS.ActiveMQ.Transport.Discovery;
-using Apache.NMS.ActiveMQ.Transport.Failover;
-using Apache.NMS.ActiveMQ.Transport.Mock;
-using Apache.NMS.ActiveMQ.Transport.Tcp;
 
 namespace Apache.NMS.ActiveMQ.Transport
 {
@@ -69,12 +65,6 @@ namespace Apache.NMS.ActiveMQ.Transport
 			return tf.CompositeConnect(location);
 		}
 
-		public static ITransport AsyncCompositeConnect(Uri location, SetTransport setTransport)
-		{
-			ITransportFactory tf = TransportFactory.CreateTransportFactory(location);
-			return tf.CompositeConnect(location, setTransport);
-		}
-
 		/// <summary>
 		/// Create a transport factory for the scheme.  If we do not support the transport protocol,
 		/// an NMSConnectionException will be thrown.

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/vs2008-activemq.csproj
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/vs2008-activemq.csproj?rev=1529284&r1=1529283&r2=1529284&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/vs2008-activemq.csproj (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/vs2008-activemq.csproj Fri Oct  4 20:33:18 2013
@@ -329,6 +329,68 @@
     <Compile Include="src\main\csharp\OpenWire\OpenWireFormat.cs">
       <SubType>Code</SubType>
     </Compile>
+    <Compile Include="src\main\csharp\OpenWire\V10\ActiveMQBlobMessageMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ActiveMQBytesMessageMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ActiveMQDestinationMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ActiveMQMapMessageMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ActiveMQMessageMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ActiveMQObjectMessageMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ActiveMQQueueMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ActiveMQStreamMessageMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ActiveMQTempDestinationMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ActiveMQTempQueueMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ActiveMQTempTopicMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ActiveMQTextMessageMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ActiveMQTopicMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\BaseCommandMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\BrokerIdMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\BrokerInfoMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ConnectionControlMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ConnectionErrorMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ConnectionIdMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ConnectionInfoMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ConsumerControlMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ConsumerIdMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ConsumerInfoMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ControlCommandMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\DataArrayResponseMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\DataResponseMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\DestinationInfoMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\DiscoveryEventMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ExceptionResponseMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\FlushCommandMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\IntegerResponseMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\JournalQueueAckMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\JournalTopicAckMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\JournalTraceMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\JournalTransactionMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\KeepAliveInfoMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\LastPartialCommandMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\LocalTransactionIdMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\MarshallerFactory.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\MessageAckMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\MessageDispatchMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\MessageDispatchNotificationMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\MessageIdMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\MessageMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\MessagePullMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\NetworkBridgeFilterMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\PartialCommandMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ProducerAckMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ProducerIdMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ProducerInfoMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\RemoveInfoMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\RemoveSubscriptionInfoMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ReplayCommandMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ResponseMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\SessionIdMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\SessionInfoMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\ShutdownInfoMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\SubscriptionInfoMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\TransactionIdMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\TransactionInfoMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\WireFormatInfoMarshaller.cs" />
+    <Compile Include="src\main\csharp\OpenWire\V10\XATransactionIdMarshaller.cs" />
     <Compile Include="src\main\csharp\OpenWire\V1\ActiveMQBlobMessageMarshaller.cs" />
     <Compile Include="src\main\csharp\OpenWire\V1\ActiveMQBytesMessageMarshaller.cs">
       <SubType>Code</SubType>
@@ -1226,6 +1288,7 @@
     <Compile Include="src\main\csharp\Util\LRUCache.cs" />
     <Compile Include="src\main\csharp\Util\MemoryUsage.cs" />
     <Compile Include="src\main\csharp\Util\MessageDispatchChannel.cs" />
+    <Compile Include="src\main\csharp\Util\ServiceStopper.cs" />
     <Compile Include="src\main\csharp\Util\SimplePriorityMessageDispatchChannel.cs" />
   </ItemGroup>
   <ItemGroup>

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/vs2008-activemq.csproj
------------------------------------------------------------------------------
  Merged /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq.csproj:r1529024-1529281


