qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r725853 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/cluster/
Date Thu, 11 Dec 2008 22:50:03 GMT
Author: aconway
Date: Thu Dec 11 14:50:02 2008
New Revision: 725853

URL: http://svn.apache.org/viewvc?rev=725853&view=rev
Log:
cluster: refactor multicast concerns into separate Multicaster class with separate locking.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterLeaveException.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=725853&r1=725852&r2=725853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Thu Dec 11 14:50:02 2008
@@ -46,6 +46,9 @@
   qpid/cluster/ClusterMap.cpp \
   qpid/cluster/FailoverExchange.h \
   qpid/cluster/FailoverExchange.cpp \
+  qpid/cluster/Multicaster.h \
+  qpid/cluster/Multicaster.cpp \
+  qpid/cluster/ClusterLeaveException.h \
   qpid/cluster/Quorum.h
 
 cluster_la_LIBADD=  -lcpg $(libcman) libqpidbroker.la libqpidclient.la

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=725853&r1=725852&r2=725853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Dec 11 14:50:02 2008
@@ -90,20 +90,19 @@
     name(name_),
     myUrl(url_),
     myId(cpg.self()),
+    readMax(readMax_),
     cpgDispatchHandle(
         cpg,
         boost::bind(&Cluster::dispatch, this, _1), // read
         0,                                         // write
         boost::bind(&Cluster::disconnect, this, _1) // disconnect
     ),
-    deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
-    mcastQueue(boost::bind(&Cluster::sendMcast, this, _1), poller),
-    mcastId(0),
+    mcast(cpg, poller),
     mgmtObject(0),
+    deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
     state(INIT),
     lastSize(0),
-    lastBroker(false),
-    readMax(readMax_)
+    lastBroker(false)
 {
     ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
     if (agent != 0){
@@ -116,7 +115,6 @@
     failoverExchange.reset(new FailoverExchange(this));
     cpgDispatchHandle.startWatch(poller);
     deliverQueue.start();
-    mcastQueue.start();
     QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
     if (quorum_) quorum.init();
     cpg.join(name);
@@ -135,49 +133,6 @@
     connections.erase(id);
 }
 
-void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq) {
-    Event e(Event::control(body, id, seq));
-    QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
-    mcast(e);
-}
-
-void Cluster::mcastControl(const framing::AMQBody& body) {
-    Event e(Event::control(body, ConnectionId(myId,0), ++mcastId));
-    QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
-    mcast(e);
-}
-
-void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id) {
-    Event e(DATA, connection, size, id);
-    memcpy(e.getData(), data, size);
-    {
-        Lock l(lock);
-        if (state <= CATCHUP && e.isConnection()) {
-            // Stall outgoing connection events untill we are fully READY
-            QPID_LOG(trace, *this << " MCAST deferred: " << e );
-            mcastStallQueue.push_back(e);
-            return;
-        }
-    }
-    QPID_LOG(trace, *this << " MCAST " << e);
-    mcast(e);
-}
-
-void Cluster::mcast(const Event& e) { mcastQueue.push(e); }
-
-void Cluster::sendMcast(PollableEventQueue::Queue& values) {
-    try {
-        PollableEventQueue::Queue::iterator i = values.begin();
-        while (i != values.end() && i->mcast(cpg))
-            ++i;
-        values.erase(values.begin(), i);
-    }
-    catch (const std::exception& e) {
-        QPID_LOG(critical, "Multicast failure: " << e.what());
-        leave();
-    }
-}
-
 std::vector<Url> Cluster::getUrls() const {
     Lock l(lock);
     return getUrls(l);
@@ -315,7 +270,6 @@
 void Cluster::dispatch(sys::DispatchHandle& h) {
     try {
         cpg.dispatchAll();
-        mcastQueue.start();     // In case it was stopped by flow control.
         h.rewatch();
     } catch (const std::exception& e) {
         QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what());
@@ -361,7 +315,9 @@
     if (state == INIT) {        // First configChange
         if (map.aliveCount() == 1) {
             setClusterId(true);
+            // FIXME aconway 2008-12-11: Centralize transition to READY and associated actions eg mcast.release()
             state = READY;
+            mcast.release();
             QPID_LOG(notice, *this << " first in cluster");
             if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
             map = ClusterMap(myId, myUrl, true);
@@ -370,7 +326,7 @@
         else {                  // Joining established group.
             state = NEWBIE;
             QPID_LOG(info, *this << " joining cluster: " << map);
-            mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()));
+            mcast.mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), myId);
         }
     }
     else if (state >= READY && memberChange)
