Subject: svn commit: r675017 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/ qpid/amqp_0_10/ qpid/broker/ qpid/cluster/ qpid/framing/ tests/
Date: Tue, 08 Jul 2008 22:58:38 -0000
To: qpid-commits@incubator.apache.org
From: aconway@apache.org
Reply-To: qpid-dev@incubator.apache.org
Message-Id: <20080708225838.ED37A23889FE@eris.apache.org>

Author: aconway
Date: Tue Jul 8 15:58:37 2008
New Revision: 675017

URL: http://svn.apache.org/viewvc?rev=675017&view=rev
Log:
HandlerChain: plug-in handler chain extension points. Replaces Handler::Chain.
Updated Session & Connection handler chains and Cluster.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/HandlerChain.h (with props)
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionManager.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=675017&r1=675016&r2=675017&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Jul 8 15:58:37 2008
@@ -248,8 +248,6 @@
 qpid/amqp_0_10/Connection.cpp \
 qpid/broker/Broker.cpp \
 qpid/broker/BrokerSingleton.cpp \
- qpid/broker/ConnectionManager.h \
- qpid/broker/ConnectionManager.cpp \
 qpid/broker/Exchange.cpp \
 qpid/broker/Queue.cpp \
 qpid/broker/PersistableMessage.cpp \
@@ -354,6 +352,7 @@
 qpid/amqp_0_10/Exception.h \
 qpid/Msg.h
\ qpid/Options.h \ + qpid/HandlerChain.h \ qpid/Plugin.h \ qpid/ptr_map.h \ qpid/RangeSet.h \ Added: incubator/qpid/trunk/qpid/cpp/src/qpid/HandlerChain.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/HandlerChain.h?rev=675017&view=auto ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/HandlerChain.h (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/HandlerChain.h Tue Jul 8 15:58:37 2008 @@ -0,0 +1,97 @@ +#ifndef QPID_HANDLERCHAIN_H +#define QPID_HANDLERCHAIN_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include +#include +#include + +namespace qpid { + +/** + * Chain-of-responsibility design pattern. + * + * Construct a chain of objects deriving from Base. Each implements + * Base::f by doing its own logic and then calling Base::f on the next + * handler (or not if it chooses not to.) + * + * HandlerChain acts as a smart pointer to the first object in the chain. 
+ */ +template +class HandlerChain { + public: + /** Base class for chainable handlers */ + class Handler : public Base { + public: + Handler() : next() {} + virtual ~Handler() {} + virtual void setNext(Base* next_) { next = next_; } + + protected: + Base* next; + }; + + typedef std::auto_ptr HandlerAutoPtr; + + /**@param target is the object at the end of the chain. */ + HandlerChain(Base& target) : first(&target) {} + + /** HandlerChain owns the ChainableHandler. */ + void push(HandlerAutoPtr h) { + handlers.push_back(h); + h->setNext(first); + first = h.get(); + } + + // Smart pointer functions + Base* operator*() { return first; } + const Base* operator*() const { return first; } + Base* operator->() { return first; } + const Base* operator->() const { return first; } + operator bool() const { return first; } + + private: + boost::ptr_vector handlers; + Base* first; +}; + +/** + * A PluginHandlerChain calls Plugin::initAll(*this) on construction, + * allowing plugins to add handlers. + * + * @param Tag can be any class, use to distinguish different plugin + * chains with the same Base type. 
+ */ +template +struct PluginHandlerChain : public HandlerChain, + public Plugin::Target +{ + PluginHandlerChain(Base& target) : HandlerChain(target) { + Plugin::initAll(*this); + } +}; + + +} // namespace qpid + +#endif /*!QPID_HANDLERCHAIN_H*/ Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/HandlerChain.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/HandlerChain.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp?rev=675017&r1=675016&r2=675017&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.cpp Tue Jul 8 15:58:37 2008 @@ -20,10 +20,13 @@ #include "Plugin.h" #include "qpid/Options.h" +#include +#include namespace qpid { namespace { + Plugin::Plugins& thePlugins() { // This is a single threaded singleton implementation so // it is important to be sure that the first use of this @@ -31,8 +34,17 @@ static Plugin::Plugins plugins; return plugins; } + +void call(boost::function f) { f(); } + +} // namespace + +Plugin::Target::~Target() { + std::for_each(cleanup.begin(), cleanup.end(), &call); } +void Plugin::Target::addCleanup(const boost::function& f) { cleanup.push_back(f); } + Plugin::Plugin() { // Register myself. 
thePlugins().push_back(this); @@ -44,6 +56,12 @@ const Plugin::Plugins& Plugin::getPlugins() { return thePlugins(); } +namespace { +template void each_plugin(const F& f) { + std::for_each(Plugin::getPlugins().begin(), Plugin::getPlugins().end(), f); +} +} + void Plugin::addOptions(Options& opts) { for (Plugins::const_iterator i = getPlugins().begin(); i != getPlugins().end(); ++i) { if ((*i)->getOptions()) @@ -51,4 +69,7 @@ } } +void Plugin::earlyInitAll(Target& t) { each_plugin(boost::bind(&Plugin::earlyInitialize, _1, t)); } +void Plugin::initAll(Target& t) { each_plugin(boost::bind(&Plugin::initialize, _1, t)); } + } // namespace qpid Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h?rev=675017&r1=675016&r2=675017&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/Plugin.h Tue Jul 8 15:58:37 2008 @@ -40,11 +40,17 @@ public: /** * Base interface for targets that receive plug-ins. - * - * The Broker is a plug-in target, there might be others - * in future. + * Plug-ins can register clean-up functions to execute when + * the target is destroyed. */ - struct Target { virtual ~Target() {} }; + struct Target { + public: + virtual ~Target(); + void addCleanup(const boost::function& cleanupFunction); + + private: + std::vector > cleanup; + }; typedef std::vector Plugins; @@ -69,7 +75,9 @@ virtual Options* getOptions(); /** - * Initialize Plugin functionality on a Target. + * Initialize Plugin functionality on a Target, called before + * initializing the target. + * * Plugins should ignore targets they don't recognize. * * Called before the target itself is initialized. @@ -77,7 +85,9 @@ virtual void earlyInitialize(Target&) = 0; /** - * Initialize Plugin functionality on a Target. + * Initialize Plugin functionality on a Target. 
Called after + * initializing the target. + * * Plugins should ignore targets they don't recognize. * * Called after the target is fully initialized. @@ -89,6 +99,12 @@ */ static const Plugins& getPlugins(); + /** Call earlyInitialize() on all registered plugins */ + static void earlyInitAll(Target&); + + /** Call initialize() on all registered plugins */ + static void initAll(Target&); + /** For each registered plugin, add plugin.getOptions() to opts. */ static void addOptions(Options& opts); }; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp?rev=675017&r1=675016&r2=675017&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp Tue Jul 8 15:58:37 2008 @@ -29,7 +29,7 @@ Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient) : frameQueueClosed(false), output(o), - connection(broker.getConnectionManager().create(this, broker, id, _isClient)), + connection(this, broker, id, _isClient), identifier(id), initialized(false), isClient(_isClient) {} size_t Connection::decode(const char* buffer, size_t size) { @@ -46,13 +46,13 @@ framing::AMQFrame frame; while(frame.decode(in)) { QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); - connection->received(frame); + connection.received(frame); } return in.getPosition(); } bool Connection::canEncode() { - if (!frameQueueClosed) connection->doOutput(); + if (!frameQueueClosed) connection.doOutput(); Mutex::ScopedLock l(frameQueueLock); return (!isClient && !initialized) || !frameQueue.empty(); } @@ -91,7 +91,7 @@ } void Connection::closed() { - connection->closed(); + connection.closed(); } void Connection::send(framing::AMQFrame& f) { Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h?rev=675017&r1=675016&r2=675017&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/Connection.h Tue Jul 8 15:58:37 2008 @@ -33,7 +33,6 @@ namespace broker { class Broker; } namespace amqp_0_10 { -// FIXME aconway 2008-03-18: Update to 0-10. class Connection : public sys::ConnectionCodec, public sys::ConnectionOutputHandler { @@ -41,7 +40,7 @@ bool frameQueueClosed; mutable sys::Mutex frameQueueLock; sys::OutputControl& output; - std::auto_ptr connection; // FIXME aconway 2008-03-18: + broker::Connection connection; std::string identifier; bool initialized; bool isClient; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=675017&r1=675016&r2=675017&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Tue Jul 8 15:58:37 2008 @@ -23,7 +23,6 @@ */ #include "ConnectionFactory.h" -#include "ConnectionManager.h" #include "ConnectionToken.h" #include "DirectExchange.h" #include "DtxManager.h" @@ -121,7 +120,6 @@ Options& getOptions() { return config; } SessionManager& getSessionManager() { return sessionManager; } - ConnectionManager& getConnectionManager() { return connectionManager; } management::ManagementObject* GetManagementObject (void) const; management::Manageable* GetVhostObject (void) const; @@ -159,7 +157,6 @@ ConnectionFactory factory; DtxManager dtxManager; SessionManager sessionManager; - ConnectionManager connectionManager; management::ManagementAgent* managementAgent; 
management::Broker* mgmtObject; Vhost::shared_ptr vhostObject; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=675017&r1=675016&r2=675017&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Jul 8 15:58:37 2008 @@ -88,7 +88,7 @@ links.notifyClosed(mgmtId); } -void Connection::received(framing::AMQFrame& frame){ inChain(frame); } +void Connection::received(framing::AMQFrame& frame){ inChain->handle(frame); } void Connection::receivedLast(framing::AMQFrame& frame){ if (frame.getChannel() == 0 && frame.getMethod()) { Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=675017&r1=675016&r2=675017&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jul 8 15:58:37 2008 @@ -43,6 +43,7 @@ #include "SessionHandler.h" #include "qpid/management/Manageable.h" #include "qpid/management/Connection.h" +#include "qpid/HandlerChain.h" #include @@ -91,8 +92,6 @@ void notifyConnectionForced(const std::string& text); void setUserId(const string& uid); - framing::FrameHandler::Chain& getInChain() { return inChain; } - private: typedef boost::ptr_map ChannelMap; typedef std::vector::iterator queue_iterator; @@ -110,8 +109,7 @@ management::Connection* mgmtObject; LinkRegistry& links; framing::FrameHandler::MemFunRef lastInHandler; - framing::FrameHandler::Chain inChain; - + PluginHandlerChain inChain; }; }} Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp?rev=675017&r1=675016&r2=675017&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.cpp Tue Jul 8 15:58:37 2008 @@ -55,11 +55,8 @@ throw SessionBusyException(QPID_MSG("Session already attached: " << id)); Detached::iterator i = std::find(detached.begin(), detached.end(), id); std::auto_ptr state; - if (i == detached.end()) { + if (i == detached.end()) state.reset(new SessionState(broker, h, id, config)); - for_each(observers.begin(), observers.end(), - boost::bind(&Observer::opened, _1,boost::ref(*state))); - } else { state.reset(detached.release(i).release()); state->attach(h); @@ -99,8 +96,4 @@ } } -void SessionManager::add(const intrusive_ptr& o) { - observers.push_back(o); -} - }} // namespace qpid::broker Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h?rev=675017&r1=675016&r2=675017&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionManager.h Tue Jul 8 15:58:37 2008 @@ -46,14 +46,6 @@ */ class SessionManager : private boost::noncopyable { public: - /** - * Observer notified of SessionManager events. - */ - struct Observer : public RefCounted { - /** Called when a stateless session is attached. */ - virtual void opened(SessionState&) {} - }; - SessionManager(const qpid::SessionState::Configuration&, Broker&); ~SessionManager(); @@ -67,9 +59,6 @@ /** Forget about an attached session. Called by SessionState destructor. */ void forget(const SessionId&); - /** Add an Observer. 
*/ - void add(const boost::intrusive_ptr&); - Broker& getBroker() const { return broker; } const qpid::SessionState::Configuration& getSessionConfig() const { return config; } @@ -77,7 +66,6 @@ private: typedef boost::ptr_vector Detached; // Sorted in expiry order. typedef std::set Attached; - typedef std::vector > Observers; void eraseExpired(); @@ -85,7 +73,6 @@ Detached detached; Attached attached; qpid::SessionState::Configuration config; - Observers observers; Broker& broker; }; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=675017&r1=675016&r2=675017&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Jul 8 15:58:37 2008 @@ -224,8 +224,8 @@ getProxy().getMessage().accept(SequenceSet(msg->getCommandId())); } -void SessionState::handleIn(AMQFrame& f) { inChain.handle(f); } -void SessionState::handleOut(AMQFrame& f) { outChain.handle(f); } +void SessionState::handleIn(AMQFrame& f) { inChain->handle(f); } +void SessionState::handleOut(AMQFrame& f) { outChain->handle(f); } void SessionState::handleInLast(AMQFrame& frame) { SequenceNumber commandId = receiverGetCurrent(); @@ -291,8 +291,4 @@ Broker& SessionState::getBroker() { return broker; } -framing::FrameHandler::Chain& SessionState::getInChain() { return inChain; } - -framing::FrameHandler::Chain& SessionState::getOutChain() { return outChain; } - }} // namespace qpid::broker Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=675017&r1=675016&r2=675017&view=diff ============================================================================== --- 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Jul 8 15:58:37 2008 @@ -23,6 +23,7 @@ */ #include "qpid/SessionState.h" +#include "qpid/HandlerChain.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/SequenceSet.h" #include "qpid/sys/Mutex.h" @@ -58,8 +59,8 @@ class SessionManager; /** - * Broker-side session state includes sessions handler chains, which may - * themselves have state. + * Broker-side session state includes session's handler chains, which + * may themselves have state. */ class SessionState : public qpid::SessionState, public SessionContext, @@ -101,8 +102,9 @@ void readyToSend(); - framing::FrameHandler::Chain& getInChain(); - framing::FrameHandler::Chain& getOutChain(); + // Tag types to identify PluginHandlerChains. + struct InTag {}; + struct OutTag {}; private: @@ -131,7 +133,9 @@ management::Session* mgmtObject; framing::FrameHandler::MemFunRef inLastHandler; framing::FrameHandler::MemFunRef outLastHandler; - framing::FrameHandler::Chain inChain, outChain; + + qpid::PluginHandlerChain inChain; + qpid::PluginHandlerChain outChain; friend class SessionManager; }; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=675017&r1=675016&r2=675017&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jul 8 15:58:37 2008 @@ -23,6 +23,7 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ClusterNotifyBody.h" #include "qpid/log/Statement.h" +#include "qpid/memory.h" #include #include #include @@ -36,25 +37,12 @@ using namespace std; using broker::Connection; -namespace { - -// FIXME aconway 2008-07-01: sending every frame to cluster, -// serializing all 
processing in cluster deliver thread. -// This will not perform at all, but provides a correct starting point. -// -// TODO: -// - Fake "Connection" for cluster: owns shadow sessions. -// - Maintain shadow sessions. -// - Apply foreign frames to shadow sessions. -// - - // Beginning of inbound chain: send to cluster. -struct ClusterSendHandler : public FrameHandler { - Connection& connection; +struct ClusterSendHandler : public HandlerChain::Handler { + Cluster::ConnectionChain& connection; Cluster& cluster; - ClusterSendHandler(Connection& conn, Cluster& clust) : connection(conn), cluster(clust) {} + ClusterSendHandler(Cluster::ConnectionChain& conn, Cluster& clust) : connection(conn), cluster(clust) {} void handle(AMQFrame& f) { // FIXME aconway 2008-01-29: Refcount Connections to ensure @@ -63,16 +51,8 @@ } }; -struct ConnectionObserver : public broker::ConnectionManager::Observer { - Cluster& cluster; - ConnectionObserver(Cluster& c) : cluster(c) {} - - void created(Connection& c) { - // FIXME aconway 2008-06-16: clean up chaining and observers. 
- ClusterSendHandler* sender=new ClusterSendHandler(c, cluster); - c.getInChain().insert(sender); - } -}; +void Cluster::initialize(Cluster::ConnectionChain& cc) { + cc.push(ConnectionChain::HandlerAutoPtr(new ClusterSendHandler(cc, *this))); } ostream& operator <<(ostream& out, const Cluster& cluster) { @@ -95,7 +75,6 @@ cpg(*this), name(name_), url(url_), - observer(new ConnectionObserver(*this)), self(cpg.self()) { QPID_LOG(trace, "Joining cluster: " << name_); Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=675017&r1=675016&r2=675017&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Jul 8 15:58:37 2008 @@ -22,6 +22,7 @@ #include "qpid/cluster/Cpg.h" #include "qpid/cluster/ShadowConnectionOutputHandler.h" +#include "qpid/HandlerChain.h" #include "qpid/broker/Broker.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Runnable.h" @@ -47,6 +48,8 @@ class Cluster : private sys::Runnable, private Cpg::Handler { public: + typedef PluginHandlerChain ConnectionChain; + /** Details of a cluster member */ struct Member { Member(const Url& url_=Url()) : url(url_) {} @@ -62,11 +65,11 @@ */ Cluster(const std::string& name, const Url& url, broker::Broker&); + // Add cluster handlers to broker chains. + void initialize(ConnectionChain&); + virtual ~Cluster(); - // FIXME aconway 2008-01-29: - boost::intrusive_ptr getObserver() { return observer; } - /** Get the current cluster membership. 
*/ MemberList getMembers() const; @@ -124,7 +127,6 @@ MemberMap members; sys::Thread dispatcher; boost::function callback; - boost::intrusive_ptr observer; Id self; ShadowConnectionMap shadowConnectionMap; ShadowConnectionOutputHandler shadowOut; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=675017&r1=675016&r2=675017&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Tue Jul 8 15:58:37 2008 @@ -54,24 +54,29 @@ }; struct ClusterPlugin : public Plugin { + typedef PluginHandlerChain ConnectionChain; ClusterOptions options; boost::optional cluster; - Options* getOptions() { return &options; } + template void init(Plugin::Target& t) { + Chain* c = dynamic_cast(&t); + if (c) cluster->initialize(*c); + } void earlyInitialize(Plugin::Target&) {} void initialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast(&target); - // Only provide to a Broker, and only if the --cluster config is set. if (broker && !options.name.empty()) { - assert(!cluster); // A process can only belong to one cluster. + if (cluster) throw Exception("Cluster plugin cannot be initialized twice in a process."); cluster = boost::in_place(options.name, options.getUrl(broker->getPort()), boost::ref(*broker)); - broker->getConnectionManager().add(cluster->getObserver()); + return; } + if (!cluster) return; // Ignore chain handlers if we didn't init a cluster. 
+ init(target); } }; Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h?rev=675017&r1=675016&r2=675017&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Handler.h Tue Jul 8 15:58:37 2008 @@ -28,7 +28,6 @@ namespace qpid { namespace framing { -/** Generic handler that can be linked into chains. */ template struct Handler { typedef T HandledType; @@ -46,23 +45,6 @@ /** Pointer to next handler in a linked list. */ Handler* next; - /** A Chain is a handler holding a linked list of sub-handlers. - * Chain::next is invoked after the full chain, it is not itself part of the chain. - * Handlers inserted into the chain are deleted by the Chain dtor. - */ - class Chain : public Handler { - public: - Chain(Handler& next_) : Handler(&next_), first(&next_) {} - ~Chain() { while (first != next) pop(); } - void handle(T t) { first->handle(t); } - void insert(Handler* h) { h->next = first; first = h; } - bool empty() { return first == next; } - - private: - void pop() { Handler* p=first; first=first->next; delete p; } - Handler* first; - }; - /** Adapt any void(T) functor as a Handler. * Functor(f) will copy f. * Functor(f) will only take a reference to x. @@ -84,7 +66,7 @@ MemFunRef(X& x, Handler* next=0) : Handler(next), target(&x) {} void handle(T t) { (target->*F)(t); } - /** Allow calling with -> syntax, compatible with Chains */ + /** Allow calling with -> syntax, like a qpid::HandlerChain */ MemFunRef* operator->() { return this; } private: @@ -103,15 +85,13 @@ }; /** Support for implementing an in-out handler pair as a single class. - * Public interface is Handler::Chains pair, but implementation - * overrides handleIn, handleOut functions in a single class. 
+ * Overrides handleIn, handleOut functions in a single class. */ struct InOutHandler : protected InOutHandlerInterface { InOutHandler(Handler* nextIn=0, Handler* nextOut=0) : in(*this, nextIn), out(*this, nextOut) {} MemFunRef in; MemFunRef out; }; - }; Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=675017&r1=675016&r2=675017&view=diff ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Jul 8 15:58:37 2008 @@ -174,7 +174,7 @@ } } -QPID_AUTO_TEST_CASE(testMessageReplication) { +QPID_AUTO_TEST_CASE(testMessageEnqueue) { // Enqueue on one broker, dequeue on another. ClusterFixture cluster(2); Client c0(cluster[0].getPort()); @@ -190,6 +190,28 @@ BOOST_CHECK_EQUAL(string("bar"), msg.getData()); } -// TODO aconway 2008-06-25: dequeue replication, failover. +QPID_AUTO_TEST_CASE(testMessageDequeue) { + // Enqueue on one broker, dequeue on two others. + ClusterFixture cluster (3); + Client c0(cluster[0].getPort()); + c0.session.queueDeclare("q"); + c0.session.messageTransfer(arg::content=TransferContent("foo", "q")); + c0.session.messageTransfer(arg::content=TransferContent("bar", "q")); + c0.session.close(); + + Message msg; + + Client c1(cluster[1].getPort()); + BOOST_CHECK(c1.subs.get(msg, "q")); + BOOST_CHECK_EQUAL("foo", msg.getData()); + + Client c2(cluster[2].getPort()); + BOOST_CHECK(c1.subs.get(msg, "q")); + BOOST_CHECK_EQUAL("bar", msg.getData()); + QueueQueryResult r = c2.session.queueQuery("q"); + BOOST_CHECK_EQUAL(0, r.getMessageCount()); +} + +// TODO aconway 2008-06-25: failover. QPID_AUTO_TEST_SUITE_END()
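
Note for readers: the chain-of-responsibility idea introduced in qpid/HandlerChain.h can be sketched in isolation roughly as below. This is an illustration under stated assumptions, not the committed code: it uses std::unique_ptr and std::vector where the commit uses std::auto_ptr and boost::ptr_vector, and FrameHandler, Recorder, Tagger and runChainDemo are hypothetical stand-ins invented for the example. push() saves the raw pointer before handing ownership to the container, because a moved-from unique_ptr is null.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical base interface; stands in for a frame handler type.
struct FrameHandler {
    virtual ~FrameHandler() = default;
    virtual void handle(std::string& frame) = 0;
};

// Sketch of the HandlerChain idea. Assumption: std::unique_ptr is used
// here in place of the std::auto_ptr / boost::ptr_vector in the commit.
template <class Base>
class HandlerChain {
  public:
    // Chainable handler: derives from Base and remembers the next link.
    class Handler : public Base {
      public:
        void setNext(Base* n) { next = n; }
      protected:
        Base* next = nullptr;
    };

    // target is the object at the end of the chain; it is not owned.
    explicit HandlerChain(Base& target) : first(&target) {}

    // The chain takes ownership of h and makes it the new first link.
    void push(std::unique_ptr<Handler> h) {
        assert(h && "handler must not be null");
        Handler* raw = h.get();            // keep a raw pointer before moving
        raw->setNext(first);
        handlers.push_back(std::move(h));  // h is null after this line
        first = raw;
    }

    // Smart-pointer style access to the first link.
    Base* operator->() { return first; }

  private:
    std::vector<std::unique_ptr<Handler>> handlers;
    Base* first;
};

// End of the chain: records the frames that reach it.
struct Recorder : FrameHandler {
    std::vector<std::string> seen;
    void handle(std::string& frame) override { seen.push_back(frame); }
};

// Hypothetical handler that appends a suffix and then forwards the frame.
struct Tagger : HandlerChain<FrameHandler>::Handler {
    explicit Tagger(std::string t) : tag(std::move(t)) {}
    void handle(std::string& frame) override {
        frame += tag;
        if (next) next->handle(frame);
    }
    std::string tag;
};

// Builds a two-handler chain and returns what the target received.
std::string runChainDemo() {
    Recorder target;
    HandlerChain<FrameHandler> chain(target);
    chain.push(std::make_unique<Tagger>("-a"));
    chain.push(std::make_unique<Tagger>("-b"));  // pushed last, runs first
    std::string frame = "frame";
    chain->handle(frame);
    return target.seen.front();
}
```

Calling runChainDemo() sends one frame through both handlers, most recently pushed first, before it reaches the target. The sketch needs C++14 for std::make_unique.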