qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1176372 - in /qpid/branches/qpid-2920-active/qpid/cpp/src: ./ qpid/cluster/exp/
Date Tue, 27 Sep 2011 13:11:15 GMT
Author: aconway
Date: Tue Sep 27 13:11:14 2011
New Revision: 1176372

URL: http://svn.apache.org/viewvc?rev=1176372&view=rev
Log:
QPID-2920: Groundwork to enable queues hashed over multiple CPG groups.

Added:
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.cpp   (with props)
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.h   (with props)
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/hash.cpp   (with props)
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/hash.h   (with props)
Modified:
    qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk?rev=1176372&r1=1176371&r2=1176372&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/cluster.mk Tue Sep 27 13:11:14 2011
@@ -120,6 +120,10 @@ cluster2_la_SOURCES =				\
 	qpid/cluster/exp/Core.h			\
 	qpid/cluster/exp/EventHandler.cpp	\
 	qpid/cluster/exp/EventHandler.h		\
+	qpid/cluster/exp/Group.cpp		\
+	qpid/cluster/exp/Group.h		\
+	qpid/cluster/exp/hash.cpp		\
+	qpid/cluster/exp/hash.h			\
 	qpid/cluster/exp/HandlerBase.cpp	\
 	qpid/cluster/exp/HandlerBase.h		\
 	qpid/cluster/exp/MessageHandler.cpp	\

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp?rev=1176372&r1=1176371&r2=1176372&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.cpp Tue Sep 27 13:11:14 2011
@@ -23,6 +23,8 @@
 #include "BrokerContext.h"
 #include "QueueContext.h"
 #include "QueueHandler.h"
+#include "Multicaster.h"
+#include "hash.h"
 #include "qpid/framing/ClusterMessageRoutingBody.h"
 #include "qpid/framing/ClusterMessageRoutedBody.h"
 #include "qpid/framing/ClusterMessageEnqueueBody.h"
@@ -50,6 +52,8 @@ using namespace framing;
 using namespace broker;
 
 namespace {
+const ProtocolVersion pv;     // shorthand
+
 // noReplicate means the current thread is handling a message
 // received from the cluster so it should not be replicated.
 QPID_TSS bool tssNoReplicate = false;
@@ -59,6 +63,20 @@ QPID_TSS bool tssNoReplicate = false;
 QPID_TSS RoutingId tssRoutingId = 0;
 }
 
