Return-Path: Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: (qmail 61404 invoked from network); 4 Feb 2009 17:05:12 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 4 Feb 2009 17:05:12 -0000 Received: (qmail 55682 invoked by uid 500); 4 Feb 2009 17:05:12 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 55670 invoked by uid 500); 4 Feb 2009 17:05:12 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 55660 invoked by uid 99); 4 Feb 2009 17:05:12 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Feb 2009 09:05:12 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Feb 2009 17:05:06 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 403B02388A91; Wed, 4 Feb 2009 17:04:46 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r740793 - in /qpid/trunk/qpid/cpp/src: qpid/broker/Broker.cpp qpid/broker/Broker.h qpid/cluster/Cluster.cpp qpid/cluster/Cluster.h qpid/cluster/ClusterPlugin.cpp tests/run_acl_tests Date: Wed, 04 Feb 2009 17:04:45 -0000 To: commits@qpid.apache.org From: aconway@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090204170446.403B02388A91@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: aconway Date: Wed Feb 4 17:04:45 2009 New Revision: 740793 URL: http://svn.apache.org/viewvc?rev=740793&view=rev Log: Cluster sets recovery flag on Broker for first member in cluster. 
Disable recovery from local store if the recovery flag is not set. Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp qpid/trunk/qpid/cpp/src/tests/run_acl_tests Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=740793&r1=740792&r2=740793&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Wed Feb 4 17:04:45 2009 @@ -149,6 +149,7 @@ *this), queueCleaner(queues, timer), queueEvents(poller), + recovery(true), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { if (conf.enableMgmt) { @@ -209,11 +210,17 @@ setStore (new NullMessageStore()); exchanges.declare(empty, DirectExchange::typeName); // Default exchange. - + if (store.get() != 0) { - RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, - conf.stagingThreshold); - store->recover(recoverer); + // The cluster plug-in will setRecovery(false) on all but the first + // broker to join a cluster. 
+ if (getRecovery()) { + RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, + conf.stagingThreshold); + store->recover(recoverer); + } + else + QPID_LOG(notice, "Recovering from cluster, no recovery from local journal"); } //ensure standard exchanges exist (done after recovery from store) Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=740793&r1=740792&r2=740793&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Wed Feb 4 17:04:45 2009 @@ -139,6 +139,8 @@ std::vector getKnownBrokersImpl(); std::string federationTag; + bool recovery; + public: @@ -223,6 +225,9 @@ boost::function ()> getKnownBrokers; static const std::string TCP_TRANSPORT; + + void setRecovery(bool set) { recovery = set; } + bool getRecovery() const { return recovery; } }; }} Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=740793&r1=740792&r2=740793&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Feb 4 17:04:45 2009 @@ -21,28 +21,30 @@ #include "UpdateClient.h" #include "FailoverExchange.h" +#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" +#include "qmf/org/apache/qpid/cluster/Package.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/SessionState.h" #include "qpid/broker/Connection.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/SessionState.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" -#include "qpid/framing/ClusterUpdateRequestBody.h" -#include 
"qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterConfigChangeBody.h" -#include "qpid/framing/ClusterUpdateOfferBody.h" -#include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" -#include "qpid/log/Statement.h" +#include "qpid/framing/ClusterReadyBody.h" +#include "qpid/framing/ClusterShutdownBody.h" +#include "qpid/framing/ClusterUpdateOfferBody.h" +#include "qpid/framing/ClusterUpdateRequestBody.h" #include "qpid/log/Helpers.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/LatencyMetric.h" +#include "qpid/log/Statement.h" +#include "qpid/management/IdAllocator.h" +#include "qpid/management/ManagementBroker.h" #include "qpid/memory.h" #include "qpid/shared_ptr.h" -#include "qmf/org/apache/qpid/cluster/Package.h" -#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" +#include "qpid/sys/LatencyMetric.h" +#include "qpid/sys/Thread.h" #include #include @@ -101,11 +103,28 @@ poller), connections(*this), decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)), + initialized(false), state(INIT), lastSize(0), lastBroker(false), sequence(0) { + failoverExchange.reset(new FailoverExchange(this)); + if (quorum_) quorum.init(); + cpg.join(name); + // pump the CPG dispatch manually till we get initialized. + while (!initialized) + cpg.dispatchOne(); +} + +Cluster::~Cluster() { + if (updateThread.id()) updateThread.join(); // Join the previous updatethread. 
+} + +void Cluster::initialize() { + if (myUrl.empty()) + myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); + QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); mAgent = ManagementAgent::Singleton::getInstance(); if (mAgent != 0){ _qmf::Package packageInit(mAgent); @@ -114,18 +133,11 @@ mgmtObject->set_status("JOINING"); } broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); - failoverExchange.reset(new FailoverExchange(this)); dispatcher.start(); deliverEventQueue.start(); deliverFrameQueue.start(); - QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); - if (quorum_) quorum.init(); - cpg.join(name); - broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); // Must be last for exception safety. -} - -Cluster::~Cluster() { - if (updateThread.id()) updateThread.join(); // Join the previous updatethread. + // Add finalizer last for exception safety. + broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); } // Called in connection thread to insert a client connection. @@ -279,6 +291,11 @@ cpg_address */*joined*/, int /*nJoined*/) { Mutex::ScopedLock l(lock); + if (state == INIT) { // First config change. + // Recover only if we are first in cluster. 
+ broker.setRecovery(nCurrent == 1); + initialized = true; + } QPID_LOG(debug, *this << " config change: " << AddrList(current, nCurrent) << AddrList(left, nLeft, "( ", ")")); std::string addresses; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=740793&r1=740792&r2=740793&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Feb 4 17:04:45 2009 @@ -64,15 +64,17 @@ public: typedef boost::intrusive_ptr ConnectionPtr; typedef std::vector Connections; - - /** - * Join a cluster. - */ + + /** Construct the cluster in plugin earlyInitialize */ Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum, size_t readMax, size_t writeEstimate); virtual ~Cluster(); + /** Join the cluster in plugin initialize. Requires transport + * plugins to be available.. */ + void initialize(); + // Connection map - called in connection threads. 
void addLocalConnection(const ConnectionPtr&); void addShadowConnection(const ConnectionPtr&); @@ -177,7 +179,7 @@ boost::shared_ptr poller; Cpg cpg; const std::string name; - const Url myUrl; + Url myUrl; const MemberId myId; const size_t readMax; const size_t writeEstimate; @@ -197,7 +199,10 @@ // Called only from event delivery thread Decoder decoder; - + + // Used only during initialization + bool initialized; + // Remaining members are protected by lock mutable sys::Monitor lock; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=740793&r1=740792&r2=740793&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Wed Feb 4 17:04:45 2009 @@ -136,13 +136,13 @@ Options* getOptions() { return &options; } - void initialize(Plugin::Target& target) { + void earlyInitialize(Plugin::Target& target) { if (values.name.empty()) return; // Only if --cluster-name option was specified. Broker* broker = dynamic_cast<Broker*>(&target); if (!broker) return; cluster = new Cluster( values.name, - values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), + values.url.empty() ? Url() : Url(values.url), *broker, values.quorum, values.readMax, values.writeEstimate*1024 @@ -158,7 +158,9 @@ } } - void earlyInitialize(Plugin::Target&) {} + void initialize(Plugin::Target& ) { + cluster->initialize(); + } }; static ClusterPlugin instance; // Static initialization. 
Modified: qpid/trunk/qpid/cpp/src/tests/run_acl_tests URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_acl_tests?rev=740793&r1=740792&r2=740793&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/run_acl_tests (original) +++ qpid/trunk/qpid/cpp/src/tests/run_acl_tests Wed Feb 4 17:04:45 2009 @@ -20,6 +20,7 @@ # # Run the acl tests. $srcdir is set by the Makefile. +set -x PYTHON_DIR=$srcdir/../../../python DATA_DIR=`pwd`/data_dir --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscribe@qpid.apache.org