activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQCPP-511
Date Thu, 17 Oct 2013 18:31:33 GMT
Updated Branches:
  refs/heads/trunk d25147a5f -> f71491091


https://issues.apache.org/jira/browse/AMQCPP-511

Implements most of the basic parts of a Discovery Agent in an abstract base
class so that concrete agents can be simpler drop-ins.

Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/f7149109
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/f7149109
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/f7149109

Branch: refs/heads/trunk
Commit: f714910919e68cb27909c20dcb3b5bcd63ce0cac
Parents: d25147a
Author: Timothy Bish <tabish121@gmai.com>
Authored: Thu Oct 17 14:31:24 2013 -0400
Committer: Timothy Bish <tabish121@gmai.com>
Committed: Thu Oct 17 14:31:24 2013 -0400

----------------------------------------------------------------------
 activemq-cpp/src/main/Makefile.am               |   2 +
 .../discovery/AbstractDiscoveryAgent.cpp        | 503 +++++++++++++++++++
 .../discovery/AbstractDiscoveryAgent.h          | 178 +++++++
 .../discovery/DiscoveredBrokerData.cpp          |  12 +
 .../transport/discovery/DiscoveredBrokerData.h  |   5 +-
 .../transport/discovery/DiscoveryAgent.h        |   7 +
 .../transport/discovery/DiscoveryAgentFactory.h |   2 +-
 .../transport/discovery/DiscoveryListener.h     |   4 +-
 .../transport/discovery/DiscoveryTransport.cpp  |  10 +-
 .../transport/discovery/DiscoveryTransport.h    |   4 +-
 .../discovery/DiscoveryAgentRegistryTest.cpp    |   1 +
 .../discovery/DiscoveryTransportFactoryTest.cpp |   1 +
 12 files changed, 718 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f7149109/activemq-cpp/src/main/Makefile.am
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/Makefile.am b/activemq-cpp/src/main/Makefile.am
index 0861789..a34c1c8 100644
--- a/activemq-cpp/src/main/Makefile.am
+++ b/activemq-cpp/src/main/Makefile.am
@@ -155,6 +155,7 @@ cc_sources = \
     activemq/transport/TransportFilter.cpp \
     activemq/transport/TransportRegistry.cpp \
     activemq/transport/correlator/ResponseCorrelator.cpp \
+    activemq/transport/discovery/AbstractDiscoveryAgent.cpp \
     activemq/transport/discovery/DiscoveredBrokerData.cpp \
     activemq/transport/discovery/DiscoveryAgent.cpp \
     activemq/transport/discovery/DiscoveryAgentFactory.cpp \
@@ -811,6 +812,7 @@ h_sources = \
     activemq/transport/TransportListener.h \
     activemq/transport/TransportRegistry.h \
     activemq/transport/correlator/ResponseCorrelator.h \