@@ -384,7 +340,7 @@
     if (state == READY && map.isNewbie(id)) {
         state = OFFER;
         QPID_LOG(info, *this << " send dump-offer to " << id);
-        mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId));
+        mcast.mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), myId);
     }
 }
 
@@ -414,10 +370,10 @@
         memberUpdate(l);
     if (state == CATCHUP && id == myId) {
         state = READY;
+        mcast.release();
         QPID_LOG(notice, *this << " caught up, active cluster member");
         if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
-        for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1));
-        mcastStallQueue.clear();
+        mcast.release();
     }
 }
 
@@ -432,6 +388,7 @@
         }
         else {                  // Another offer was first.
             state = READY;
+            mcast.release();
             QPID_LOG(info, *this << " cancelled dump offer to " << dumpee);
             tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer.
         }
@@ -461,6 +418,7 @@
                        boost::bind(&Cluster::dumpOutError, this, _1)));
 }
 
+// Called in dump thread.
 void Cluster::dumpInDone(const ClusterMap& m) {
     Lock l(lock);
     dumpedMap = m;
@@ -471,8 +429,7 @@
     if (state == LEFT) return;
     if (state == DUMPEE && dumpedMap) {
         map = *dumpedMap;
-        mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()));
-        // Don't flush the mcast queue till we are READY, on self-deliver.
+        mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
         state = CATCHUP;
         QPID_LOG(info, *this << " received dump, starting catch-up");
         deliverQueue.start();
@@ -487,6 +444,7 @@
 void Cluster::dumpOutDone(Lock& l) {
     assert(state == DUMPER);
     state = READY;
+    mcast.release();
     QPID_LOG(info, *this << " sent dump");
     deliverQueue.start();
     tryMakeOffer(map.firstNewbie(), l); // Try another offer
@@ -523,7 +481,7 @@
 
 void Cluster::stopFullCluster(Lock& ) {
     QPID_LOG(notice, *this << " shutting down cluster " << name);
-    mcastControl(ClusterShutdownBody());
+    mcast.mcastControl(ClusterShutdownBody(), myId);
 }
 
 void Cluster::memberUpdate(Lock& l) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=725853&r1=725852&r2=725853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Dec 11 14:50:02 2008
@@ -26,6 +26,7 @@
 #include "ConnectionMap.h"
 #include "FailoverExchange.h"
 #include "Quorum.h"
+#include "Multicaster.h"
 
 #include "qpid/broker/Broker.h"
 #include "qpid/sys/PollableQueue.h"
@@ -55,8 +56,10 @@
 class Connection;
 
 /**
- * Connection to the cluster.
+ * Connection to the cluster
  *
+ * Threading notes: 3 thread categories: connection, deliver, dump.
+ * 
  */
 class Cluster : private Cpg::Handler, public management::Manageable {
   public:
@@ -70,29 +73,26 @@
 
     virtual ~Cluster();
 
-    // Connection map
+    // Connection map - called in connection threads.
     void insert(const ConnectionPtr&); 
     void erase(ConnectionId);       
     
-    // Send to the cluster 
-    void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&, uint32_t id);
-    void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id);
-
-    // URLs of current cluster members.
+    // URLs of current cluster members - called in connection threads.
     std::vector<Url> getUrls() const;
     boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; }
 
-    // Leave the cluster
+    // Leave the cluster - called in any thread.
     void leave();
 
-    // Dump completedJo
+    // Dump completed - called in dump thread
     void dumpInDone(const ClusterMap&);
 
     MemberId getId() const;
     broker::Broker& getBroker() const;
+    Multicaster& getMulticast() { return mcast; }
 
     boost::function<bool ()> isQuorate;
-    void checkQuorum();
+    void checkQuorum();         // called in connection threads.
 
     size_t getReadMax() { return readMax; }
     
@@ -109,22 +109,17 @@
     // The parameter makes it hard to forget since you have to have an instance of
     // a Lock to call the unlocked functions.
 
