activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1530315 - in /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport: Discovery/ Discovery/Multicast/ Discovery/http/ Failover/
Date Tue, 08 Oct 2013 15:45:15 GMT
Author: tabish
Date: Tue Oct  8 15:45:14 2013
New Revision: 1530315

URL: http://svn.apache.org/r1530315
Log:
https://issues.apache.org/jira/browse/AMQNET-446

Add Http based discovery agent, refactored the common bits out of Multicast into an abstract agent so new agents can be easily added.  
Fixes [AMQNET-AMQNET-446]. (See https://issues.apache.org/activemq/browse/AMQNET-AMQNET-446)

Added:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/AbstractDiscoveryAgent.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveredServiceData.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/http/HttpDiscoveryAgent.cs   (with props)
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/http/HttpDiscoveryAgentFactory.cs   (with props)
Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/AbstractDiscoveryAgent.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/AbstractDiscoveryAgent.cs?rev=1530315&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/AbstractDiscoveryAgent.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/AbstractDiscoveryAgent.cs Tue Oct  8 15:45:14 2013
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Threading;
+using System.Collections.Generic;
+using Apache.NMS.Util;
+using Apache.NMS.ActiveMQ.Threads;
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery
+{
+    public abstract class AbstractDiscoveryAgent : IDiscoveryAgent, IDisposable
+    {
+        public const int DEFAULT_INITIAL_RECONNECT_DELAY = 1000 * 5;
+        public const int DEFAULT_BACKOFF_MULTIPLIER = 2;
+        public const int DEFAULT_MAX_RECONNECT_DELAY = 1000 * 30;
+
+        private const int WORKER_KILL_TIME_SECONDS = 1000;
+        private const int HEARTBEAT_MISS_BEFORE_DEATH = 10;
+
+        private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
+        private long maxReconnectDelay = DEFAULT_MAX_RECONNECT_DELAY;
+        private long backOffMultiplier = DEFAULT_BACKOFF_MULTIPLIER;
+        private bool useExponentialBackOff;
+        private int maxReconnectAttempts;
+
+        protected readonly Atomic<bool> started = new Atomic<bool>(false);
+        protected Thread worker;
+        protected readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
+
+        protected Dictionary<String, DiscoveredServiceData> discoveredServices = 
+            new Dictionary<String, DiscoveredServiceData>();
+        protected readonly object discoveredServicesLock = new object();
+
+        private Uri discoveryUri;
+        private String selfService;
+        private String group;
+        private ServiceAddHandler serviceAddHandler;
+        private ServiceRemoveHandler serviceRemoveHandler;
+        private DateTime lastAdvertizeTime;
+        private bool reportAdvertizeFailed = true;
+
+        #region Property Getters and Setters
+
+        internal string SelfService
+        {
+            get { return this.selfService; }
+        }
+
+        internal DateTime LastAdvertizeTime
+        {
+            get { return this.lastAdvertizeTime; }
+            set { this.lastAdvertizeTime = value; }
+        }
+
+        public long BackOffMultiplier
+        {
+            get { return this.backOffMultiplier; }
+            set { this.backOffMultiplier = value; }
+        }
+
+        public long InitialReconnectDelay
+        {
+            get { return this.initialReconnectDelay; }
+            set { this.initialReconnectDelay = value; }
+        }
+
+        public int MaxReconnectAttempts
+        {
+            get { return this.maxReconnectAttempts; }
+            set { this.maxReconnectAttempts = value; }
+        }
+
+        public long MaxReconnectDelay
+        {
+            get { return this.maxReconnectDelay; }
+            set { this.maxReconnectDelay = value; }
+        }
+
+        public bool UseExponentialBackOff
+        {
+            get { return this.useExponentialBackOff; }
+            set { this.useExponentialBackOff = value; }
+        }
+
+        public string Group
+        {
+            get { return this.group; }
+            set { this.group = value; }
+        }
+
+        public ServiceAddHandler ServiceAdd
+        {
+            get { return serviceAddHandler; }
+            set { this.serviceAddHandler = value; }
+        }
+
+        public ServiceRemoveHandler ServiceRemove
+        {
+            get { return serviceRemoveHandler; }
+            set { this.serviceRemoveHandler = value; }
+        }
+
+        public Uri DiscoveryURI
+        {
+            get { return discoveryUri; }
+            set { discoveryUri = value; }
+        }
+
+        public bool IsStarted
+        {
+            get { return started.Value; }
+        }
+
+        #endregion
+
+        #region Abstract methods
+
+        /// <summary>
+        /// Gets or sets the keep alive interval.  This interval controls the amount 
+        /// of time that a service is kept before being considered idle and removed from
+        /// the list of discovered services.  This value is also used to control the
+        /// period of time that this service will wait before advertising itself.
+        /// </summary>
+        public abstract long KeepAliveInterval
+        {
+            get;
+            set;
+        }
+
+        /// <summary>
+        /// Overriden by the actual agent class to handle the publish of this service
+        /// if supported by the agent.
+        /// </summary>
+        protected abstract void DoAdvertizeSelf();
+
+        /// <summary>
+        /// Overriden by the agent class to handle starting any agent related services
+        /// or opening resources needed for the agent.
+        /// </summary>
+        protected abstract void DoStartAgent();
+
+        /// <summary>
+        /// Overriden by the agent to handle shutting down any agent created resources.
+        /// </summary>
+        protected abstract void DoStopAgent();
+
+        /// <summary>
+        /// Called from the Agent background thread to allow the concrete agent implementation
+        /// to perform its discovery of new services.  
+        /// </summary>
+        protected abstract void DoDiscovery();
+
+        #endregion
+
+        public void Start()
+        {
+            if (started.CompareAndSet(false, true)) {                                      
+                DoStartAgent();
+
+                if (worker == null)
+                {
+                    Tracer.Info("Starting multicast discovery agent worker thread");
+                    worker = new Thread(new ThreadStart(DiscoveryAgentRun));
+                    worker.IsBackground = true;
+                    worker.Start();
+                }
+
+                DoAdvertizeSelf();
+            }
+        }
+
+        public void Stop()
+        {
+            // Changing the isStarted flag will signal the thread that it needs to shut down.
+            if (started.CompareAndSet(true, false))
+            {
+                DoStopAgent();
+                if(worker != null)
+                {
+                    // wait for the worker to stop.
+                    if(!worker.Join(WORKER_KILL_TIME_SECONDS))
+                    {
+                        Tracer.Info("!! Timeout waiting for discovery agent localThread to stop");
+                        worker.Abort();
+                    }
+                    worker = null;
+                    Tracer.Debug("Multicast discovery agent worker thread stopped");
+                }
+                executor.Shutdown();
+                if (!executor.AwaitTermination(TimeSpan.FromMinutes(1)))
+                {
+                    Tracer.DebugFormat("Failed to properly shutdown agent executor {0}", this);
+                }
+            }
+        }
+
+        public void Dispose()
+        {
+            if (started.Value)
+            {
+                Stop();
+            }
+        }
+
+        public void RegisterService(String name)
+        {
+            this.selfService = name;
+            if (started.Value)
+            {
+                try 
+                {
+                    DoAdvertizeSelf();
+                } 
+                catch (Exception e) 
+                {
+                    // If a the advertise fails, chances are all subsequent sends will fail
+                    // too.. No need to keep reporting the same error over and over.
+                    if (reportAdvertizeFailed) 
+                    {
+                        reportAdvertizeFailed = false;
+                        Tracer.ErrorFormat("Failed to advertise our service: {0} cause: {1}", selfService, e.Message);
+                    }
+                }
+            }
+        }
+
+        public void ServiceFailed(DiscoveryEvent failedEvent)
+        {
+            DiscoveredServiceData data = null;
+            discoveredServices.TryGetValue(failedEvent.ServiceName, out data);
+            if (data != null && MarkFailed(data)) 
+            {
+                FireServiceRemoveEvent(data);
+            }
+        }
+
+        protected void FireServiceRemoveEvent(DiscoveryEvent data) 
+        {
+            if (serviceRemoveHandler != null && started.Value) 
+            {
+                // Have the listener process the event async so that
+                // he does not block this thread since we are doing time sensitive
+                // processing of events.
+                executor.QueueUserWorkItem(ServiceRemoveCallback, data);
+            }
+        }
+
+        private void ServiceRemoveCallback(object data)
+        {
+            DiscoveryEvent serviceData = data as DiscoveryEvent;
+            this.serviceRemoveHandler(serviceData);
+        }
+
+        protected void FireServiceAddEvent(DiscoveryEvent data) 
+        {
+            if (serviceAddHandler != null && started.Value) 
+            {
+                // Have the listener process the event async so that
+                // he does not block this thread since we are doing time sensitive
+                // processing of events.
+                executor.QueueUserWorkItem(ServiceAddCallback, data);
+            }
+        }
+
+        private void ServiceAddCallback(object data)
+        {
+            DiscoveryEvent serviceData = data as DiscoveryEvent;
+            this.serviceAddHandler(serviceData);
+        }
+
+        private void DiscoveryAgentRun()
+        {
+            Thread.CurrentThread.Name = "Discovery Agent Thread.";
+            while (started.Value)
+            {
+                DoTimeKeepingServices();
+                try
+                {
+                    DoDiscovery();
+                }
+                catch (ThreadInterruptedException)
+                {
+                    return;
+                }
+                catch (Exception)
+                {
+                }
+            }
+        }
+
+        private void DoTimeKeepingServices() 
+        {
+            if (started.Value)
+            {
+                DateTime currentTime = DateTime.Now;
+                if (currentTime < LastAdvertizeTime || 
+                    ((currentTime - TimeSpan.FromMilliseconds(KeepAliveInterval)) > LastAdvertizeTime))
+                {
+                    DoAdvertizeSelf();
+                    LastAdvertizeTime = currentTime;
+                }
+                DoExpireOldServices();
+            }
+        }
+
+        private void DoExpireOldServices() 
+        {
+            DateTime expireTime = DateTime.Now - TimeSpan.FromMilliseconds(KeepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH); 
+
+            DiscoveredServiceData[] services = null;
+            lock (discoveredServicesLock)
+            {
+                services = new DiscoveredServiceData[this.discoveredServices.Count];
+                this.discoveredServices.Values.CopyTo(services, 0);
+            }
+
+            foreach(DiscoveredServiceData service in services)
+            {
+                if (service.LastHeartBeat < expireTime) 
+                {
+                    ProcessDeadService(service.ServiceName);
+                }
+            }
+        }
+
+        protected void ProcessLiveService(string brokerName, string service)
+        {
+            if (SelfService == null || !service.Equals(SelfService)) 
+            {
+                DiscoveredServiceData remoteBroker = null;
+                lock (discoveredServicesLock)
+                {
+                    discoveredServices.TryGetValue(service, out remoteBroker);
+                }
+                if (remoteBroker == null) 
+                {
+                    remoteBroker = new DiscoveredServiceData(brokerName, service);
+                    discoveredServices.Add(service, remoteBroker);      
+                    FireServiceAddEvent(remoteBroker);
+                    DoAdvertizeSelf();
+                } 
+                else 
+                {
+                    UpdateHeartBeat(remoteBroker);
+                    if (IsTimeForRecovery(remoteBroker)) 
+                    {
+                        FireServiceAddEvent(remoteBroker);
+                    }
+                }
+            }
+        }
+
+        protected void ProcessDeadService(string service) 
+        {
+            if (!service.Equals(SelfService)) 
+            {
+                DiscoveredServiceData remoteBroker = null;
+                lock (discoveredServicesLock)
+                {
+                    discoveredServices.TryGetValue(service, out remoteBroker);
+                    if (remoteBroker != null)
+                    {
+                        discoveredServices.Remove(service);
+                    }
+                }
+                if (remoteBroker != null && !remoteBroker.Failed) 
+                {
+                    FireServiceRemoveEvent(remoteBroker);
+                }
+            }
+        }
+
+        #region DiscoveredServiceData maintenance methods
+
+        /// <summary>
+        /// Returns true if this Broker has been marked as failed and it is now time to
+        /// start a recovery attempt.
+        /// </summary>
+        public bool IsTimeForRecovery(DiscoveredServiceData service) 
+        {
+            lock (service.SyncRoot)
+            {
+                if (!service.Failed) 
+                {
+                    return false;
+                }
+
+                int maxReconnectAttempts = MaxReconnectAttempts;
+
+                // Are we done trying to recover this guy?
+                if (maxReconnectAttempts > 0 && service.FailureCount > maxReconnectAttempts) 
+                {
+                    Tracer.DebugFormat("Max reconnect attempts of the {0} service has been reached.", service.ServiceName);
+                    return false;
+                }
+
+                // Is it not yet time?
+                if (DateTime.Now < service.RecoveryTime) 
+                {
+                    return false;
+                }
+
+                Tracer.DebugFormat("Resuming event advertisement of the {0} service.", service.ServiceName);
+
+                service.Failed = false;
+                return true;
+            }
+        }
+
+        internal void UpdateHeartBeat(DiscoveredServiceData service)
+        {
+            lock (service.SyncRoot)
+            {
+                service.LastHeartBeat = DateTime.Now;
+
+                // Consider that the broker recovery has succeeded if it has not failed in 60 seconds.
+                if (!service.Failed && service.FailureCount > 0 && 
+                    (service.LastHeartBeat - service.RecoveryTime) > TimeSpan.FromMilliseconds(1000 * 60)) {
+
+                    Tracer.DebugFormat("I now think that the {0} service has recovered.", service.ServiceName);
+
+                    service.FailureCount = 0;
+                    service.RecoveryTime = DateTime.MinValue;
+                }
+            }
+        }
+
+        internal bool MarkFailed(DiscoveredServiceData service)
+        {
+            lock (service.SyncRoot)
+            {
+                if (!service.Failed) 
+                {
+                    service.Failed = true;
+                    service.FailureCount++;
+
+                    long reconnectDelay = 0;
+                    if (!UseExponentialBackOff) 
+                    {
+                        reconnectDelay = InitialReconnectDelay;
+                    } 
+                    else 
+                    {
+                        reconnectDelay = (long)Math.Pow(BackOffMultiplier, service.FailureCount);
+                        reconnectDelay = Math.Min(reconnectDelay, MaxReconnectDelay);
+                    }
+
+                    Tracer.DebugFormat("Remote failure of {0} while still receiving multicast advertisements.  " +
+                                       "Advertising events will be suppressed for {1} ms, the current " +
+                                       "failure count is: {2}", 
+                                       service.ServiceName, reconnectDelay, service.FailureCount);
+
+                    service.RecoveryTime = DateTime.Now + TimeSpan.FromMilliseconds(reconnectDelay);
+                    return true;
+                }
+            }
+            return false;            
+        }
+
+        #endregion
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/AbstractDiscoveryAgent.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveredServiceData.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveredServiceData.cs?rev=1530315&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveredServiceData.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveredServiceData.cs Tue Oct  8 15:45:14 2013
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery
+{
+    /// <summary>
+    /// Discovered service data event object.  Used to contain information on the
+    /// services that an agent discovers and track heartbeat and other service
+    /// events used to determine if a service has failed or timed out due to a
+    /// lack of recent reporting.
+    /// </summary>
+    public class DiscoveredServiceData : DiscoveryEvent
+    {
+        private DateTime recoveryTime = DateTime.MinValue;
+        private int failureCount;
+        private bool failed;
+        private DateTime lastHeartBeat;
+
+        private readonly object syncRoot = new object();
+
+        public DiscoveredServiceData(string brokerName, string serviceName) : base()
+        {
+            this.BrokerName = brokerName;
+            this.ServiceName = serviceName;
+            this.lastHeartBeat = DateTime.Now;
+        }
+
+        public DiscoveredServiceData(string serviceName) : base()
+        {
+            this.ServiceName = serviceName;
+            this.lastHeartBeat = DateTime.Now;
+        }
+
+        internal object SyncRoot
+        {
+            get { return this.syncRoot; }
+        }
+
+        internal bool Failed
+        {
+            get { return this.failed; }
+            set { this.failed = value; }
+        }
+
+        internal int FailureCount
+        {
+            get { return this.failureCount; }
+            set { this.failureCount = value; }
+        }
+
+        internal DateTime LastHeartBeat
+        {
+            get { return this.lastHeartBeat; }
+            set { this.lastHeartBeat = value; }
+        }
+
+        internal DateTime RecoveryTime
+        {
+            get { return this.recoveryTime; }
+            set { this.recoveryTime = value; }
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/DiscoveredServiceData.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs?rev=1530315&r1=1530314&r2=1530315&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/Multicast/MulticastDiscoveryAgent.cs Tue Oct  8 15:45:14 2013
@@ -27,62 +27,34 @@ using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ.Transport.Discovery.Multicast
 {
-	internal delegate void NewBrokerServiceFound(string brokerName, string serviceName);
-	internal delegate void BrokerServiceRemoved(string brokerName, string serviceName);
-
-	internal class MulticastDiscoveryAgent : IDiscoveryAgent, IDisposable
+	internal class MulticastDiscoveryAgent : AbstractDiscoveryAgent
 	{
         public const string DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
         public const string DEFAULT_HOST_STR = "default"; 
         public const string DEFAULT_HOST_IP = "239.255.2.3";
         public const int DEFAULT_PORT = 6155;
 
-		public const int DEFAULT_INITIAL_RECONNECT_DELAY = 1000 * 5;
-        public const int DEFAULT_BACKOFF_MULTIPLIER = 2;
-        public const int DEFAULT_MAX_RECONNECT_DELAY = 1000 * 30;
-
 		private const string TYPE_SUFFIX = "ActiveMQ-4.";
 		private const string ALIVE = "alive";
 		private const string DEAD = "dead";
 		private const char DELIMITER = '%';
 		private const int BUFF_SIZE = 8192;
-		private const int HEARTBEAT_MISS_BEFORE_DEATH = 10;
 		private const int DEFAULT_IDLE_TIME = 500;
         private const string DEFAULT_GROUP = "default";
-        private const int WORKER_KILL_TIME_SECONDS = 1000;
 
         private const int MAX_SOCKET_CONNECTION_RETRY_ATTEMPS = 3;
         private const int SOCKET_CONNECTION_BACKOFF_TIME = 500;
 
-        private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
-        private long maxReconnectDelay = DEFAULT_MAX_RECONNECT_DELAY;
-        private long backOffMultiplier = DEFAULT_BACKOFF_MULTIPLIER;
-        private bool useExponentialBackOff;
-        private int maxReconnectAttempts;
-
         private int timeToLive = 1;
         private string group = DEFAULT_GROUP;
         private bool loopBackMode;
-        private Dictionary<String, RemoteBrokerData> brokersByService = 
-            new Dictionary<String, RemoteBrokerData>();
-        private readonly object servicesLock = new object();
-        private String selfService;
         private long keepAliveInterval = DEFAULT_IDLE_TIME;
         private string mcInterface;
         private string mcNetworkInterface;
         private string mcJoinNetworkInterface;
-        private DateTime lastAdvertizeTime;
-        private bool reportAdvertizeFailed = true;
-        private readonly Atomic<bool> started = new Atomic<bool>(false);
 
-        private Uri discoveryUri;
 		private Socket multicastSocket;
 		private IPEndPoint endPoint;
-		private Thread worker;
-        private readonly ThreadPoolExecutor executor = new ThreadPoolExecutor();
-
-        private ServiceAddHandler serviceAddHandler;
-        private ServiceRemoveHandler serviceRemoveHandler;
 
         #region Property Setters and Getters
 
@@ -98,12 +70,6 @@ namespace Apache.NMS.ActiveMQ.Transport.
             set { this.timeToLive = value; }
         }
 
-        public long KeepAliveInterval
-        {
-            get { return this.keepAliveInterval; }
-            set { this.keepAliveInterval = value; }
-        }
-
         public string Interface
         {
             get { return this.mcInterface; }
@@ -127,194 +93,89 @@ namespace Apache.NMS.ActiveMQ.Transport.
             get { return this.group + "." + TYPE_SUFFIX; }
         }
 
-        public long BackOffMultiplier
-        {
-            get { return this.backOffMultiplier; }
-            set { this.backOffMultiplier = value; }
-        }
-
-        public long InitialReconnectDelay
-        {
-            get { return this.initialReconnectDelay; }
-            set { this.initialReconnectDelay = value; }
-        }
-
-        public int MaxReconnectAttempts
-        {
-            get { return this.maxReconnectAttempts; }
-            set { this.maxReconnectAttempts = value; }
-        }
-
-        public long MaxReconnectDelay
-        {
-            get { return this.maxReconnectDelay; }
-            set { this.maxReconnectDelay = value; }
-        }
-
-        public bool UseExponentialBackOff
-        {
-            get { return this.useExponentialBackOff; }
-            set { this.useExponentialBackOff = value; }
-        }
-
-        public string Group
+        public override long KeepAliveInterval
         {
-            get { return this.group; }
-            set { this.group = value; }
-        }
-
-        public ServiceAddHandler ServiceAdd
-        {
-            get { return serviceAddHandler; }
-            set { this.serviceAddHandler = value; }
-        }
-
-        public ServiceRemoveHandler ServiceRemove
-        {
-            get { return serviceRemoveHandler; }
-            set { this.serviceRemoveHandler = value; }
-        }
-
-        public Uri DiscoveryURI
-        {
-            get { return discoveryUri; }
-            set { discoveryUri = value; }
-        }
-
-        public bool IsStarted
-        {
-            get { return started.Value; }
+            get { return this.keepAliveInterval; }
+            set { this.keepAliveInterval = value; }
         }
 
         #endregion
 
         public override String ToString()
         {
-            return "MulticastDiscoveryAgent-" + (selfService != null ? "advertise:" + selfService : "");
+            return "MulticastDiscoveryAgent-" + (SelfService != null ? "advertise:" + SelfService : "");
         }
 
-        public void RegisterService(String name)
-        {
-            this.selfService = name;
-            if (started.Value)
+		protected override void DoStartAgent()
+		{
+            if (String.IsNullOrEmpty(group)) 
             {
-                DoAdvertizeSelf();
+                throw new IOException("You must specify a group to discover");
             }
-        }
-
-        public void ServiceFailed(DiscoveryEvent failedEvent)
-        {
-            RemoteBrokerData data = brokersByService[failedEvent.ServiceName];
-            if (data != null && data.MarkFailed()) 
+            String type = Type;
+            if (!type.EndsWith(".")) 
             {
-                FireServiceRemoveEvent(data);
+                Tracer.Warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
+                type += ".";
+            }
+            
+            if (DiscoveryURI == null) 
+            {
+                DiscoveryURI = new Uri(DEFAULT_DISCOVERY_URI_STRING);
             }
-        }
-
-		public void Start()
-		{
-            if (started.CompareAndSet(false, true)) {           
-                            
-                if (String.IsNullOrEmpty(group)) 
-                {
-                    throw new IOException("You must specify a group to discover");
-                }
-                String type = Type;
-                if (!type.EndsWith(".")) 
-                {
-                    Tracer.Warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
-                    type += ".";
-                }
-                
-                if (discoveryUri == null) 
-                {
-                    discoveryUri = new Uri(DEFAULT_DISCOVERY_URI_STRING);
-                }
-
-                if (Tracer.IsDebugEnabled) 
-                {
-                    Tracer.Debug("start - discoveryURI = " + discoveryUri);                              
-                }
-
-                String targetHost = discoveryUri.Host;
-                int targetPort = discoveryUri.Port;
-                     
-                if (DEFAULT_HOST_STR.Equals(targetHost)) 
-                {
-                    targetHost = DEFAULT_HOST_IP;                     
-                }
 
-                if (targetPort < 0) 
-                {
-                    targetPort = DEFAULT_PORT;              
-                }
-                  
-                if (Tracer.IsDebugEnabled) 
-                {
-                    Tracer.DebugFormat("start - myHost = {0}", targetHost); 
-                    Tracer.DebugFormat("start - myPort = {0}", targetPort);    
-                    Tracer.DebugFormat("start - group  = {0}", group);                    
-                    Tracer.DebugFormat("start - interface  = {0}", mcInterface);
-                    Tracer.DebugFormat("start - network interface  = {0}", mcNetworkInterface);
-                    Tracer.DebugFormat("start - join network interface  = {0}", mcJoinNetworkInterface);
-                } 
+            if (Tracer.IsDebugEnabled) 
+            {
+                Tracer.Debug("start - discoveryURI = " + DiscoveryURI);                              
+            }
 
-                int numFailedAttempts = 0;
-                int backoffTime = SOCKET_CONNECTION_BACKOFF_TIME;
+            String targetHost = DiscoveryURI.Host;
+            int targetPort = DiscoveryURI.Port;
+                 
+            if (DEFAULT_HOST_STR.Equals(targetHost)) 
+            {
+                targetHost = DEFAULT_HOST_IP;                     
+            }
 
-                Tracer.Info("Connecting to multicast discovery socket.");
-                while (!TryToConnectSocket(targetHost, targetPort))
-                {
-                    numFailedAttempts++;
-                    if (numFailedAttempts > MAX_SOCKET_CONNECTION_RETRY_ATTEMPS)
-                    {
-                        throw new ApplicationException(
-                            "Could not open the socket in order to discover advertising brokers.");
-                    }
+            if (targetPort < 0) 
+            {
+                targetPort = DEFAULT_PORT;              
+            }
+              
+            if (Tracer.IsDebugEnabled) 
+            {
+                Tracer.DebugFormat("start - myHost = {0}", targetHost); 
+                Tracer.DebugFormat("start - myPort = {0}", targetPort);    
+                Tracer.DebugFormat("start - group  = {0}", group);                    
+                Tracer.DebugFormat("start - interface  = {0}", mcInterface);
+                Tracer.DebugFormat("start - network interface  = {0}", mcNetworkInterface);
+                Tracer.DebugFormat("start - join network interface  = {0}", mcJoinNetworkInterface);
+            } 
 
-                    Thread.Sleep(backoffTime);
-                    backoffTime = (int)(backoffTime * BackOffMultiplier);
-                }
+            int numFailedAttempts = 0;
+            int backoffTime = SOCKET_CONNECTION_BACKOFF_TIME;
 
-                if(worker == null)
+            Tracer.Info("Connecting to multicast discovery socket.");
+            while (!TryToConnectSocket(targetHost, targetPort))
+            {
+                numFailedAttempts++;
+                if (numFailedAttempts > MAX_SOCKET_CONNECTION_RETRY_ATTEMPS)
                 {
-                    Tracer.Info("Starting multicast discovery agent worker thread");
-                    worker = new Thread(new ThreadStart(DiscoveryAgentRun));
-                    worker.IsBackground = true;
-                    worker.Start();
+                    throw new ApplicationException(
+                        "Could not open the socket in order to discover advertising brokers.");
                 }
 
-                DoAdvertizeSelf();
+                Thread.Sleep(backoffTime);
+                backoffTime = (int)(backoffTime * BackOffMultiplier);
             }
 		}
 
-		public void Stop()
+		protected override void DoStopAgent()
 		{
-            // Changing the isStarted flag will signal the thread that it needs to shut down.
-            if (started.CompareAndSet(true, false))
-			{
-				Tracer.Info("Stopping multicast discovery agent worker thread");
-                if (multicastSocket != null)
-                {
-                    multicastSocket.Close();
-                }
-                if(worker != null)
-                {
-                    // wait for the worker to stop.
-                    if(!worker.Join(WORKER_KILL_TIME_SECONDS))
-                    {
-                        Tracer.Info("!! Timeout waiting for multicast discovery agent localThread to stop");
-                        worker.Abort();
-                    }
-                    worker = null;
-                    Tracer.Debug("Multicast discovery agent worker thread stopped");
-                }
-                executor.Shutdown();
-                if (!executor.AwaitTermination(TimeSpan.FromMinutes(1)))
-                {
-                    Tracer.DebugFormat("Failed to properly shutdown agent executor {0}", this);
-                }
-			}
+            if (multicastSocket != null)
+            {
+                multicastSocket.Close();
+            }
 		}
 
 		private bool TryToConnectSocket(string targetHost, int targetPort)
@@ -379,175 +240,72 @@ namespace Apache.NMS.ActiveMQ.Transport.
 			return hasSucceeded;
 		}
 
-		private void DiscoveryAgentRun()
+		protected override void DoDiscovery()
 		{
-			Thread.CurrentThread.Name = "Discovery Agent Thread.";
 			byte[] buffer = new byte[BUFF_SIZE];
 			string receivedInfoRaw;
 			string receivedInfo;
 
-			while (started.Value)
+			try
 			{
-                DoTimeKeepingServices();
-				try
+				int numBytes = multicastSocket.Receive(buffer);
+				receivedInfoRaw = System.Text.Encoding.UTF8.GetString(buffer, 0, numBytes);
+				// We have to remove all of the null bytes if there are any otherwise we just
+				// take the whole string as is.
+				if (receivedInfoRaw.IndexOf("\0") != -1)
 				{
-					int numBytes = multicastSocket.Receive(buffer);
-					receivedInfoRaw = System.Text.Encoding.UTF8.GetString(buffer, 0, numBytes);
-					// We have to remove all of the null bytes if there are any otherwise we just
-					// take the whole string as is.
-					if (receivedInfoRaw.IndexOf("\0") != -1)
-					{
-						receivedInfo = receivedInfoRaw.Substring(0, receivedInfoRaw.IndexOf("\0"));
-					}
-					else
-					{
-						receivedInfo = receivedInfoRaw;
-					}
-
-					ProcessServiceAdvertisement(receivedInfo);
+					receivedInfo = receivedInfoRaw.Substring(0, receivedInfoRaw.IndexOf("\0"));
 				}
-				catch(SocketException)
+				else
 				{
-					// There was no multicast message sent before the timeout expired...Let us try again.
+					receivedInfo = receivedInfoRaw;
 				}
 
-				// We need to clear the buffer.
-				buffer[0] = 0x0;
+				ProcessServiceAdvertisement(receivedInfo);
 			}
-		}
-
-		private void ProcessServiceAdvertisement(string message)
-		{
-			string payload;
-			string brokerName;
-			string serviceName;
-
-			if (message.StartsWith(Type))
+			catch (SocketException)
 			{
-				payload = message.Substring(Type.Length);
-				brokerName = GetBrokerName(payload);
-				serviceName = GetServiceName(payload);
-
-				if (payload.StartsWith(ALIVE))
-				{
-					ProcessAlive(brokerName, serviceName);
-				}
-				else if (payload.StartsWith(DEAD))
-				{
-					ProcessDead(serviceName);
-				}
-				else
-				{
-					// Malformed Payload
-				}
+				// There was no multicast message sent before the timeout expired...Let us try again.
 			}
 		}
 
-        private void DoTimeKeepingServices() 
-        {
-            if (started.Value)
-            {
-                DateTime currentTime = DateTime.Now;
-                if (currentTime < lastAdvertizeTime || 
-                    ((currentTime - TimeSpan.FromMilliseconds(keepAliveInterval)) > lastAdvertizeTime))
-                {
-                    DoAdvertizeSelf();
-                    lastAdvertizeTime = currentTime;
-                }
-                DoExpireOldServices();
-            }
-        }
-
-        private void DoAdvertizeSelf()
+        protected override void DoAdvertizeSelf()
         {
-            if (!String.IsNullOrEmpty(selfService)) 
+            if (!String.IsNullOrEmpty(SelfService)) 
             {
                 String payload = Type;
                 payload += started.Value ? ALIVE : DEAD;
                 payload += DELIMITER + "localhost" + DELIMITER;
-                payload += selfService;
-                try 
-                {
-                    byte[] data = System.Text.Encoding.UTF8.GetBytes(payload);
-                    multicastSocket.Send(data);
-                } 
-                catch (Exception e) 
-                {
-                    // If a send fails, chances are all subsequent sends will fail
-                    // too.. No need to keep reporting the
-                    // same error over and over.
-                    if (reportAdvertizeFailed) 
-                    {
-                        reportAdvertizeFailed = false;
-                        Tracer.ErrorFormat("Failed to advertise our service: {0} cause: {1}", payload, e.Message);
-                    }
-                }
-            }
-        }
+                payload += SelfService;
 
-        private void ProcessAlive(string brokerName, string service)
-        {
-            if (selfService == null || !service.Equals(selfService)) 
-            {
-                RemoteBrokerData remoteBroker = null;
-                lock (servicesLock)
-                {
-                    brokersByService.TryGetValue(service, out remoteBroker);
-                }
-                if (remoteBroker == null) 
-                {
-                    remoteBroker = new RemoteBrokerData(this, brokerName, service);
-                    brokersByService.Add(service, remoteBroker);      
-                    FireServiceAddEvent(remoteBroker);
-                    DoAdvertizeSelf();
-                } 
-                else 
-                {
-                    remoteBroker.UpdateHeartBeat();
-                    if (remoteBroker.IsTimeForRecovery()) 
-                    {
-                        FireServiceAddEvent(remoteBroker);
-                    }
-                }
+                byte[] data = System.Text.Encoding.UTF8.GetBytes(payload);
+                multicastSocket.Send(data);
             }
         }
 
-        private void ProcessDead(string service) 
+        private void ProcessServiceAdvertisement(string message)
         {
-            if (!service.Equals(selfService)) 
+            string payload;
+            string brokerName;
+            string serviceName;
+
+            if (message.StartsWith(Type))
             {
-                RemoteBrokerData remoteBroker = null;
-                lock (servicesLock)
+                payload = message.Substring(Type.Length);
+                brokerName = GetBrokerName(payload);
+                serviceName = GetServiceName(payload);
+
+                if (payload.StartsWith(ALIVE))
                 {
-                    brokersByService.TryGetValue(service, out remoteBroker);
-                    if (remoteBroker != null)
-                    {
-                        brokersByService.Remove(service);
-                    }
+                    ProcessLiveService(brokerName, serviceName);
                 }
-                if (remoteBroker != null && !remoteBroker.Failed) 
+                else if (payload.StartsWith(DEAD))
                 {
-                    FireServiceRemoveEvent(remoteBroker);
+                    ProcessDeadService(serviceName);
                 }
-            }
-        }
-
-        private void DoExpireOldServices() 
-        {
-            DateTime expireTime = DateTime.Now - TimeSpan.FromMilliseconds(keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH); 
-
-            RemoteBrokerData[] services = null;
-            lock (servicesLock)
-            {
-                services = new RemoteBrokerData[this.brokersByService.Count];
-                this.brokersByService.Values.CopyTo(services, 0);
-            }
-
-            foreach(RemoteBrokerData service in services)
-            {
-                if (service.LastHeartBeat < expireTime) 
+                else
                 {
-                    ProcessDead(service.ServiceName);
+                    // Malformed Payload
                 }
             }
         }
@@ -571,167 +329,5 @@ namespace Apache.NMS.ActiveMQ.Transport.
             }
             return null;
 		}
-
-		public void Dispose()
-		{
-			if(started.Value)
-			{
-				Stop();
-			}
-		}
-
-        private void FireServiceRemoveEvent(RemoteBrokerData data) 
-        {
-            if (serviceRemoveHandler != null && started.Value) 
-            {
-                // Have the listener process the event async so that
-                // he does not block this thread since we are doing time sensitive
-                // processing of events.
-                executor.QueueUserWorkItem(ServiceRemoveCallback, data);
-            }
-        }
-
-        private void ServiceRemoveCallback(object data)
-        {
-            RemoteBrokerData serviceData = data as RemoteBrokerData;
-            this.serviceRemoveHandler(serviceData);
-        }
-
-        private void FireServiceAddEvent(RemoteBrokerData data) 
-        {
-            if (serviceAddHandler != null && started.Value) 
-            {
-                // Have the listener process the event async so that
-                // he does not block this thread since we are doing time sensitive
-                // processing of events.
-                executor.QueueUserWorkItem(ServiceAddCallback, data);
-            }
-        }
-
-        private void ServiceAddCallback(object data)
-        {
-            RemoteBrokerData serviceData = data as RemoteBrokerData;
-            this.serviceAddHandler(serviceData);
-        }
-
-		internal class RemoteBrokerData : DiscoveryEvent
-		{
-            internal DateTime recoveryTime = DateTime.MinValue;
-            internal int failureCount;
-            internal bool failed;
-			internal DateTime lastHeartBeat;
-
-            private readonly object syncRoot = new object();
-            private readonly MulticastDiscoveryAgent parent;
-
-			internal RemoteBrokerData(MulticastDiscoveryAgent parent, string brokerName, string serviceName) : base()
-			{
-                this.parent = parent;
-				this.BrokerName = brokerName;
-				this.ServiceName = serviceName;
-				this.lastHeartBeat = DateTime.Now;
-			}
-
-            internal bool Failed
-            {
-                get { return this.failed; }
-            }
-
-            internal DateTime LastHeartBeat
-            {
-                get 
-                { 
-                    lock(syncRoot)
-                    {
-                        return this.lastHeartBeat; 
-                    }
-                }
-            }
-
-			internal void UpdateHeartBeat()
-			{
-                lock (syncRoot)
-                {
-                    this.lastHeartBeat = DateTime.Now;
-
-                    // Consider that the broker recovery has succeeded if it has not
-                    // failed in 60 seconds.
-                    if (!failed && failureCount > 0 && 
-                        (lastHeartBeat - recoveryTime) > TimeSpan.FromMilliseconds(1000 * 60)) {
-
-                        Tracer.DebugFormat("I now think that the {0} service has recovered.", ServiceName);
-
-                        failureCount = 0;
-                        recoveryTime = DateTime.MinValue;
-                    }
-                }
-			}
-
-            internal bool MarkFailed()
-            {
-                lock (syncRoot)
-                {
-                    if (!failed) 
-                    {
-                        failed = true;
-                        failureCount++;
-
-                        long reconnectDelay;
-                        if (!parent.UseExponentialBackOff) 
-                        {
-                            reconnectDelay = parent.InitialReconnectDelay;
-                        } 
-                        else 
-                        {
-                            reconnectDelay = (long)Math.Pow(parent.BackOffMultiplier, failureCount);
-                            reconnectDelay = Math.Min(reconnectDelay, parent.MaxReconnectDelay);
-                        }
-
-                        Tracer.DebugFormat("Remote failure of {0} while still receiving multicast advertisements.  " +
-                                           "Advertising events will be suppressed for {1} ms, the current " +
-                                           "failure count is: {2}", ServiceName, reconnectDelay, failureCount);
-
-                        recoveryTime = DateTime.Now + TimeSpan.FromMilliseconds(reconnectDelay);
-                        return true;
-                    }
-                }
-                return false;            
-            }
-
-            /// <summary>
-            /// Returns true if this Broker has been marked as failed and it is now time to
-            /// start a recovery attempt.
-            /// </summary>
-            public bool IsTimeForRecovery() 
-            {
-                lock (syncRoot)
-                {
-                    if (!failed) 
-                    {
-                        return false;
-                    }
-
-                    int maxReconnectAttempts = parent.MaxReconnectAttempts;
-
-                    // Are we done trying to recover this guy?
-                    if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) 
-                    {
-                        Tracer.DebugFormat("Max reconnect attempts of the {0} service has been reached.", ServiceName);
-                        return false;
-                    }
-
-                    // Is it not yet time?
-                    if (DateTime.Now < recoveryTime) 
-                    {
-                        return false;
-                    }
-
-                    Tracer.DebugFormat("Resuming event advertisement of the {0} service.", ServiceName);
-
-                    failed = false;
-                    return true;
-                }
-            }
-		}
 	}
 }

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/http/HttpDiscoveryAgent.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/http/HttpDiscoveryAgent.cs?rev=1530315&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/http/HttpDiscoveryAgent.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/http/HttpDiscoveryAgent.cs Tue Oct  8 15:45:14 2013
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Net;
+using System.Threading;
+using System.IO;
+using System.Collections.Generic;
+using Apache.NMS.Util;
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery.Http
+{
+	public class HttpDiscoveryAgent : AbstractDiscoveryAgent, ISuspendable
+	{
+        private enum UpdateState
+        {
+            SUSPENDED,
+            RESUMING,
+            RESUMED
+        }
+
+        public const string DEFAULT_DISCOVERY_URI_STRING = "http://localhost:8080/discovery-registry";
+        public const string DEFAULT_GROUP = "default";
+
+        private readonly object updateMutex = new object();
+        private UpdateState state = UpdateState.RESUMED;
+        private const int DEFAULT_UPDATE_INTERVAL = 10 * 1000;
+        private long keepAliveInterval = DEFAULT_UPDATE_INTERVAL;
+
+		public HttpDiscoveryAgent()
+		{
+            this.Group = DEFAULT_GROUP;
+		}
+
+        #region Property setters and getters
+
+        public override long KeepAliveInterval
+        {
+            get { return this.keepAliveInterval; }
+            set { this.keepAliveInterval = value; }
+        }
+
+        #endregion
+
+        public override String ToString()
+        {
+            return "HttpDiscoveryAgent-" + (SelfService != null ? "advertise:" + SelfService : "");
+        }
+
+        protected override void DoStartAgent()
+        {
+            if (DiscoveryURI == null || DiscoveryURI.Host.Equals("default")) 
+            {
+                DiscoveryURI = new Uri(DEFAULT_DISCOVERY_URI_STRING + "/" + Group);
+            }
+
+            if (Tracer.IsDebugEnabled) 
+            {
+                Tracer.DebugFormat("http agent started with discoveryURI = {0}", DiscoveryURI);                              
+            }
+        }
+
+        protected override void DoStopAgent()
+        {
+            // Ensure the worker thread exits its wait so it can detect the stopped event.
+            Resume();
+        }
+
+        public void Suspend()
+        {
+            Monitor.Enter(updateMutex);
+            try 
+            {
+                this.state = UpdateState.SUSPENDED;
+            }
+            finally
+            {
+                Monitor.Exit(updateMutex);
+            }
+        }
+
+        public void Resume()
+        {
+            Monitor.Enter(updateMutex);
+            try 
+            {
+                this.state = UpdateState.RESUMING;
+                Monitor.Pulse(updateMutex);
+            }
+            finally
+            {
+                Monitor.Exit(updateMutex);
+            }
+        }
+
+        protected override void DoDiscovery()
+        {
+            DoUpdate();
+            Monitor.Enter(updateMutex);
+            try
+            {
+                do 
+                {
+                    if (state == UpdateState.RESUMING) 
+                    {
+                        state = UpdateState.RESUMED;
+                    }
+                    else 
+                    {
+                        Monitor.Wait(updateMutex, TimeSpan.FromMilliseconds(KeepAliveInterval));
+                    }
+                }
+                while (state == UpdateState.SUSPENDED && started.Value);
+            }
+            finally
+            {
+                Monitor.Exit(updateMutex);
+            }
+        }
+
+        private void DoUpdate()
+        {
+            List<string> activeServices = DoLookup(KeepAliveInterval * 3);
+            // If there is error talking the the central server, then activeServices == null
+            if (activeServices != null) 
+            {
+                lock (discoveredServicesLock) 
+                {
+                    foreach(String service in activeServices)
+                    {
+                        Tracer.DebugFormat("Http discovery found live service: {0}", service);
+                        ProcessLiveService("", service);
+                    }
+                }
+            }
+        }
+
+        private List<string> DoLookup(long freshness) 
+        {
+            String url = DiscoveryURI + "?freshness=" + freshness;
+            try 
+            {
+                WebClient client = new WebClient();
+                string response = client.DownloadString(url);
+                if (response != null)
+                {
+                    Tracer.DebugFormat("GET to {0} got a {1}", url, response);
+                    List<string> rc = new List<string>();
+
+                    StringReader reader = new StringReader(response);
+                    while (true)
+                    {
+                        string line = reader.ReadLine();
+                        if (line == null)
+                        {
+                            break;
+                        }
+
+                        line = line.Trim();
+                        if (line.Length != 0 && !rc.Contains(line))
+                        {
+                            rc.Add(line);
+                        }
+                    }
+                    return rc;
+                }
+
+                Tracer.DebugFormat("GET to {0} failed to retrieve any services.", url);
+                return null;
+            } 
+            catch (Exception e)
+            {
+                Tracer.WarnFormat("GET to {0} failed with: {1}", url, e.Message);
+                return null;
+            }
+        }
+
+        protected override void DoAdvertizeSelf()
+        {
+            // TODO - Don't need this yet unless we want to do some testing.
+        }
+	}
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/http/HttpDiscoveryAgent.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/http/HttpDiscoveryAgentFactory.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/http/HttpDiscoveryAgentFactory.cs?rev=1530315&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/http/HttpDiscoveryAgentFactory.cs (added)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/http/HttpDiscoveryAgentFactory.cs Tue Oct  8 15:45:14 2013
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+using System;
+using System.Collections.Specialized;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.ActiveMQ.Transport.Discovery.Http
+{
+    [ActiveMQDiscoveryAgentFactory("http")]
+    public class HttpDiscoveryAgentFactory : IDiscoveryAgentFactory
+	{
+        public IDiscoveryAgent CreateAgent(Uri uri)
+        {
+            Tracer.DebugFormat("Creating DiscoveryAgent:[{0}]", uri);
+
+            try
+            {
+                HttpDiscoveryAgent agent = new HttpDiscoveryAgent();
+                agent.DiscoveryURI = uri;
+
+                // allow Agent's params to be set via query arguments  
+                // (e.g., http://localhost:8080?group=default
+
+                StringDictionary parameters = URISupport.ParseParameters(uri);
+                URISupport.SetProperties(agent, parameters);
+
+                return agent;
+            }
+            catch(Exception e)
+            {
+                throw new IOException("Could not create HTTP discovery agent", e);
+            }
+        }
+    }
+}
+

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Discovery/http/HttpDiscoveryAgentFactory.cs
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs?rev=1530315&r1=1530314&r2=1530315&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Failover/FailoverTransport.cs Tue Oct  8 15:45:14 2013
@@ -714,7 +714,7 @@ namespace Apache.NMS.ActiveMQ.Transport.
                         bool timedout = false;
                         while(transport == null && !disposed && connectionFailure == null)
                         {
-                            Tracer.Info("Waiting for transport to reconnect.");
+                            Tracer.Debug("Waiting for transport to reconnect.");
 
                             int elapsed = (int) (DateTime.Now - start).TotalMilliseconds;
                             if(this.timeout > 0 && elapsed > this.timeout)



Mime
View raw message