+    activemq/transport/discovery/AbstractDiscoveryAgent.h \
     activemq/transport/discovery/DiscoveredBrokerData.h \
     activemq/transport/discovery/DiscoveryAgent.h \
     activemq/transport/discovery/DiscoveryAgentFactory.h \

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f7149109/activemq-cpp/src/main/activemq/transport/discovery/AbstractDiscoveryAgent.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/transport/discovery/AbstractDiscoveryAgent.cpp
b/activemq-cpp/src/main/activemq/transport/discovery/AbstractDiscoveryAgent.cpp
new file mode 100644
index 0000000..4bb0821
--- /dev/null
+++ b/activemq-cpp/src/main/activemq/transport/discovery/AbstractDiscoveryAgent.cpp
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <activemq/transport/discovery/AbstractDiscoveryAgent.h>
+
+#include <activemq/transport/discovery/DiscoveredBrokerData.h>
+#include <activemq/transport/discovery/DiscoveryListener.h>
+
+#include <decaf/net/URI.h>
+#include <decaf/lang/Thread.h>
+#include <decaf/util/HashMap.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
+#include <decaf/util/concurrent/ThreadPoolExecutor.h>
+#include <decaf/util/concurrent/TimeUnit.h>
+#include <decaf/util/concurrent/LinkedBlockingQueue.h>
+
+using namespace activemq;
+using namespace activemq::commands;
+using namespace activemq::transport;
+using namespace activemq::transport::discovery;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::net;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
+
+////////////////////////////////////////////////////////////////////////////////
+// Defaults and tuning constants for the agent.  The DEFAULT_* values seed the
+// matching fields of AbstractDiscoveryAgentImpl in its constructor.
+const int AbstractDiscoveryAgent::DEFAULT_INITIAL_RECONNECT_DELAY = 5000;
+const int AbstractDiscoveryAgent::DEFAULT_BACKOFF_MULTIPLIER = 2;
+const int AbstractDiscoveryAgent::DEFAULT_MAX_RECONNECT_DELAY = 30000;
+// Passed unchanged to Thread::join() when the agent is stopped.
+// NOTE(review): presumably join() takes milliseconds, in which case the
+// "_SECONDS" suffix is misleading -- confirm against the Thread API.
+const int AbstractDiscoveryAgent::WORKER_KILL_TIME_SECONDS = 1000;
+// Number of keep alive intervals a service may stay silent before it is
+// handed to processDeadService() by doExpireOldServices().
+const int AbstractDiscoveryAgent::HEARTBEAT_MISS_BEFORE_DEATH = 10;
+const int AbstractDiscoveryAgent::DEFAULT_KEEPALIVE_INTERVAL = 500;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq {
+namespace transport {
+namespace discovery {
+
+    /**
+     * Private state and bookkeeping helpers for AbstractDiscoveryAgent.
+     *
+     * Holds the reconnect / keep alive configuration, the worker thread, the
+     * lazily created event executor and the table of discovered brokers.  The
+     * helper methods that change a broker's failure state all run under
+     * discoveredServicesLock.
+     */
+    class AbstractDiscoveryAgentImpl {
+    private:
+
+        AbstractDiscoveryAgentImpl(const AbstractDiscoveryAgentImpl&);
+        AbstractDiscoveryAgentImpl& operator=(const AbstractDiscoveryAgentImpl&);
+
+    public:
+
+        long long initialReconnectDelay;
+        long long maxReconnectDelay;
+        long long backOffMultiplier;
+        bool useExponentialBackOff;
+        int maxReconnectAttempts;
+        long long keepAliveInterval;
+
+        AtomicBoolean started;
+        Pointer<Thread> worker;
+        Pointer<ThreadPoolExecutor> executor;
+
+        HashMap<std::string, Pointer<DiscoveredBrokerData> > discoveredServices;
+        Mutex discoveredServicesLock;
+
+        URI discoveryUri;
+        std::string selfService;
+        std::string group;
+        DiscoveryListener* listener;
+        long long lastAdvertizeTime;
+        bool reportAdvertizeFailed;
+
+    public:
+
+        AbstractDiscoveryAgentImpl() : initialReconnectDelay(AbstractDiscoveryAgent::DEFAULT_INITIAL_RECONNECT_DELAY),
+                                       maxReconnectDelay(AbstractDiscoveryAgent::DEFAULT_MAX_RECONNECT_DELAY),
+                                       backOffMultiplier(AbstractDiscoveryAgent::DEFAULT_BACKOFF_MULTIPLIER),
+                                       useExponentialBackOff(false),
+                                       maxReconnectAttempts(0),
+                                       keepAliveInterval(AbstractDiscoveryAgent::DEFAULT_KEEPALIVE_INTERVAL),
+                                       started(),
+                                       worker(),
+                                       executor(),
+                                       discoveredServices(),
+                                       discoveredServicesLock(),
+                                       discoveryUri(),
+                                       selfService(),
+                                       group("default"),
+                                       listener(),
+                                       lastAdvertizeTime(0),
+                                       reportAdvertizeFailed(true)
+        {}
+
+        /**
+         * Returns the executor used to deliver listener events, creating a
+         * single threaded pool on first use.  Creation is guarded by
+         * discoveredServicesLock.
+         */
+        Executor& getExecutor() {
+            if (executor == NULL) {
+                synchronized(&discoveredServicesLock) {
+                    if (executor == NULL) {
+                        executor.reset(
+                            new ThreadPoolExecutor(1, 1, 45, TimeUnit::SECONDS,
+                                new LinkedBlockingQueue<Runnable*>()));
+                    }
+                }
+            }
+            return *executor;
+        }
+
+        /**
+         * Returns true if this Broker has been marked as failed and it is now time to
+         * start a recovery attempt.  When true is returned the failed flag on the
+         * service has been cleared.
+         */
+        bool isTimeForRecovery(Pointer<DiscoveredBrokerData> service) {
+            synchronized(&discoveredServicesLock) {
+
+                if (!service->isFailed()) {
+                    return false;
+                }
+
+                // Are we done trying to recover this guy?  A limit of zero (or less)
+                // means there is no limit.  The member is read directly; a local of
+                // the same name initialised from itself would be indeterminate.
+                if (this->maxReconnectAttempts > 0 &&
+                    service->getFailureCount() > this->maxReconnectAttempts) {
+                    return false;
+                }
+
+                // Is it not yet time?
+                if (System::currentTimeMillis() < service->getNextRecoveryTime()) {
+                    return false;
+                }
+
+                service->setFailed(false);
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * Records a heart beat for the given service and resets its failure count
+         * once it has stayed healthy long enough after its last recovery time.
+         */
+        void updateHeartBeat(Pointer<DiscoveredBrokerData> service) {
+            // Heart beat and recovery times are System::currentTimeMillis() values,
+            // so the grace period has to be expressed in milliseconds as well.
+            const long long recoveryGraceMillis = 60 * 1000LL;
+
+            synchronized(&discoveredServicesLock) {
+
+                service->setLastHeartBeatTime(System::currentTimeMillis());
+
+                // Consider that the broker recovery has succeeded if it has not failed in 60 seconds.
+                if (!service->isFailed() && service->getFailureCount() > 0 &&
+                    (service->getLastHeartBeatTime() - service->getNextRecoveryTime()) > recoveryGraceMillis) {
+
+                    service->setFailureCount(0);
+                    service->setNextRecoveryTime(System::currentTimeMillis());
+                }
+            }
+        }
+
+        /**
+         * Marks the service as failed, bumps its failure count and schedules the
+         * next recovery time.
+         *
+         * @returns true if the service was not already failed, false otherwise.
+         */
+        bool markFailed(Pointer<DiscoveredBrokerData> service) {
+            synchronized(&discoveredServicesLock) {
+
+                if (!service->isFailed()) {
+                    service->setFailed(true);
+                    service->setFailureCount(service->getFailureCount() + 1);
+
+                    // long long keeps the delay 64 bit on every platform and matches
+                    // the type of the configuration fields it is compared with.
+                    long long reconnectDelay = 0;
+                    if (!useExponentialBackOff) {
+                        reconnectDelay = initialReconnectDelay;
+                    } else {
+                        reconnectDelay = (long long) Math::pow((double)backOffMultiplier, (double)service->getFailureCount());
+                        reconnectDelay = Math::min(reconnectDelay, maxReconnectDelay);
+                    }
+
+                    service->setNextRecoveryTime(System::currentTimeMillis() + reconnectDelay);
+                    return true;
+                }
+            }
+            return false;
+        }
+    };
+
+    /**
+     * Task handed to the agent's executor to tell the registered
+     * DiscoveryListener that a service was added.  The listener is looked up
+     * when the task runs, not when it is created.
+     */
+    class ServiceAddedRunnable : public Runnable {
+    private:
+
+        AbstractDiscoveryAgent* agent;
+        Pointer<DiscoveredBrokerData> event;
+
+    public:
+
+        ServiceAddedRunnable(AbstractDiscoveryAgent* agent, Pointer<DiscoveredBrokerData> event) :
+            Runnable(), agent(agent), event(event) {
+        }
+
+        // Defined inline: a declaration without a body leaves the destructor
+        // unresolved at link time.
+        virtual ~ServiceAddedRunnable() {}
+
+        virtual void run() {
+            DiscoveryListener* listener = agent->getDiscoveryListener();
+            if (listener != NULL) {
+                listener->onServiceAdd(event.get());
+            }
+        }
+    };
+
+    /**
+     * Task handed to the agent's executor to tell the registered
+     * DiscoveryListener that a service was removed.  The listener is looked up
+     * when the task runs, not when it is created.
+     */
+    class ServiceRemovedRunnable : public Runnable {
+    private:
+
+        AbstractDiscoveryAgent* agent;
+        Pointer<DiscoveredBrokerData> event;
+
+    public:
+
+        ServiceRemovedRunnable(AbstractDiscoveryAgent* agent, Pointer<DiscoveredBrokerData> event) :
+            Runnable(), agent(agent), event(event) {
+        }
+
+        // Defined inline: a declaration without a body leaves the destructor
+        // unresolved at link time.
+        virtual ~ServiceRemovedRunnable() {}
+
+        virtual void run() {
+            DiscoveryListener* listener = agent->getDiscoveryListener();
+            if (listener != NULL) {
+                listener->onServiceRemove(event.get());
+            }
+        }
+    };
+
+}}}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates the agent together with its private implementation object.
+AbstractDiscoveryAgent::AbstractDiscoveryAgent() :
+    DiscoveryAgent(),
+    impl(new AbstractDiscoveryAgentImpl()) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Releases the private implementation object; nothing is allowed to
+// propagate out of the destructor.
+AbstractDiscoveryAgent::~AbstractDiscoveryAgent() {
+    try {
+        delete impl;
+    }
+    DECAF_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Starts the agent: runs the subclass start hook, launches the worker
+// thread if there is none yet, and advertises this agent's own service.
+void AbstractDiscoveryAgent::start() {
+    // Only the call that flips the flag from stopped to started does the work.
+    if (!impl->started.compareAndSet(false, true)) {
+        return;
+    }
+
+    doStart();
+
+    if (impl->worker == NULL) {
+        impl->worker.reset(new Thread(this));
+        impl->worker->start();
+    }
+
+    doAdvertizeSelf();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Stops the agent: runs the subclass stop hook, waits for the worker thread
+// to finish (interrupting it if it is still running after the wait) and
+// shuts down the event executor if one was ever created.
+void AbstractDiscoveryAgent::stop() {
+    // Changing the isStarted flag will signal the thread that it needs to shut down.
+    if (impl->started.compareAndSet(true, false)) {
+        doStop();
+
+        // Only touch the worker when there actually is one.
+        if (impl->worker != NULL) {
+            impl->worker->join(WORKER_KILL_TIME_SECONDS);
+
+            // Still running after the grace period, so interrupt it.
+            if (impl->worker->isAlive()) {
+                impl->worker->interrupt();
+            }
+
+            impl->worker.reset(NULL);
+        }
+
+        // The executor is created lazily by the first fired event, so it may
+        // not exist if no event was ever dispatched.
+        if (impl->executor != NULL) {
+            impl->executor->shutdown();
+            impl->executor->awaitTermination(1, TimeUnit::MINUTES);
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Worker thread body: while the agent is started, alternate between time
+// keeping (advertise / expire) and one discovery pass.  An interrupt ends
+// the thread; any other Exception thrown by doDiscovery() is discarded.
+void AbstractDiscoveryAgent::run() {
+
+    Thread::currentThread()->setName("Discovery Agent Thread.");
+
+    for (;;) {
+        if (!impl->started.get()) {
+            break;
+        }
+
+        doTimeKeepingServices();
+
+        try {
+            doDiscovery();
+        } catch (InterruptedException& interrupted) {
+            return;
+        } catch (Exception& discarded) {
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Records the service name to publish and, if the agent is already started,
+// advertises it right away.
+void AbstractDiscoveryAgent::registerService(const std::string& name) {
+    impl->selfService = name;
+
+    if (!impl->started.get()) {
+        return;
+    }
+
+    try {
+        doAdvertizeSelf();
+    } catch (Exception& failure) {
+        // When one advertise fails the following ones most likely fail as
+        // well, so the failure is only flagged for reporting once.
+        if (impl->reportAdvertizeFailed) {
+            impl->reportAdvertizeFailed = false;
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Marks the named service as failed and, if it was not already failed,
+// queues a "service removed" event.  Unknown services are ignored.
+void AbstractDiscoveryAgent::serviceFailed(const activemq::commands::DiscoveryEvent& event) {
+
+    Pointer<DiscoveredBrokerData> failedBroker;
+    synchronized(&impl->discoveredServicesLock) {
+        try {
+            failedBroker = impl->discoveredServices.get(event.getServiceName());
+        } catch (NoSuchElementException& notFound) {
+        }
+    }
+
+    if (failedBroker == NULL) {
+        return;
+    }
+
+    if (impl->markFailed(failedBroker)) {
+        fireServiceRemovedEvent(failedBroker);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Stores the listener that add / remove events are delivered to.
+void AbstractDiscoveryAgent::setDiscoveryListener(DiscoveryListener* listener) {
+    impl->listener = listener;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the currently stored listener, which may be NULL.
+DiscoveryListener* AbstractDiscoveryAgent::getDiscoveryListener() const {
+    return impl->listener;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sets the name of the service this agent publishes (no advertise is sent).
+void AbstractDiscoveryAgent::setServiceName(const std::string& name) {
+    this->impl->selfService = name;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns a copy of the service name this agent publishes.
+std::string AbstractDiscoveryAgent::getServiceName() const {
+    return this->impl->selfService;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sets the keep alive interval used for advertising and service expiry.
+void AbstractDiscoveryAgent::setKeepAliveInterval(long long interval) {
+    this->impl->keepAliveInterval = interval;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the configured keep alive interval.
+long long AbstractDiscoveryAgent::getKeepAliveInterval() const {
+    return this->impl->keepAliveInterval;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sets the recovery delay applied when exponential back-off is disabled.
+void AbstractDiscoveryAgent::setInitialReconnectDelay(long long initialReconnectDelay) {
+    this->impl->initialReconnectDelay = initialReconnectDelay;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the configured initial reconnect delay.
+long long AbstractDiscoveryAgent::getInitialReconnectDelay() const {
+    return this->impl->initialReconnectDelay;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sets the limit on recovery attempts; a value of zero or less disables the limit.
+void AbstractDiscoveryAgent::setMaxReconnectAttempts(int maxReconnectAttempts) {
+    this->impl->maxReconnectAttempts = maxReconnectAttempts;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the configured limit on recovery attempts.
+int AbstractDiscoveryAgent::getMaxReconnectAttempts() const {
+    return this->impl->maxReconnectAttempts;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sets the upper bound applied to the exponential back-off delay.
+void AbstractDiscoveryAgent::setMaxReconnectDelay(long long maxReconnectDelay) {
+    this->impl->maxReconnectDelay = maxReconnectDelay;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the configured upper bound for the back-off delay.
+long long AbstractDiscoveryAgent::getMaxReconnectDelay() const {
+    return this->impl->maxReconnectDelay;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Enables or disables exponential back-off when scheduling recovery.
+void AbstractDiscoveryAgent::setUseExponentialBackOff(bool useExponentialBackOff) {
+    this->impl->useExponentialBackOff = useExponentialBackOff;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns whether exponential back-off is enabled.
+bool AbstractDiscoveryAgent::isUseExponentialBackOff() const {
+    return this->impl->useExponentialBackOff;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sets the discovery group name (initially "default").
+void AbstractDiscoveryAgent::setGroup(const std::string& group) {
+    this->impl->group = group;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns a copy of the discovery group name.
+std::string AbstractDiscoveryAgent::getGroup() const {
+    return this->impl->group;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Queues a "service removed" notification for the listener.  Nothing is
+// queued when no listener is set or the agent is not started.
+void AbstractDiscoveryAgent::fireServiceRemovedEvent(Pointer<DiscoveredBrokerData> event) {
+    if (impl->listener == NULL || !impl->started.get()) {
+        return;
+    }
+
+    // Delivery goes through the executor so that a slow listener cannot
+    // hold up the calling thread, which is doing time sensitive work.
+    Executor& dispatcher = impl->getExecutor();
+    dispatcher.execute(new ServiceRemovedRunnable(this, event));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Queues a "service added" notification for the listener.  Nothing is
+// queued when no listener is set or the agent is not started.
+void AbstractDiscoveryAgent::fireServiceAddedEvent(Pointer<DiscoveredBrokerData> event) {
+    if (impl->listener == NULL || !impl->started.get()) {
+        return;
+    }
+
+    // Delivery goes through the executor so that a slow listener cannot
+    // hold up the calling thread, which is doing time sensitive work.
+    Executor& dispatcher = impl->getExecutor();
+    dispatcher.execute(new ServiceAddedRunnable(this, event));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Handles a heart beat from a remote service.
+//
+// A service seen for the first time is recorded, announced to the listener
+// and answered with an advertisement of our own service.  A known service
+// has its heart beat refreshed and, if it was failed and is due for
+// recovery, is announced to the listener again.  Heart beats carrying our
+// own service name are ignored.
+//
+// @param brokerName  name of the broker that sent the heart beat.
+// @param service     service identifier the broker advertised.
+void AbstractDiscoveryAgent::processLiveService(const std::string& brokerName, const std::string& service) {
+
+    if (getServiceName().empty() || service != getServiceName()) {
+        Pointer<DiscoveredBrokerData> remoteBroker;
+        bool newlyDiscovered = false;
+
+        // Lookup and insert happen under one lock acquisition so that the map
+        // is never modified while another thread reads it and two threads
+        // cannot both register the same service.
+        synchronized(&impl->discoveredServicesLock) {
+            try {
+                remoteBroker = impl->discoveredServices.get(service);
+            } catch (NoSuchElementException& ignored) {
+            }
+
+            if (remoteBroker == NULL) {
+                remoteBroker.reset(new DiscoveredBrokerData(brokerName, service));
+                // A fresh entry starts with a heart beat time of zero, which the
+                // expiry check would treat as long overdue; stamp it now.
+                remoteBroker->setLastHeartBeatTime(System::currentTimeMillis());
+                impl->discoveredServices.put(service, remoteBroker);
+                newlyDiscovered = true;
+            }
+        }
+
+        if (newlyDiscovered) {
+            fireServiceAddedEvent(remoteBroker);
+            doAdvertizeSelf();
+        } else {
+            impl->updateHeartBeat(remoteBroker);
+            if (impl->isTimeForRecovery(remoteBroker)) {
+                fireServiceAddedEvent(remoteBroker);
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Handles a service that has gone away.
+//
+// The service is dropped from the table of discovered services and, unless
+// it was already marked failed (in which case the listener has been told
+// before), a "service removed" event is queued.  Our own service name and
+// unknown services are ignored.
+//
+// @param service  service identifier of the dead service.
+void AbstractDiscoveryAgent::processDeadService(const std::string& service) {
+
+    if (service != getServiceName()) {
+
+        Pointer<DiscoveredBrokerData> remoteBroker;
+        synchronized(&impl->discoveredServicesLock) {
+            try {
+                // Remove rather than just look up: an entry left in the table
+                // would be found expired again on every pass of the worker loop
+                // and produce a new removal event each time.
+                remoteBroker = impl->discoveredServices.remove(service);
+            } catch (NoSuchElementException& ignored) {
+            }
+        }
+
+        if (remoteBroker != NULL && !remoteBroker->isFailed()) {
+            fireServiceRemovedEvent(remoteBroker);
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Periodic housekeeping run by the worker loop: re-advertise our service
+// when the keep alive interval has passed (or the clock moved backwards)
+// and then expire services that have been silent too long.
+void AbstractDiscoveryAgent::doTimeKeepingServices() {
+
+    if (!impl->started.get()) {
+        return;
+    }
+
+    const long long now = System::currentTimeMillis();
+    const bool clockMovedBack = now < impl->lastAdvertizeTime;
+    const bool intervalElapsed = (now - impl->keepAliveInterval) > impl->lastAdvertizeTime;
+
+    if (clockMovedBack || intervalElapsed) {
+        doAdvertizeSelf();
+        impl->lastAdvertizeTime = now;
+    }
+
+    doExpireOldServices();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Hands every service whose last heart beat is older than
+// HEARTBEAT_MISS_BEFORE_DEATH keep alive intervals to processDeadService().
+void AbstractDiscoveryAgent::doExpireOldServices() {
+    const long long allowedSilence = impl->keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH;
+    const long long cutoff = System::currentTimeMillis() - allowedSilence;
+
+    // Work on a snapshot so the lock is not held while services are processed.
+    std::vector< Pointer<DiscoveredBrokerData> > snapshot;
+    synchronized(&impl->discoveredServicesLock) {
+        snapshot = impl->discoveredServices.values().toArray();
+    }
+
+    std::vector< Pointer<DiscoveredBrokerData> >::iterator current = snapshot.begin();
+    while (current != snapshot.end()) {
+        Pointer<DiscoveredBrokerData> candidate = *current;
+        if (candidate->getLastHeartBeatTime() < cutoff) {
+            processDeadService(candidate->getServiceName());
+        }
+        ++current;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f7149109/activemq-cpp/src/main/activemq/transport/discovery/AbstractDiscoveryAgent.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/transport/discovery/AbstractDiscoveryAgent.h b/activemq-cpp/src/main/activemq/transport/discovery/AbstractDiscoveryAgent.h
new file mode 100644
index 0000000..58582f2
--- /dev/null
+++ b/activemq-cpp/src/main/activemq/transport/discovery/AbstractDiscoveryAgent.h
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_DISCOVERY_ABSTRACTDISCOVERYAGENT_H_
+#define _ACTIVEMQ_TRANSPORT_DISCOVERY_ABSTRACTDISCOVERYAGENT_H_
+
+#include <activemq/util/Config.h>
+
+#include <activemq/transport/discovery/DiscoveryAgent.h>
+#include <activemq/transport/discovery/DiscoveredBrokerData.h>
+
+#include <decaf/lang/Runnable.h>
+#include <decaf/lang/Pointer.h>
+
+namespace activemq {
+namespace transport {
+namespace discovery {
+
+    class AbstractDiscoveryAgentImpl;
+
+    /**
+     * Abstract base class that provides all the basic implementation needed to create
+     * a DiscoveryAgent instance without needing to implement a lot of boilerplate code.
+     *
+     * Concrete agents supply doStart(), doStop(), doAdvertizeSelf() and doDiscovery();
+     * this class owns the worker thread that drives them and the bookkeeping of
+     * discovered services.
+     *
+     * @since 3.9.0
+     */
+    class AMQCPP_API AbstractDiscoveryAgent : public DiscoveryAgent, public decaf::lang::Runnable {
+    private:
+
+        // Private implementation object, created in the constructor and deleted
+        // in the destructor.
+        AbstractDiscoveryAgentImpl* impl;
+
+    public:
+
+        static const int DEFAULT_INITIAL_RECONNECT_DELAY;
+        static const int DEFAULT_BACKOFF_MULTIPLIER;
+        static const int DEFAULT_MAX_RECONNECT_DELAY;
+        static const int WORKER_KILL_TIME_SECONDS;
+        static const int HEARTBEAT_MISS_BEFORE_DEATH;
+        static const int DEFAULT_KEEPALIVE_INTERVAL;
+
+    private:
+
+        // Not copyable.
+        AbstractDiscoveryAgent(const AbstractDiscoveryAgent&);
+        AbstractDiscoveryAgent& operator= (const AbstractDiscoveryAgent&);
+
+    public:
+
+        AbstractDiscoveryAgent();
+        virtual ~AbstractDiscoveryAgent();
+
+        /**
+         * Starts the agent if it is not already started: calls doStart(), launches
+         * the worker thread and advertises this agent's service.
+         */
+        virtual void start();
+
+        /**
+         * Stops the agent if it is started: calls doStop() and shuts down the
+         * worker thread and the event executor.
+         */
+        virtual void stop();
+
+        /**
+         * Stores the given service name and, when the agent is already started,
+         * advertises it immediately.
+         *
+         * @param name
+         *      The service name to publish.
+         */
+        virtual void registerService(const std::string& name);
+
+        /**
+         * Marks the service named in the event as failed; if it was not already
+         * failed a service removed event is fired.
+         *
+         * @param event
+         *      Event whose service name identifies the failed service.
+         */
+        virtual void serviceFailed(const activemq::commands::DiscoveryEvent& event);
+
+        virtual void setDiscoveryListener(DiscoveryListener* listener);
+        virtual DiscoveryListener* getDiscoveryListener() const;
+
+        /**
+         * Sets the service that is published by this agent if it supports publishing.
+         *
+         * @param name
+         *      The service name to publish, typically the URI.
+         */
+        void setServiceName(const std::string& name);
+
+        /**
+         * Gets the configured service to publish, not all agents can publish so this value
+         * may not mean that an actual service advertisement is ever done.
+         *
+         * @returns the configured service to publish.
+         */
+        std::string getServiceName() const;
+
+        /**
+         * Sets the keep alive interval used to control how long a service that has not
+         * been seen is kept in the list of discovered services before it is considered
+         * idle for too long.  This value also controls how often this service will
+         * advertise itself if it supports that.
+         *
+         * @param interval
+         *      Time in milliseconds for the keep alive interval.
+         */
+        void setKeepAliveInterval(long long interval);
+
+        /**
+         * Gets the keep alive interval used to control how long a service that has not
+         * been seen is kept in the list of discovered services before it is considered
+         * idle for too long.  This value also controls how often this service will
+         * advertise itself if it supports that.
+         *
+         * @returns Time in milliseconds for the keep alive interval.
+         */
+        long long getKeepAliveInterval() const;
+
+        /**
+         * Sets the agents reconnect backoff multiplier.
+         *
+         * NOTE(review): this is documented as a setter but is named
+         * getBackOffMultiplier; it looks like it was meant to be
+         * setBackOffMultiplier.  No definition for it (or for the getter
+         * below) appears in AbstractDiscoveryAgent.cpp as added by this
+         * commit -- confirm and fix both files together.
+         *
+         * @param multiplier
+         *      The back multiplier to use when calculating the next recovery time.
+         */
+        void getBackOffMultiplier(long long multiplier);
+
+        /**
+         * Gets the configured backoff multiplier for calculating the next recovery time.
+         *
+         * @returns the configured backoff multiplier for calculating the next recovery time.
+         */
+        long long getBackOffMultiplier() const;
+
+        /**
+         * Sets the delay, in milliseconds, that is added to the current time to
+         * schedule the next recovery of a failed service when exponential back-off
+         * is not in use.
+         */
+        void setInitialReconnectDelay(long long initialReconnectDelay);
+
+        long long getInitialReconnectDelay() const;
+
+        /**
+         * Sets the number of failures after which recovery of a service is no longer
+         * attempted.  A value of zero or less means there is no limit.
+         */
+        void setMaxReconnectAttempts(int maxReconnectAttempts);
+
+        int getMaxReconnectAttempts() const;
+
+        /**
+         * Sets the upper bound applied to the recovery delay computed when
+         * exponential back-off is in use.
+         */
+        void setMaxReconnectDelay(long long maxReconnectDelay);
+
+        long long getMaxReconnectDelay() const;
+
+        /**
+         * Chooses between a fixed recovery delay (false) and one derived from the
+         * back-off multiplier and the service's failure count (true).
+         */
+        void setUseExponentialBackOff(bool useExponentialBackOff);
+
+        bool isUseExponentialBackOff() const;
+
+        /**
+         * Sets the discovery group name; the initial value is "default".
+         */
+        void setGroup(const std::string& group);
+
+        std::string getGroup() const;
+
+    protected:
+
+        /**
+         * Default implementation of the DiscoveryAgent's background worker thread processing.
+         * While the agent is started it repeatedly performs the time keeping work
+         * and then calls doDiscovery().
+         */
+        virtual void run();
+
+        /**
+         * Called when a heart beat for a service has been seen.  Heart beats for
+         * this agent's own service name are ignored.
+         */
+        virtual void processLiveService(const std::string& brokerName, const std::string& service);
+
+        /**
+         * Called when a service is considered dead.  This agent's own service
+         * name is ignored.
+         */
+        virtual void processDeadService(const std::string& service);
+
+        /**
+         * Queue an add / remove notification for the registered listener.  The
+         * notification is delivered asynchronously, and only when a listener is
+         * set and the agent is started.
+         */
+        virtual void fireServiceAddedEvent(decaf::lang::Pointer<DiscoveredBrokerData> event);
+        virtual void fireServiceRemovedEvent(decaf::lang::Pointer<DiscoveredBrokerData> event);
+
+    protected:
+
+        /**
+         * The real agent will implement this method to perform any necessary resource allocation
+         * prior to the completion of the start call.
+         */
+        virtual void doStart() = 0;
+
+        /**
+         * Called from stop() before the worker thread is shut down.
+         */
+        virtual void doStop() = 0;
+
+        /**
+         * Called whenever this agent's own service should be advertised: on start,
+         * on service registration, periodically from the worker thread and after a
+         * new remote service has been discovered.
+         */
+        virtual void doAdvertizeSelf() = 0;
+
+        /**
+         * Called repeatedly from the worker thread to perform one pass of discovery.
+         */
+        virtual void doDiscovery() = 0;
+
+    private:
+
+        // Reports services whose last heart beat is too old as dead.
+        void doExpireOldServices();
+        // Periodic advertise-and-expire step run by the worker thread.
+        void doTimeKeepingServices();
+
+    };
+
+}}}
+
+#endif /* _ACTIVEMQ_TRANSPORT_DISCOVERY_ABSTRACTDISCOVERYAGENT_H_ */

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f7149109/activemq-cpp/src/main/activemq/transport/discovery/DiscoveredBrokerData.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/transport/discovery/DiscoveredBrokerData.cpp b/activemq-cpp/src/main/activemq/transport/discovery/DiscoveredBrokerData.cpp
index 56ff924..8657760 100644
--- a/activemq-cpp/src/main/activemq/transport/discovery/DiscoveredBrokerData.cpp
+++ b/activemq-cpp/src/main/activemq/transport/discovery/DiscoveredBrokerData.cpp
@@ -31,5 +31,17 @@ DiscoveredBrokerData::DiscoveredBrokerData() : DiscoveryEvent(),
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+// Creates an entry for the given broker and service with all heart beat,
+// recovery and failure bookkeeping zeroed / cleared.
+DiscoveredBrokerData::DiscoveredBrokerData(const std::string& brokerName, const std::string& service) :
+    DiscoveryEvent(), lastHeartBeatTime(0), nextRecoveryTime(0), failureCount(0), failed(false) {
+
+    this->setBrokerName(brokerName);
+    this->setServiceName(service);
+}
+
+////////////////////////////////////////////////////////////////////////////////
 DiscoveredBrokerData::~DiscoveredBrokerData() {
 }

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f7149109/activemq-cpp/src/main/activemq/transport/discovery/DiscoveredBrokerData.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/transport/discovery/DiscoveredBrokerData.h b/activemq-cpp/src/main/activemq/transport/discovery/DiscoveredBrokerData.h
index 4af93ad..8254a97 100644
--- a/activemq-cpp/src/main/activemq/transport/discovery/DiscoveredBrokerData.h
+++ b/activemq-cpp/src/main/activemq/transport/discovery/DiscoveredBrokerData.h
@@ -18,6 +18,7 @@
 #ifndef _ACTIVEMQ_TRANSPORT_DISCOVERY_DISCOVEREDBROKERDATA_H_
 #define _ACTIVEMQ_TRANSPORT_DISCOVERY_DISCOVEREDBROKERDATA_H_
 
+#include <activemq/util/Config.h>
 #include <activemq/commands/DiscoveryEvent.h>
 
 namespace activemq {
@@ -30,7 +31,7 @@ namespace discovery {
      *
      * @since 3.9.0
      */
-    class DiscoveredBrokerData : public activemq::commands::DiscoveryEvent {
+    class AMQCPP_API DiscoveredBrokerData : public activemq::commands::DiscoveryEvent {
     private:
 
         long long lastHeartBeatTime;
@@ -41,6 +42,8 @@ namespace discovery {
     public:
 
         DiscoveredBrokerData();
+        DiscoveredBrokerData(const std::string& brokerName, const std::string& service);
+
         virtual ~DiscoveredBrokerData();
 
         /**

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f7149109/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryAgent.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryAgent.h b/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryAgent.h
index d4a43e8..f61340a 100644
--- a/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryAgent.h
+++ b/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryAgent.h
@@ -69,6 +69,13 @@ namespace discovery {
          */
         virtual void serviceFailed(const activemq::commands::DiscoveryEvent& event) = 0;
 
+        /**
+         * Returns a descriptive string that represents this discovery agent.
+         *
+         * @return a string that describes this discovery agent.
+         */
+        virtual std::string toString() const = 0;
+
     };
 
 }}}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f7149109/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryAgentFactory.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryAgentFactory.h b/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryAgentFactory.h
index 96700be..2273e95 100644
--- a/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryAgentFactory.h
+++ b/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryAgentFactory.h
@@ -35,7 +35,7 @@ namespace discovery {
      *
      * @since 3.9.0
      */
-    class DiscoveryAgentFactory {
+    class AMQCPP_API DiscoveryAgentFactory {
     public:
 
         virtual ~DiscoveryAgentFactory();

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f7149109/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryListener.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryListener.h b/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryListener.h
index 87b06ea..461d8a3 100644
--- a/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryListener.h
+++ b/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryListener.h
@@ -37,7 +37,7 @@ namespace discovery {
          * @param event
          *      A DiscoveryEvent that contains information on the newly discovered service.
          */
-        virtual void onServiceAdd(const activemq::commands::DiscoveryEvent& event) = 0;
+        virtual void onServiceAdd(const activemq::commands::DiscoveryEvent* event) = 0;
 
         /**
          * Called when an discovery agent determines that a service is no longer available.
@@ -45,7 +45,7 @@ namespace discovery {
          * @param event
          *      A DiscoveryEvent that contains information on the removed service.
          */
-        virtual void onServiceRemove(const activemq::commands::DiscoveryEvent& event) = 0;
+        virtual void onServiceRemove(const activemq::commands::DiscoveryEvent* event) = 0;
 
     };
 

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f7149109/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryTransport.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryTransport.cpp b/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryTransport.cpp
index 03fef90..731957e 100644
--- a/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryTransport.cpp
+++ b/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryTransport.cpp
@@ -170,14 +170,14 @@ Properties DiscoveryTransport::getParameters() const {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void DiscoveryTransport::onServiceAdd(const DiscoveryEvent& event) {
-    std::string url = event.getServiceName();
+void DiscoveryTransport::onServiceAdd(const DiscoveryEvent* event) {
+    std::string url = event->getServiceName();
     if (!url.empty()) {
         try {
             URI uri(url);
             uri = URISupport::applyParameters(uri, this->impl->parameters, DISCOVERED_OPTION_PREFIX);
             synchronized(&this->impl->lock) {
-                this->impl->serviceURIs.put(event.getServiceName(), uri);
+                this->impl->serviceURIs.put(event->getServiceName(), uri);
             }
             LinkedList<URI> uris;
             uris.add(uri);
@@ -188,11 +188,11 @@ void DiscoveryTransport::onServiceAdd(const DiscoveryEvent& event) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void DiscoveryTransport::onServiceRemove(const DiscoveryEvent& event) {
+void DiscoveryTransport::onServiceRemove(const DiscoveryEvent* event) {
     try {
         URI uri;
         synchronized(&this->impl->lock) {
-            uri = this->impl->serviceURIs.get(event.getServiceName());
+            uri = this->impl->serviceURIs.get(event->getServiceName());
         }
         LinkedList<URI> uris;
         uris.add(uri);

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f7149109/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryTransport.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryTransport.h b/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryTransport.h
index 2bc9cc4..782d4a1 100644
--- a/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryTransport.h
+++ b/activemq-cpp/src/main/activemq/transport/discovery/DiscoveryTransport.h
@@ -88,9 +88,9 @@ namespace discovery {
 
     public:
 
-        virtual void onServiceAdd(const activemq::commands::DiscoveryEvent& event);
+        virtual void onServiceAdd(const activemq::commands::DiscoveryEvent* event);
 
-        virtual void onServiceRemove(const activemq::commands::DiscoveryEvent& event);
+        virtual void onServiceRemove(const activemq::commands::DiscoveryEvent* event);
 
         virtual void transportInterrupted();
 

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f7149109/activemq-cpp/src/test/activemq/transport/discovery/DiscoveryAgentRegistryTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test/activemq/transport/discovery/DiscoveryAgentRegistryTest.cpp b/activemq-cpp/src/test/activemq/transport/discovery/DiscoveryAgentRegistryTest.cpp
index 4e3190b..d64dd9b 100644
--- a/activemq-cpp/src/test/activemq/transport/discovery/DiscoveryAgentRegistryTest.cpp
+++ b/activemq-cpp/src/test/activemq/transport/discovery/DiscoveryAgentRegistryTest.cpp
@@ -43,6 +43,7 @@ namespace {
         virtual void setDiscoveryListener(DiscoveryListener* listener) {}
         virtual void registerService(const std::string& name) {}
         virtual void serviceFailed(const activemq::commands::DiscoveryEvent& event) {}
+        virtual std::string toString() const { return "MockDiscoveryAgent"; }
     };
 
     class MockDiscoveryAgentFactory : public DiscoveryAgentFactory {

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/f7149109/activemq-cpp/src/test/activemq/transport/discovery/DiscoveryTransportFactoryTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test/activemq/transport/discovery/DiscoveryTransportFactoryTest.cpp b/activemq-cpp/src/test/activemq/transport/discovery/DiscoveryTransportFactoryTest.cpp
index 18405b2..b3878f0 100644
--- a/activemq-cpp/src/test/activemq/transport/discovery/DiscoveryTransportFactoryTest.cpp
+++ b/activemq-cpp/src/test/activemq/transport/discovery/DiscoveryTransportFactoryTest.cpp
@@ -47,6 +47,7 @@ namespace {
         virtual void setDiscoveryListener(DiscoveryListener* listener) {}
         virtual void registerService(const std::string& name) {}
         virtual void serviceFailed(const activemq::commands::DiscoveryEvent& event) {}
+        virtual std::string toString() const { return "MockDiscoveryAgent"; }
     };
 
     class MockDiscoveryAgentFactory : public DiscoveryAgentFactory {


Mime
View raw message