-    void mcastControl(const framing::AMQBody& controlBody);
-    void mcast(const Event& e);
-
     void leave(Lock&);
     std::vector<Url> getUrls(Lock&) const;
 
-    void sendMcast(PollableEventQueue::Queue& );
-    
-    // Called via CPG, deliverQueue or DumpClient threads. 
+    // Make an offer if we can - called in deliver thread.
     void tryMakeOffer(const MemberId&, Lock&);
 
     // Called in main thread in ~Broker.
     void brokerShutdown();
 
     // Cluster controls implement XML methods from cluster.xml.
-    // May be called in CPG thread via deliver() OR in deliverQueue thread.
+    // Called in deliver thread.
     // 
     void dumpRequest(const MemberId&, const std::string&, Lock&);
     void dumpOffer(const MemberId& dumper, uint64_t dumpee, const framing::Uuid&, Lock&);
@@ -134,6 +129,7 @@
     void delivered(PollableEventQueue::Queue&); // deliverQueue callback
     void deliveredEvent(const Event&); 
 
+    // Helper, called in deliver thread.
     void dumpStart(const MemberId& dumpee, const Url& url, Lock&);
 
     // CPG callbacks, called in CPG IO thread.
@@ -177,25 +173,31 @@
 
     void setClusterId(const framing::Uuid&);
 
-    mutable sys::Monitor lock;
-
+    // Immutable members set on construction, never changed.
     broker::Broker& broker;
     boost::shared_ptr<sys::Poller> poller;
     Cpg cpg;
     const std::string name;
     const Url myUrl;
     const MemberId myId;
-
-    ConnectionMap connections;
+    const size_t readMax;
+    framing::Uuid clusterId;
     NoOpConnectionOutputHandler shadowOut;
     sys::DispatchHandle cpgDispatchHandle;
-    PollableEventQueue deliverQueue, mcastQueue;
-    PlainEventQueue mcastStallQueue;
-    uint32_t mcastId;
-    framing::Uuid clusterId;
 
+
+    // Thread safe members
+    Multicaster mcast;
     qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle
+    PollableEventQueue deliverQueue;
+    ConnectionMap connections;
+    boost::shared_ptr<FailoverExchange> failoverExchange;
+    Quorum quorum;
+
+    // Remaining members are protected by lock.
+    mutable sys::Monitor lock;
 
+    //    Local cluster state, cluster map
     enum {
         INIT,                   ///< Initial state, no CPG messages received.
         NEWBIE,                 ///< Sent dump request, waiting for dump offer.
@@ -206,17 +208,13 @@
         DUMPER,                 ///< Offer accepted, sending a state dump.
         LEFT                    ///< Final state, left the cluster.
     } state;
-    
     ClusterMap map;
-    sys::Thread dumpThread;
-    boost::optional<ClusterMap> dumpedMap;
-    
     size_t lastSize;
     bool lastBroker;
-    boost::shared_ptr<FailoverExchange> failoverExchange;
 