+// FIXME aconway 2011-09-26: de-const the broker::Cluster interface,
+// then de-const here.
+Multicaster& BrokerContext::mcaster(const broker::QueuedMessage& qm) {
+    return core.getGroup(hashof(qm)).getMulticaster();
+}
+
+Multicaster& BrokerContext::mcaster(const broker::Queue& q) {
+    return core.getGroup(hashof(q)).getMulticaster();
+}
+
+Multicaster& BrokerContext::mcaster(const std::string& name) {
+    return core.getGroup(hashof(name)).getMulticaster();
+}
+
 BrokerContext::ScopedSuppressReplication::ScopedSuppressReplication() {
     assert(!tssNoReplicate);
     tssNoReplicate = true;
@@ -89,16 +107,17 @@ bool BrokerContext::enqueue(Queue& queue
         std::string data(msg->encodedSize(),char());
         framing::Buffer buf(&data[0], data.size());
         msg->encode(buf);
-        core.mcast(ClusterMessageRoutingBody(ProtocolVersion(), tssRoutingId, data));
+        mcaster(queue).mcast(ClusterMessageRoutingBody(pv, tssRoutingId, data));
         core.getRoutingMap().put(tssRoutingId, msg);
     }
-    core.mcast(ClusterMessageEnqueueBody(ProtocolVersion(), tssRoutingId, queue.getName()));
+    mcaster(queue).mcast(ClusterMessageEnqueueBody(pv, tssRoutingId, queue.getName()));
     return false; // Strict order, wait for CPG self-delivery to enqueue.
 }
 
 void BrokerContext::routed(const boost::intrusive_ptr<Message>&) {
     if (tssRoutingId) {             // we enqueued at least one message.
-        core.mcast(ClusterMessageRoutedBody(ProtocolVersion(), tssRoutingId));
+        core.getGroup(tssRoutingId).getMulticaster().mcast(
+            ClusterMessageRoutedBody(pv, tssRoutingId));
         // Note: routingMap is cleaned up on CPG delivery in MessageHandler.
         tssRoutingId = 0;
     }
@@ -106,20 +125,19 @@ void BrokerContext::routed(const boost::
 
 void BrokerContext::acquire(const broker::QueuedMessage& qm) {
     if (tssNoReplicate) return;
-    core.mcast(ClusterMessageAcquireBody(
-                   ProtocolVersion(), qm.queue->getName(), qm.position));
+    mcaster(qm).mcast(ClusterMessageAcquireBody(pv, qm.queue->getName(), qm.position));
 }
 
 void BrokerContext::dequeue(const broker::QueuedMessage& qm) {
     if (!tssNoReplicate)
-        core.mcast(ClusterMessageDequeueBody(
-                       ProtocolVersion(), qm.queue->getName(), qm.position));
+        mcaster(qm).mcast(
+            ClusterMessageDequeueBody(pv, qm.queue->getName(), qm.position));
 }
 
 void BrokerContext::requeue(const broker::QueuedMessage& qm) {
     if (!tssNoReplicate)
-        core.mcast(ClusterMessageRequeueBody(
-                       ProtocolVersion(),
+        mcaster(qm).mcast(ClusterMessageRequeueBody(
+                       pv,
                        qm.queue->getName(),
                        qm.position,
                        qm.payload->getRedelivered()));
@@ -130,17 +148,17 @@ void BrokerContext::create(broker::Queue
     if (tssNoReplicate) return;
     assert(!QueueContext::get(q));
     boost::intrusive_ptr<QueueContext> context(
-        new QueueContext(q, core.getSettings().getConsumeLock(), core.getMulticaster()));
+        new QueueContext(q, core.getSettings().getConsumeLock(), mcaster(q.getName())));
     std::string data(q.encodedSize(), '\0');
     framing::Buffer buf(&data[0], data.size());
     q.encode(buf);
-    core.mcast(ClusterWiringCreateQueueBody(ProtocolVersion(), data));
+    mcaster(q).mcast(ClusterWiringCreateQueueBody(pv, data));
     // FIXME aconway 2011-07-29: Need asynchronous completion.
 }
 
 void BrokerContext::destroy(broker::Queue& q) {
     if (tssNoReplicate) return;
-    core.mcast(ClusterWiringDestroyQueueBody(ProtocolVersion(), q.getName()));
+     mcaster(q).mcast(ClusterWiringDestroyQueueBody(pv, q.getName()));
 }
 
 void BrokerContext::create(broker::Exchange& ex) {
@@ -148,28 +166,27 @@ void BrokerContext::create(broker::Excha
     std::string data(ex.encodedSize(), '\0');
     framing::Buffer buf(&data[0], data.size());
     ex.encode(buf);
-    core.mcast(ClusterWiringCreateExchangeBody(ProtocolVersion(), data));
+    mcaster(ex.getName()).mcast(ClusterWiringCreateExchangeBody(pv, data));
 }
 
 void BrokerContext::destroy(broker::Exchange& ex) {
     if (tssNoReplicate) return;
-    core.mcast(ClusterWiringDestroyExchangeBody(ProtocolVersion(), ex.getName()));
+    mcaster(ex.getName()).mcast(
+        ClusterWiringDestroyExchangeBody(pv, ex.getName()));
 }
 
 void BrokerContext::bind(broker::Queue& q, broker::Exchange& ex,
                          const std::string& key, const framing::FieldTable& args)
 {
     if (tssNoReplicate) return;
-    core.mcast(ClusterWiringBindBody(
-                   ProtocolVersion(), q.getName(), ex.getName(), key, args));
+    mcaster(q).mcast(ClusterWiringBindBody(pv, q.getName(), ex.getName(), key, args));
 }
 
 void BrokerContext::unbind(broker::Queue& q, broker::Exchange& ex,
                            const std::string& key, const framing::FieldTable& args)
 {
     if (tssNoReplicate) return;
-    core.mcast(ClusterWiringUnbindBody(
-                   ProtocolVersion(), q.getName(), ex.getName(), key, args));
+    mcaster(q).mcast(ClusterWiringUnbindBody(pv, q.getName(), ex.getName(), key, args));
 }
 
 // n is the number of consumers including the one just added.
@@ -190,3 +207,4 @@ void BrokerContext::stopped(broker::Queu
 }
 
 }} // namespace qpid::cluster
+

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h?rev=1176372&r1=1176371&r2=1176372&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/BrokerContext.h Tue Sep 27 13:11:14 2011
@@ -82,6 +82,10 @@ class BrokerContext : public broker::Clu
 
   private:
     uint32_t nextRoutingId();
+    // Get multicaster associated with a queue
+    Multicaster& mcaster(const broker::QueuedMessage& qm);
+    Multicaster& mcaster(const broker::Queue& q);
+    Multicaster& mcaster(const std::string&);
 
     Core& core;
     boost::intrusive_ptr<QueueHandler> queueHandler;

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp?rev=1176372&r1=1176371&r2=1176372&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.cpp Tue Sep 27 13:11:14 2011
@@ -38,26 +38,28 @@ namespace cluster {
 
 Core::Core(const Settings& s, broker::Broker& b) :
     broker(b),
-    eventHandler(new EventHandler(*this)),
-    multicaster(eventHandler->getCpg(), b.getPoller(), boost::bind(&Core::fatal, this)),
     settings(s)
 {
+    // FIXME aconway 2011-09-23: multi-group
+    groups.push_back(new Group(*this));
     boost::intrusive_ptr<QueueHandler> queueHandler(
-        new QueueHandler(*eventHandler, multicaster, settings));
-    eventHandler->add(queueHandler);
-    eventHandler->add(boost::intrusive_ptr<HandlerBase>(
-                          new WiringHandler(*eventHandler, queueHandler)));
-    eventHandler->add(boost::intrusive_ptr<HandlerBase>(
-                          new MessageHandler(*eventHandler)));
+        new QueueHandler(groups[0]->getEventHandler(), groups[0]->getMulticaster(), settings));
+    groups[0]->getEventHandler().add(queueHandler);
+    groups[0]->getEventHandler().add(boost::intrusive_ptr<HandlerBase>(
+                              new WiringHandler(groups[0]->getEventHandler(), queueHandler, broker)));
+    groups[0]->getEventHandler().add(boost::intrusive_ptr<HandlerBase>(
+                              new MessageHandler(groups[0]->getEventHandler(), *this)));
 
     std::auto_ptr<BrokerContext> bh(new BrokerContext(*this, queueHandler));
     brokerHandler = bh.get();
     // BrokerContext belongs to Broker
     broker.setCluster(std::auto_ptr<broker::Cluster>(bh));
-    eventHandler->start();
-    eventHandler->getCpg().join(s.name);
+    // FIXME aconway 2011-09-26: multi-group
+    groups[0]->getEventHandler().start();
+    groups[0]->getEventHandler().getCpg().join(s.name);
     // TODO aconway 2010-11-18: logging standards
-    QPID_LOG(notice, "cluster: joined " << s.name << ", member-id="<< eventHandler->getSelf());
+    // FIXME aconway 2011-09-26: multi-group
+    QPID_LOG(notice, "cluster: joined " << s.name << ", member-id="<< groups[0]->getEventHandler().getSelf());
 }
 
 void Core::initialize() {}
@@ -68,8 +70,8 @@ void Core::fatal() {
     broker::SignalHandler::shutdown();
 }
 
-void Core::mcast(const framing::AMQBody& body) {
-    multicaster.mcast(body);
+Group& Core::getGroup(size_t hashValue) {
+    return *groups[hashValue % groups.size()];
 }
 
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.h?rev=1176372&r1=1176371&r2=1176372&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Core.h Tue Sep 27 13:11:14 2011
@@ -22,15 +22,16 @@
  *
  */
 
-#include <string>
-#include <memory>
 #include "LockedMap.h"
-#include "Multicaster.h"
+#include "Group.h"
 #include "Settings.h"
 #include "qpid/cluster/types.h"
 #include "qpid/cluster/Cpg.h"
 #include "qpid/broker/QueuedMessage.h"
 #include "qpid/sys/Time.h"
+#include <boost/intrusive_ptr.hpp>
+#include <string>
+#include <memory>
 
 // TODO aconway 2010-10-19: experimental cluster code.
 
@@ -49,7 +50,8 @@ class EventHandler;
 class BrokerContext;
 
 /**
- * Cluster core state machine.
+ * Cluster core.
+ * 
  * Holds together the various objects that implement cluster behavior,
  * and holds state that is shared by multiple components.
  *
@@ -59,6 +61,7 @@ class Core
 {
   public:
     typedef LockedMap<RoutingId, boost::intrusive_ptr<broker::Message> > RoutingMap;
+    typedef std::vector<boost::intrusive_ptr<Group> > Groups;
 
     /** Constructed during Plugin::earlyInitialize() */
     Core(const Settings&, broker::Broker&);
@@ -69,13 +72,8 @@ class Core
     /** Shut down broker due to fatal error. Caller should log a critical message */
     void fatal();
 
-    /** Multicast an event */
-    void mcast(const framing::AMQBody&);
-
     broker::Broker& getBroker() { return broker; }
-    EventHandler& getEventHandler() { return *eventHandler; }
     BrokerContext& getBrokerContext() { return *brokerHandler; }
-    Multicaster& getMulticaster() { return multicaster; }
 
     /** Map of messages that are currently being routed.
      * Used to pass messages being routed from BrokerContext to MessageHandler
@@ -83,13 +81,16 @@ class Core
     RoutingMap& getRoutingMap() { return routingMap; }
 
     const Settings& getSettings() const { return settings; }
+
+    /** Get group by hash value. */
+    Group& getGroup(size_t hashValue);
+
   private:
     broker::Broker& broker;
-    std::auto_ptr<EventHandler> eventHandler; // Handles CPG events.
     BrokerContext* brokerHandler; // Handles broker events.
     RoutingMap routingMap;
-    Multicaster multicaster;
     Settings settings;
+    Groups groups;
 };
 }} // namespace qpid::cluster
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp?rev=1176372&r1=1176371&r2=1176372&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.cpp Tue Sep 27 13:11:14 2011
@@ -33,10 +33,10 @@
 namespace qpid {
 namespace cluster {
 
-EventHandler::EventHandler(Core& c) :
-    core(c),
-    cpg(*this),                 // FIXME aconway 2010-10-20: belongs on Core.
-    dispatcher(cpg, core.getBroker().getPoller(), boost::bind(&Core::fatal, &core)),
+EventHandler::EventHandler(boost::shared_ptr<sys::Poller> poller,
+                           boost::function<void()> onError) :
+    cpg(*this),
+    dispatcher(cpg, poller, onError),
     self(cpg.self())
 {}
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h?rev=1176372&r1=1176371&r2=1176372&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/EventHandler.h Tue Sep 27 13:11:14 2011
@@ -37,7 +37,6 @@ class AMQBody;
 }
 
 namespace cluster {
-class Core;
 class HandlerBase;
 
 /**
@@ -48,7 +47,9 @@ class HandlerBase;
 class EventHandler : public Cpg::Handler
 {
   public:
-    EventHandler(Core&);
+    EventHandler(boost::shared_ptr<sys::Poller> poller,
+                 boost::function<void()> onError);
+    
     ~EventHandler();
 
     /** Add a handler */
@@ -75,13 +76,11 @@ class EventHandler : public Cpg::Handler
 
     MemberId getSender() { return sender; }
     MemberId getSelf() { return self; }
-    Core& getCore() { return core; }
     Cpg& getCpg() { return cpg; }
 
   private:
     void invoke(const framing::AMQBody& body);
 
-    Core& core;
     Cpg cpg;
     PollerDispatch dispatcher;
     MemberId sender;              // sender of current event.

Added: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.cpp?rev=1176372&view=auto
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.cpp (added)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.cpp Tue Sep 27 13:11:14 2011
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Group.h"
+#include "Core.h"
+#include "EventHandler.h"
+#include "Multicaster.h"
+
+#include "qpid/broker/Broker.h"
+
+namespace qpid {
+namespace framing {
+class AMQFrame;
+class AMQBody;
+}
+
+namespace cluster {
+
+Group::Group(Core& core) :
+    eventHandler(
+        new EventHandler(core.getBroker().getPoller(),
+                         boost::bind(&Core::fatal, &core))),
+    multicaster(
+        new Multicaster(eventHandler->getCpg(),
+                        core.getBroker().getPoller(),
+                        boost::bind(&Core::fatal, &core)))
+{}
+
+Group::~Group() {}
+
+void Group::mcast(const framing::AMQBody& b) { multicaster->mcast(b); }
+void Group::mcast(const framing::AMQFrame& f) { multicaster->mcast(f); }
+}} // namespace qpid::cluster

Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.h?rev=1176372&view=auto
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.h (added)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.h Tue Sep 27 13:11:14 2011
@@ -0,0 +1,62 @@
+#ifndef QPID_CLUSTER_EXP_GROUP_H
+#define QPID_CLUSTER_EXP_GROUP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+#include <memory>
+
+namespace qpid {
+namespace framing {
+class AMQBody;
+class AMQFrame;
+}
+
+namespace cluster {
+
+class Cpg;
+class Core;
+class EventHandler;
+class Multicaster;
+
+/**
+ * A CPG instance with an event handler and a multi-caster, 
+ * along with all the per-group handler objects.
+ */
+class Group : public RefCounted
+{
+  public:
+    Group(Core& core);
+    ~Group();
+
+    EventHandler& getEventHandler() { return *eventHandler; }
+    Multicaster& getMulticaster() { return *multicaster; }
+
+    void mcast(const framing::AMQBody&);
+    void mcast(const framing::AMQFrame&);
+  private:
+    std::auto_ptr<EventHandler> eventHandler;
+    std::auto_ptr<Multicaster> multicaster;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_EXP_GROUP_H*/

Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/Group.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp?rev=1176372&r1=1176371&r2=1176372&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp Tue Sep 27 13:11:14 2011
@@ -39,9 +39,10 @@ namespace qpid {
 namespace cluster {
 using namespace broker;
 
-MessageHandler::MessageHandler(EventHandler& e) :
+MessageHandler::MessageHandler(EventHandler& e, Core& c) :
     HandlerBase(e),
-    broker(e.getCore().getBroker())
+    broker(c.getBroker()),
+    core(c)
 {}
 
 bool MessageHandler::invoke(const framing::AMQBody& body) {
@@ -49,7 +50,7 @@ bool MessageHandler::invoke(const framin
 }
 
 void MessageHandler::routing(RoutingId routingId, const std::string& message) {
-    if (sender() == self()) return; // Already in getCore().getRoutingMap()
+    if (sender() == self()) return; // Already in core.getRoutingMap()
     boost::intrusive_ptr<Message> msg = new Message;
     // FIXME aconway 2010-10-28: decode message in bounded-size buffers.
     framing::Buffer buf(const_cast<char*>(&message[0]), message.size());
@@ -70,7 +71,7 @@ void MessageHandler::enqueue(RoutingId r
     boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed");
     boost::intrusive_ptr<Message> msg;
     if (sender() == self())
-        msg = eventHandler.getCore().getRoutingMap().get(routingId);
+        msg = core.getRoutingMap().get(routingId);
     else
         msg = memberMap[sender()].routingMap[routingId];
     if (!msg) throw Exception(QPID_MSG("Cluster enqueue on " << q
@@ -81,7 +82,7 @@ void MessageHandler::enqueue(RoutingId r
 
 void MessageHandler::routed(RoutingId routingId) {
     if (sender() == self())
-        eventHandler.getCore().getRoutingMap().erase(routingId);
+        core.getRoutingMap().erase(routingId);
     else
         memberMap[sender()].routingMap.erase(routingId);
 }

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h?rev=1176372&r1=1176371&r2=1176372&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.h Tue Sep 27 13:11:14 2011
@@ -40,6 +40,7 @@ class Queue;
 namespace cluster {
 class EventHandler;
 class BrokerContext;
+class Core;
 
 // FIXME aconway 2011-06-28: doesn't follow the same Handler/Replica/Context pattern as for queue.
 // Make this consistent.
@@ -51,7 +52,7 @@ class MessageHandler : public framing::A
                        public HandlerBase
 {
   public:
-    MessageHandler(EventHandler&);
+    MessageHandler(EventHandler&, Core&);
 
     bool invoke(const framing::AMQBody& body);
 
@@ -73,6 +74,7 @@ class MessageHandler : public framing::A
 
     broker::Broker& broker;
     MemberMap memberMap;
+    Core& core;
 };
 }} // namespace qpid::cluster
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp?rev=1176372&r1=1176371&r2=1176372&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp Tue Sep 27 13:11:14 2011
@@ -23,6 +23,7 @@
 #include "Multicaster.h"
 #include "qpid/cluster/types.h"
 #include "BrokerContext.h"      // for ScopedSuppressReplication
+#include "hash.h"
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/framing/ClusterQueueResubscribeBody.h"
 #include "qpid/framing/ClusterQueueSubscribeBody.h"
@@ -39,7 +40,7 @@ QueueContext::QueueContext(broker::Queue
     : timer(boost::bind(&QueueContext::timeout, this),
             q.getBroker()->getTimer(),
             consumeLock),
-      queue(q), mcast(m), consumers(0)
+      queue(q), mcast(m), consumers(0), hash(hashof(q.getName()))
 {
     q.setClusterContext(boost::intrusive_ptr<QueueContext>(this));
     q.stopConsumers();          // Stop queue initially.

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h?rev=1176372&r1=1176371&r2=1176372&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.h Tue Sep 27 13:11:14 2011
@@ -76,9 +76,6 @@ class QueueContext : public RefCounted {
      */
     void cancel(size_t n);
 
-    /** Get the context for a broker queue. */
-    static boost::intrusive_ptr<QueueContext> get(broker::Queue&);
-
     /** Called in timer thread when the timer runs out. */
     void timeout();
 
@@ -91,12 +88,19 @@ class QueueContext : public RefCounted {
     /** Called by MesageHandler when a message is dequeued. */
     broker::QueuedMessage dequeue(uint32_t position);
 
-  private:
+    size_t getHash() const { return hash; }
+
+
+    /** Get the cluster context for a broker queue. */
+    static boost::intrusive_ptr<QueueContext> get(broker::Queue&) ;
+
+private:
     sys::Mutex lock;
     CountdownTimer timer;
     broker::Queue& queue;       // FIXME aconway 2011-06-08: should be shared/weak ptr?
     Multicaster& mcast;
     size_t consumers;
+    size_t hash;
 
     typedef LockedMap<uint32_t, broker::QueuedMessage> UnackedMap; // FIXME aconway 2011-09-15: don't need read/write map? Rename
     UnackedMap unacked;

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp?rev=1176372&r1=1176371&r2=1176372&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.cpp Tue Sep 27 13:11:14 2011
@@ -41,9 +41,10 @@ using namespace broker;
 using framing::FieldTable;
 
 WiringHandler::WiringHandler(EventHandler& e,
-                             const boost::intrusive_ptr<QueueHandler>& qh) :
+                             const boost::intrusive_ptr<QueueHandler>& qh,
+                             broker::Broker& b) :
     HandlerBase(e),
-    broker(e.getCore().getBroker()),
+    broker(b),
     recovery(broker.getQueues(), broker.getExchanges(),
              broker.getLinks(), broker.getDtxManager()),
     queueHandler(qh)

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h?rev=1176372&r1=1176371&r2=1176372&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/WiringHandler.h Tue Sep 27 13:11:14 2011
@@ -51,7 +51,7 @@ class WiringHandler : public framing::AM
                       public HandlerBase
 {
   public:
-    WiringHandler(EventHandler&, const boost::intrusive_ptr<QueueHandler>& qh);
+    WiringHandler(EventHandler&, const boost::intrusive_ptr<QueueHandler>& qh, broker::Broker&);
 
     bool invoke(const framing::AMQBody& body);
 

Added: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/hash.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/hash.cpp?rev=1176372&view=auto
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/hash.cpp (added)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/hash.cpp Tue Sep 27 13:11:14 2011
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "hash.h"
+#include "QueueContext.h"
+#include "qpid/broker/QueuedMessage.h"
+#include <boost/functional/hash.hpp>
+
+namespace qpid {
+namespace cluster {
+size_t hashof(const std::string& s) { return boost::hash_value(s); } // Hash computed from the string's value.
+size_t hashof(const QueueContext& qc) { return qc.getHash(); } // Hash reported by the context's getHash().
+size_t hashof(const broker::Queue& q) { // Hash taken from the QueueContext obtained for q.
+    return QueueContext::get(const_cast<broker::Queue&>(q))->getHash(); // const_cast: presumably get() needs a non-const Queue& (see FIXME in hash.h); result is dereferenced unchecked.
+}
+size_t hashof(uint32_t n) { return boost::hash_value(n); } // Hash computed from the integer's value.
+size_t hashof(const broker::QueuedMessage& qm) { return hashof(*qm.queue); } // Delegates to the Queue overload; qm.queue is dereferenced unchecked.
+
+}} // namespace qpid::cluster

Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/hash.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/hash.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/hash.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/hash.h?rev=1176372&view=auto
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/hash.h (added)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/hash.h Tue Sep 27 13:11:14 2011
@@ -0,0 +1,53 @@
+#ifndef QPID_CLUSTER_EXP_HASH_H
+#define QPID_CLUSTER_EXP_HASH_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/IntegerTypes.h"
+#include <stdlib.h>
+#include <string>
+
+namespace qpid {
+namespace broker {
+class Queue;
+class QueuedMessage;
+}
+
+namespace cluster {
+
+class QueueContext;
+
+/**@file hash functions */
+
+// The following all use the cached hash value on the Queue::getContext()
+// FIXME aconway 2011-09-26: de-const broker::Cluster interface then de-const here.
+size_t hashof(const broker::Queue& q);
+size_t hashof(const QueueContext& qc);
+size_t hashof(const broker::QueuedMessage& qm);
+
+// Hash directly from a value (a string or a 32-bit unsigned integer).
+size_t hashof(const std::string& s);
+size_t hashof(uint32_t n);
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_EXP_HASH_H*/

Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/hash.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/hash.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message