-    Quorum quorum;
-    size_t readMax;
+    //     Dump related
+    sys::Thread dumpThread;
+    boost::optional<ClusterMap> dumpedMap;
 
   friend std::ostream& operator<<(std::ostream&, const Cluster&);
   friend class ClusterDispatcher;

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterLeaveException.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterLeaveException.h?rev=725853&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterLeaveException.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterLeaveException.h Thu Dec 11 14:50:02 2008
@@ -0,0 +1,35 @@
+#ifndef QPID_CLUSTER_CLUSTERLEAVEEXCEPTION_H
+#define QPID_CLUSTER_CLUSTERLEAVEEXCEPTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/Exception.h"
+
+namespace qpid {
+namespace cluster {
+
+struct ClusterLeaveException : public Exception
+{
+    ClusterLeaveException(const std::string& message=std::string()) : Exception(message) {}
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_CLUSTERLEAVEEXCEPTION_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterLeaveException.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterLeaveException.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=725853&r1=725852&r2=725853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Dec 11 14:50:02 2008
@@ -163,7 +163,7 @@
             // closed and process any outstanding frames from the cluster
             // until self-delivery of deliver-close.
             output.setOutputHandler(discardHandler);
-            cluster.mcastControl(ClusterConnectionDeliverCloseBody(), self, ++mcastSeq);
+            cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
         }
     }
     catch (const std::exception& e) {
@@ -193,7 +193,7 @@
     }
     else {                      // Multicast local connections.
         assert(isLocal());
-        cluster.mcastBuffer(buffer, size, self, ++mcastSeq);
+        cluster.getMulticast().mcastBuffer(buffer, size, self);
     }
     return size;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=725853&r1=725852&r2=725853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Dec 11 14:50:02 2008
@@ -163,7 +163,6 @@
     framing::FrameDecoder localDecoder;
     framing::FrameDecoder mcastDecoder;
     broker::Connection connection;
-    framing::SequenceNumber mcastSeq;
     framing::SequenceNumber deliverSeq;
     framing::ChannelId currentChannel;
     boost::shared_ptr<broker::TxBuffer> txBuffer;

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=725853&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Thu Dec 11 14:50:02 2008
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Multicaster.h"
+#include "Cpg.h"
+#include "ClusterLeaveException.h"
+#include "qpid/log/Statement.h"
+
+
+namespace qpid {
+namespace cluster {
+
+Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller) :
+    cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
+    holding(true)
+{
+    queue.start();
+}
+
+void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) {
+    mcast(Event::control(body, id));
+}
+
+void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) {
+    Event e(DATA, id, size);
+    memcpy(e.getData(), data, size);
+    mcast(e);
+}
+
+void Multicaster::mcast(const Event& e) {
+    {
+        sys::Mutex::ScopedLock l(lock);
+        if (e.getType() == DATA && e.isConnection() && holding) {
+            holdingQueue.push_back(e); 
+            QPID_LOG(trace, " MCAST held: " << e );
+            return;
+        }
+    }
+    queue.push(e);
+}
+
+void Multicaster::sendMcast(PollableEventQueue::Queue& values) {
+    try {
+        PollableEventQueue::Queue::iterator i = values.begin();
+        while (i != values.end() && i->mcast(cpg)) {
+            QPID_LOG(trace, " MCAST " << *i);
+            ++i;
+        }
+        values.erase(values.begin(), i);
+    }
+    catch (const std::exception& e) {
+        throw ClusterLeaveException(e.what());
+    }
+}
+
+void Multicaster::release() {
+    sys::Mutex::ScopedLock l(lock);
+    holding = false;
+    std::for_each(holdingQueue.begin(), holdingQueue.end(), boost::bind(&Multicaster::mcast, this, _1));
+    holdingQueue.clear();
+}
+
+}} // namespace qpid::cluster

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=725853&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Thu Dec 11 14:50:02 2008
@@ -0,0 +1,69 @@
+#ifndef QPID_CLUSTER_MULTICASTER_H
+#define QPID_CLUSTER_MULTICASTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "Event.h"
+#include "qpid/sys/PollableQueue.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+namespace sys {
+class Poller;
+}
+
+namespace cluster {
+
+class Cpg;
+
+/**
+ * Multicast to the cluster. Shared, thread safe object.
+ */
+class Multicaster
+{
+  public:
+    /** Starts in holding mode: connection data events are held, other events are mcast */
+    Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& );
+    void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&);
+    void mcastBuffer(const char*, size_t, const ConnectionId&);
+    void mcast(const Event& e);
+    /** End holding mode, held events are mcast */
+    void release();
+
+  private:
+    typedef sys::PollableQueue<Event> PollableEventQueue;
+    typedef std::deque<Event> PlainEventQueue;
+
+    void sendMcast(PollableEventQueue::Queue& );
+
+    sys::Mutex lock;
+    Cpg& cpg;
+    PollableEventQueue queue;
+    bool holding;
+    PlainEventQueue holdingQueue;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_MULTICASTER_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=725853&r1=725852&r2=725853&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu Dec 11 14:50:02 2008
@@ -99,8 +99,8 @@
     // Send it anyway to keep the doOutput chain going until we are sure there's no more output
     // (in deliverDoOutput)
     //
-    // FIXME aconway 2008-10-16: use ++parent.mcastSeq as sequence no,not 0
-    parent.getCluster().mcastControl(ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), request), parent.getId(), 0);
+    parent.getCluster().getMulticast().mcastControl(
+        ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), request), parent.getId());
     QPID_LOG(trace, parent << "Send doOutput request for " << request);
 }
 



Mime
View raw message