qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r740459 - in /qpid/trunk/qpid/cpp: src/ src/qpid/cluster/ src/tests/ xml/
Date Tue, 03 Feb 2009 21:28:17 GMT
Author: aconway
Date: Tue Feb  3 21:28:14 2009
New Revision: 740459

URL: http://svn.apache.org/viewvc?rev=740459&view=rev
Log:
Fix for race conditions in cluster join.

 - ConnectionDecoder: separated from Connection. 
 - cluster/PollableQueue: stop processing frames if PollableQueue is stopped.
 - move state checks in event-queue handler to frame-queue handler.

Added:
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h   (with props)
    qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
      - copied, changed from r740374, qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h
Removed:
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h
Modified:
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
    qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
    qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Tue Feb  3 21:28:14 2009
@@ -40,7 +40,7 @@
   $(CMAN_SOURCES)				\
   qpid/cluster/Cluster.cpp			\
   qpid/cluster/Cluster.h			\
-  qpid/cluster/ClusterQueueHandler.h		\
+  qpid/cluster/PollableQueue.h			\
   qpid/cluster/ClusterMap.cpp			\
   qpid/cluster/ClusterMap.h			\
   qpid/cluster/ClusterPlugin.cpp		\
@@ -49,8 +49,13 @@
   qpid/cluster/ConnectionCodec.cpp		\
   qpid/cluster/ConnectionCodec.h		\
   qpid/cluster/ConnectionMap.h			\
+  qpid/cluster/ConnectionMap.cpp		\
   qpid/cluster/Cpg.cpp				\
   qpid/cluster/Cpg.h				\
+  qpid/cluster/Decoder.cpp			\
+  qpid/cluster/Decoder.h			\
+  qpid/cluster/ConnectionDecoder.cpp		\
+  qpid/cluster/ConnectionDecoder.h		\
   qpid/cluster/Dispatchable.h			\
   qpid/cluster/UpdateClient.cpp			\
   qpid/cluster/UpdateClient.h			\
@@ -71,6 +76,7 @@
   qpid/cluster/ThreadDispatch.h			\
   qpid/cluster/ProxyInputHandler.h		\
   qpid/cluster/Quorum.h				\
+  qpid/cluster/Updatee.h			\
   qpid/cluster/WriteEstimate.cpp		\
   qpid/cluster/WriteEstimate.h			\
   qpid/cluster/types.h 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Feb  3 21:28:14 2009
@@ -20,7 +20,6 @@
 #include "Connection.h"
 #include "UpdateClient.h"
 #include "FailoverExchange.h"
-#include "ClusterQueueHandler.h"
 
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/SessionState.h"
@@ -92,8 +91,16 @@
     writeEstimate(writeEstimate_),
     mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
     dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
-    deliverEventQueue(ClusterQueueHandler<Event>(this, boost::bind(&Cluster::deliveredEvent, this, _1), "event queue"),  poller),
-    deliverFrameQueue(ClusterQueueHandler<EventFrame>(this, boost::bind(&Cluster::deliveredFrame, this, _1), "frame queue"), poller),
+    deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
+                      boost::bind(&Cluster::leave, this),
+                      "Error decoding events",
+                      poller),
+    deliverFrameQueue(boost::bind(&Cluster::deliveredFrame, this, _1),
+                      boost::bind(&Cluster::leave, this),
+                      "Error delivering frames",
+                      poller),
+    connections(*this),
+    decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)),
     state(INIT),
     lastSize(0),
     lastBroker(false),
@@ -121,12 +128,23 @@
     if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
 }
 
-void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
-    connections.insert(c->getId(), c);
+// Called in connection thread to insert a client connection.
+void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
+    Lock l(lock);
+    connections.insert(c);
 }
 
-void Cluster::erase(ConnectionId id) {
+// Called in connection thread to insert an updated shadow connection.
+void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
+    Lock l(lock);
+    assert(state <= UPDATEE);   // Only during update.
+    connections.insert(c);
+}
+
+void Cluster::erase(const ConnectionId& id) {
+    // Called only by Connection::deliverClose in deliver thread, no need to lock.
     connections.erase(id);
+    decoder.erase(id);
 }
 
 std::vector<string> Cluster::getIds() const {
@@ -168,17 +186,7 @@
     }
 }
 
-boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& connectionId)  {
-    boost::intrusive_ptr<Connection> cp = connections.find(connectionId);
-    if (!cp && connectionId.getMember() != myId) { // New shadow connection
-        std::ostringstream mgmtId;
-        mgmtId << name << ":"  << connectionId;
-        cp = new Connection(*this, shadowOut, mgmtId.str(), connectionId);
-        connections.insert(connectionId, cp);
-    }
-    return cp;
-}
-
+// Deliver CPG message.
 void Cluster::deliver(
     cpg_handle_t /*handle*/,
     cpg_name* /*group*/,
@@ -187,58 +195,52 @@
     void* msg,
     int msg_len) 
 {
-    Mutex::ScopedLock l(lock);
     MemberId from(nodeid, pid);
     framing::Buffer buf(static_cast<char*>(msg), msg_len);
     Event e(Event::decodeCopy(from, buf));
     e.setSequence(sequence++);
     if (from == myId)  // Record self-deliveries for flow control.
         mcast.selfDeliver(e);
-    deliver(e, l);
+    deliver(e);
 }
 
-void Cluster::deliver(const Event& e, Lock&) {
+void Cluster::deliver(const Event& e) {
     if (state == LEFT) return;
     QPID_LATENCY_INIT(e);
     deliverEventQueue.push(e);
 }
 
-// Entry point: called when deliverEventQueue has events to process.
+// Handler for deliverEventQueue
 void Cluster::deliveredEvent(const Event& e) {
     QPID_LATENCY_RECORD("delivered event queue", e);
     Buffer buf(const_cast<char*>(e.getData()), e.getSize());
-    boost::intrusive_ptr<Connection> connection;
-    if (e.isConnection()) {
-        if (state <= UPDATEE) {
-            QPID_LOG(trace, *this << " DROP: " << e);
-            return;
-        }
-        connection = getConnection(e.getConnectionId());
-        if (!connection) return;
-    }
     if (e.getType() == CONTROL) {
         AMQFrame frame;
-        while (frame.decode(buf)) {
-            deliverFrameQueue.push(EventFrame(connection, e, frame));
-        }
-    }
-    else if (e.getType() == DATA) { 
-        connection->deliveredEvent(e, deliverFrameQueue);
+        while (frame.decode(buf)) 
+            deliverFrameQueue.push(EventFrame(e, frame));
     }
+    else if (e.getType() == DATA)
+        decoder.decode(e, e.getData());
 }
 
+// Handler for deliverFrameQueue
 void Cluster::deliveredFrame(const EventFrame& e) {
+    Mutex::ScopedLock l(lock); 
     QPID_LOG(trace, *this << " DLVR: " << e);
     QPID_LATENCY_RECORD("delivered frame queue", e.frame);
-    if (e.connection) {
-        e.connection->deliveredFrame(e);
-    }
-    else {
-        Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big?
-        ClusterDispatcher dispatch(*this, e.member, l);
+    if (e.isCluster()) {        // Cluster control frame
+        ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
         if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
             throw Exception(QPID_MSG("Invalid cluster control"));
     }
+    else {                      // Connection frame.
+        if (state <= UPDATEE) {
+            QPID_LOG(trace, *this << " DROP: " << e);
+            return;
+        }
+        boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
+        connection->deliveredFrame(e);
+    }
     QPID_LATENCY_RECORD("processed", e.frame);
 }
   
@@ -282,7 +284,13 @@
     std::string addresses;
     for (cpg_address* p = current; p < current+nCurrent; ++p) 
         addresses.append(MemberId(*p).str());
-    deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId), l);
+    deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId));
+}
+
+void Cluster::setReady(Lock&) {
+    state = READY;
+    if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
+    mcast.release();
 }
 
 void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) {
@@ -296,12 +304,9 @@
 
     if (state == INIT) {        // First configChange
         if (map.aliveCount() == 1) {
-            setClusterId(true);
-            // FIXME aconway 2008-12-11: Centralize transition to READY and associated actions eg mcast.release()
-            state = READY;
-            mcast.release();
             QPID_LOG(notice, *this << " first in cluster");
-            if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
+            setClusterId(true);
+            setReady(l);
             map = ClusterMap(myId, myUrl, true);
             memberUpdate(l);
         }
@@ -325,9 +330,6 @@
     }
 }
 
-
-
-
 void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
     if (state == READY && map.isJoiner(id)) {
         state = OFFER;
@@ -361,11 +363,8 @@
     if (map.ready(id, Url(url))) 
         memberUpdate(l);
     if (state == CATCHUP && id == myId) {
-        state = READY;
-        mcast.release();
         QPID_LOG(notice, *this << " caught up, active cluster member");
-        if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
-        mcast.release();
+        setReady(l);
     }
 }
 
@@ -379,8 +378,7 @@
             updateStart(updatee, *url, l);
         }
         else {                  // Another offer was first.
-            state = READY;
-            mcast.release();
+            setReady(l);
             QPID_LOG(info, *this << " cancelled update offer to " << updatee);
             tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer.
         }
@@ -390,7 +388,7 @@
         setClusterId(uuid);
         state = UPDATEE;
         QPID_LOG(info, *this << " receiving update from " << updater);
-        deliverEventQueue.stop();
+        deliverFrameQueue.stop();
         checkUpdateIn(l);
     }
 }
@@ -400,7 +398,7 @@
     assert(state == OFFER);
     state = UPDATER;
     QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
-    deliverEventQueue.stop();
+    deliverFrameQueue.stop();
     if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
     updateThread = Thread(
         new UpdateClient(myId, updatee, url, broker, map, connections.values(),
@@ -422,7 +420,7 @@
         mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
         state = CATCHUP;
         QPID_LOG(info, *this << " received update, starting catch-up");
-        deliverEventQueue.start();
+        deliverFrameQueue.start();
     }
 }
 
@@ -432,11 +430,11 @@
 }
 
 void Cluster::updateOutDone(Lock& l) {
+    QPID_LOG(info, *this << " sent update");
     assert(state == UPDATER);
     state = READY;
     mcast.release();
-    QPID_LOG(info, *this << " sent update");
-    deliverEventQueue.start();
+    deliverFrameQueue.start();
     tryMakeOffer(map.firstJoiner(), l); // Try another offer
 }
 
@@ -504,8 +502,6 @@
     }
     lastSize = size;
 
-    //
-
     if (mgmtObject) {
         mgmtObject->set_clusterSize(size); 
         string urlstr;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Feb  3 21:28:14 2009
@@ -29,9 +29,10 @@
 #include "NoOpConnectionOutputHandler.h"
 #include "PollerDispatch.h"
 #include "Quorum.h"
+#include "Decoder.h"
+#include "PollableQueue.h"
 
 #include "qpid/broker/Broker.h"
-#include "qpid/sys/PollableQueue.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/Url.h"
@@ -73,8 +74,9 @@
     virtual ~Cluster();
 
     // Connection map - called in connection threads.
-    void insert(const ConnectionPtr&); 
-    void erase(ConnectionId);       
+    void addLocalConnection(const ConnectionPtr&); 
+    void addShadowConnection(const ConnectionPtr&); 
+    void erase(const ConnectionId&);       
     
     // URLs of current cluster members - called in connection threads.
     std::vector<std::string> getIds() const;
@@ -100,8 +102,8 @@
   private:
     typedef sys::Monitor::ScopedLock Lock;
 
-    typedef sys::PollableQueue<Event> PollableEventQueue;
-    typedef sys::PollableQueue<EventFrame> PollableFrameQueue;
+    typedef PollableQueue<Event> PollableEventQueue;
+    typedef PollableQueue<EventFrame> PollableFrameQueue;
 
     // NB: The final Lock& parameter on functions below is used to mark functions
     // that should only be called by a function that already holds the lock.
@@ -132,6 +134,8 @@
     // Helper, called in deliver thread.
     void updateStart(const MemberId& updatee, const Url& url, Lock&);
 
+    void setReady(Lock&);
+
     void deliver( // CPG deliver callback. 
         cpg_handle_t /*handle*/,
         struct cpg_name *group,
@@ -140,7 +144,7 @@
         void* /*msg*/,
         int /*msg_len*/);
 
-    void deliver(const Event& e, Lock&); 
+    void deliver(const Event&);
     
     void configChange( // CPG config change callback.
         cpg_handle_t /*handle*/,
@@ -150,8 +154,6 @@
         struct cpg_address */*joined*/, int /*nJoined*/
     );
 
-    boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
-
     virtual qpid::management::ManagementObject* GetManagementObject() const;
     virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
 
@@ -193,7 +195,10 @@
     boost::shared_ptr<FailoverExchange> failoverExchange;
     Quorum quorum;
 
-    // Remaining members are protected by lock.
+    // Called only from event delivery thread
+    Decoder decoder;
+    
+    // Remaining members are protected by lock
     mutable sys::Monitor lock;
 
     //    Local cluster state, cluster map

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Tue Feb  3 21:28:14 2009
@@ -69,8 +69,7 @@
     std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive)));
 }
 
-ClusterConnectionMembershipBody ClusterMap::asMethodBody() const {
-    framing::ClusterConnectionMembershipBody b;
+void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const {
     b.getJoiners().clear();
     std::for_each(joiners.begin(), joiners.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getJoiners()), _1));
     for(Set::const_iterator i = alive.begin(); i != alive.end(); ++i) {
@@ -79,7 +78,6 @@
     }
     b.getMembers().clear();
     std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1));
-    return b;
 }
 
 bool ClusterMap::configChange(

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Tue Feb  3 21:28:14 2009
@@ -71,7 +71,7 @@
     MemberId firstJoiner() const;
 
     /** Convert map contents to a cluster control body. */
-    framing::ClusterConnectionMembershipBody asMethodBody() const;
+    void toMethodBody(framing::ClusterConnectionMembershipBody&) const;
 
     size_t aliveCount() const { return alive.size(); }
     size_t memberCount() const { return members.size(); }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Feb  3 21:28:14 2009
@@ -75,7 +75,9 @@
 
 void Connection::init() {
     QPID_LOG(debug, cluster << " new connection: " << *this);
-    if (isLocal() && !isCatchUp() && cluster.getReadMax()) {
+    if (isLocalClient()) {
+        cluster.addLocalConnection(this);
+        if (cluster.getReadMax()) 
         output.giveReadCredit(cluster.getReadMax());
     }
 }
@@ -99,17 +101,15 @@
 // Received from a directly connected client.
 void Connection::received(framing::AMQFrame& f) {
     QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
-    if (isLocal()) {
+    if (isLocal()) {            // Local client or catch-up connection.
         currentChannel = f.getChannel();
         if (!framing::invoke(*this, *f.getBody()).wasHandled())
             connection.received(f);
     }
-    else {             // Shadow or updated ex catch-up connection.
+    else {             // Shadow or updated catch-up connection.
         if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
-            if (isShadow()) {
-                QPID_LOG(debug, cluster << " inserting connection " << *this);
-                cluster.insert(boost::intrusive_ptr<Connection>(this));
-            }
+            if (isShadow()) 
+                cluster.addShadowConnection(this);
             AMQFrame ok((ConnectionCloseOkBody()));
             connection.getOutput().send(ok);
             output.setOutputHandler(discardHandler);
@@ -136,24 +136,7 @@
     return !message.empty();
 }
 
-// Decode buffer and put frames on frameq.
-void Connection::deliveredEvent(const Event& e, PollableFrameQueue& frameq) {
-    assert(!catchUp);
-    Buffer buf(e);
-    // Set read credit on the last frame.
-    ++readCredit;               // One credit per buffer.
-    if (!mcastDecoder.decode(buf)) return;
-    AMQFrame frame(mcastDecoder.frame);
-    while (mcastDecoder.decode(buf)) {
-        frameq.push(EventFrame(this, e, frame));
-        frame = mcastDecoder.frame;
-    }
-    frameq.push(EventFrame(this, e, frame, readCredit));
-    readCredit = 0;
-}
-
-
-// Delivered from cluster.
+// Called in delivery thread, in cluster order.
 void Connection::deliveredFrame(const EventFrame& f) {
     assert(!catchUp);
     currentChannel = f.frame.getChannel(); 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Feb  3 21:28:14 2009
@@ -72,9 +72,11 @@
     ConnectionId getId() const { return self; }
     broker::Connection& getBrokerConnection() { return connection; }
 
-    /** True for connections from direct clients of local broker */
+    /** Local connections may be clients or catch-up connections */
     bool isLocal() const;
 
+    bool isLocalClient() const { return isLocal() && !isCatchUp(); }
+
     /** True for connections that are shadowing remote broker connections */
     bool isShadow() const;
 
@@ -101,7 +103,6 @@
     size_t decode(const char* buffer, size_t size);
 
     // Called for data delivered from the cluster.
-    void deliveredEvent(const Event&, PollableFrameQueue&);
     void deliveredFrame(const EventFrame&);
 
     void consumerState(const std::string& name, bool blocked, bool notifyEnabled);
@@ -166,7 +167,6 @@
     WriteEstimate writeEstimate;
     OutputInterceptor output;
     framing::FrameDecoder localDecoder;
-    framing::FrameDecoder mcastDecoder;
     broker::Connection connection;
     framing::SequenceNumber deliverSeq;
     framing::ChannelId currentChannel;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Tue Feb  3 21:28:14 2009
@@ -59,8 +59,6 @@
 {
     std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
     codec.setInputHandler(ih);
-    if (!catchUp)               // Don't put catchUp connections in the cluster map.
-        cluster.insert(interceptor);
 }
 
 ConnectionCodec::~ConnectionCodec() {}

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp?rev=740459&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp Tue Feb  3 21:28:14 2009
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionDecoder.h"
+#include "EventFrame.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+ConnectionDecoder::ConnectionDecoder(const Handler& h) : handler(h), readCredit(0) {}
+
+void ConnectionDecoder::decode(const EventHeader& eh, const void* data) {
+    assert(eh.getType() == DATA); // Only handle connection data events.
+    const char* cp = static_cast<const char*>(data);
+    Buffer buf(const_cast<char*>(cp), eh.getSize());
+    // Set read credit on the last frame in the event.
+    ++readCredit;               // One credit per event = connection read buffer.
+    if (decoder.decode(buf)) { // Decoded a frame
+        AMQFrame frame(decoder.frame);
+        while (decoder.decode(buf)) {
+            handler(EventFrame(eh, frame));
+            frame = decoder.frame;
+        }
+        handler(EventFrame(eh, frame, readCredit));
+        readCredit = 0;         // Reset credit for next event.
+    }
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h?rev=740459&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h Tue Feb  3 21:28:14 2009
@@ -0,0 +1,60 @@
+#ifndef QPID_CLUSTER_CONNECTIONDECODER_H
+#define QPID_CLUSTER_CONNECTIONDECODER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/FrameDecoder.h"
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace cluster {
+
+class EventHeader;
+class EventFrame;
+/**
+ * Decodes delivered connection data Events into EventFrames for a
+ * connection replica, local or shadow. Manages state for frame
+ * fragments and flow control.
+ * 
+ * THREAD UNSAFE: connection events are decoded in sequence.
+ */
+class ConnectionDecoder
+{
+  public:
+    typedef boost::function<void(const EventFrame&)> Handler;
+
+    ConnectionDecoder(const Handler& h);
+
+    /** Takes EventHeader + data rather than Event so that the caller can
+     * pass a pointer to connection data or a CPG buffer directly without copy.
+     */
+    void decode(const EventHeader& eh, const void* data);
+
+  private:
+    Handler handler;
+    framing::FrameDecoder decoder;
+    int readCredit;
+};
+
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_CONNECTIONDECODER_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp?rev=740459&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp Tue Feb  3 21:28:14 2009
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionMap.h"
+#include "Cluster.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+using framing::InternalErrorException;
+
+void ConnectionMap::insert(ConnectionPtr p) {
+    std::pair<Map::iterator, bool> ib = map.insert(Map::value_type(p->getId(), p));
+    if (!ib.second) {
+        assert(0);
+        throw InternalErrorException(QPID_MSG("Duplicate connection replica: " << p->getId()));
+    }
+}
+
+void ConnectionMap::erase(const ConnectionId& id) {
+    Map::iterator i = map.find(id);
+    if (i == map.end()) {
+        assert(0);
+        QPID_LOG(warning, "Erase non-existent connection replica: " << id);
+    }
+    map.erase(i);
+}
+
+ConnectionMap::ConnectionPtr ConnectionMap::get(const ConnectionId& id) {
+    Map::const_iterator i = map.find(id);
+    if (i == map.end()) {
+        assert(id.getMember() != cluster.getId());
+        // New remote connection, create a shadow.
+        std::ostringstream mgmtId;
+        mgmtId << id;
+        ConnectionPtr cp = new Connection(cluster, shadowOut, mgmtId.str(), id);
+        std::pair<Map::iterator, bool> ib = map.insert(Map::value_type(id, cp)); 
+        assert(ib.second);      // FIXME aconway 2009-02-03: exception.
+        i = ib.first;
+    }
+    return i->second;
+}
+
+ConnectionMap::Vector ConnectionMap::values() const {
+    Vector result(map.size());
+    std::transform(map.begin(), map.end(), result.begin(),
+                   boost::bind(&Map::value_type::second, _1));
+    return result;
+}
+
+void ConnectionMap::update(MemberId myId, const ClusterMap& cluster) {
+    for (Map::iterator i = map.begin(); i != map.end(); ) {
+        MemberId member = i->first.getMember();
+        if (member != myId && !cluster.isMember(member)) { 
+            i->second->left();
+            map.erase(i++);
+        } else {
+            i++;
+        }
+    }
+}
+
+void ConnectionMap::clear() {
+    map.clear();
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h Tue Feb  3 21:28:14 2009
@@ -24,6 +24,7 @@
 #include "types.h"
 #include "Connection.h"
 #include "ClusterMap.h"
+#include "NoOpConnectionOutputHandler.h"
 #include "qpid/sys/Mutex.h"
 #include <boost/intrusive_ptr.hpp>
 #include <map>
@@ -31,61 +32,48 @@
 namespace qpid {
 namespace cluster {
 
+class Cluster;
+
 /**
- * Thread safe map of connections.
+ * Thread safe map of connections. The map is used in:
+ * - deliver thread to look up connections and create new shadow connections.
+ * - local catch-up connection threads to add caught-up shadow connections.
+ * - local client connection threads when local connections are created.
  */
-class ConnectionMap
-{
+class ConnectionMap {
   public:
     typedef boost::intrusive_ptr<cluster::Connection> ConnectionPtr;
     typedef std::vector<ConnectionPtr> Vector;
     
-    void insert(ConnectionId id, ConnectionPtr p) {
-        ScopedLock l(lock);
-        map.insert(Map::value_type(id,p));
-    }
-
-    void erase(ConnectionId id) {
-        ScopedLock l(lock);
-        map.erase(id);
-    }
-
-    ConnectionPtr find(ConnectionId id) const {
-        ScopedLock l(lock);
-        Map::const_iterator i = map.find(id);
-        return i == map.end() ? ConnectionPtr() : i->second;
-    }
-
-    Vector values() const {
-        Vector result(map.size());
-        std::transform(map.begin(), map.end(), result.begin(),
-                       boost::bind(&Map::value_type::second, _1));
-        return result;
-    }
-
-    void update(MemberId myId, const ClusterMap& cluster) {
-        for (Map::iterator i = map.begin(); i != map.end(); ) {
-            MemberId member = i->first.getMember();
-            if (member != myId && !cluster.isMember(member)) { 
-                i->second->left();
-                map.erase(i++);
-            } else {
-                i++;
-            }
-        }
-    }
-
-    void clear() {
-        ScopedLock l(lock);
-        map.clear();
-    }
+    ConnectionMap(Cluster& c) : cluster(c) {}
+    
+    /** Insert a local connection or a caught up shadow connection.
+     * Called in local connection thread.
+     */
+    void insert(ConnectionPtr p); 
+
+    /** Erase a closed connection. Called in deliver thread. */
+    void erase(const ConnectionId& id);
+
+    /** Get an existing connection, or create a shadow for a new remote connection. */
+    ConnectionPtr get(const ConnectionId& id);
+
+    /** Get connections for sending an update. */
+    Vector values() const;
+
+    /** Remove connections whose members are no longer in the cluster. Deliver thread. */
+    void update(MemberId myId, const ClusterMap& cluster); 
+
+    
+    void clear();
+
+    size_t size() const;
 
-    size_t size() const { return map.size(); }
   private:
     typedef std::map<ConnectionId, ConnectionPtr> Map;
-    typedef sys::Mutex::ScopedLock ScopedLock;
-    
-    mutable sys::Mutex lock;
+
+    Cluster& cluster;
+    NoOpConnectionOutputHandler shadowOut;
     Map map;
 };
 

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp?rev=740459&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp Tue Feb  3 21:28:14 2009
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Decoder.h"
+#include "Event.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/ptr_map.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace framing;
+
+Decoder::Decoder(const Handler& h) : handler(h) {}
+
+void Decoder::decode(const EventHeader& eh, const void* data) {
+    ConnectionId id = eh.getConnectionId();
+    std::pair<Map::iterator, bool> ib = map.insert(id, new ConnectionDecoder(handler));
+    ptr_map_ptr(ib.first)->decode(eh, data);
+}
+
+void Decoder::erase(const ConnectionId& c) {
+    Map::iterator i = map.find(c);
+    if (i != map.end())     // FIXME aconway 2009-02-03: 
+        map.erase(i);
+}
+
+}} // namespace qpid::cluster

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h?rev=740459&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h Tue Feb  3 21:28:14 2009
@@ -0,0 +1,62 @@
+#ifndef QPID_CLUSTER_DECODER_H
+#define QPID_CLUSTER_DECODER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionDecoder.h"
+#include "types.h"
+#include <boost/ptr_container/ptr_map.hpp>
+
+namespace qpid {
+namespace cluster {
+
+class EventHeader;
+
+/**
+ * Holds a map of ConnectionDecoders. Decodes Events into EventFrames
+ * and forwards EventFrames to a handler.
+ *
+ * THREAD UNSAFE: Called sequentially with un-decoded cluster events from CPG.
+ */
+class Decoder
+{
+  public:
+    typedef boost::function<void(const EventFrame&)> Handler;
+
+    Decoder(const Handler& h);
+
+    /** Takes EventHeader + data rather than Event so that the caller can
+     * pass a pointer to connection data or a CPG buffer directly without copy.
+     */
+    void decode(const EventHeader& eh, const void* data);
+
+    /** Erase the decoder for a connection. */
+    void erase(const ConnectionId&);
+
+  private:
+    typedef boost::ptr_map<ConnectionId, ConnectionDecoder> Map;
+    Handler handler;
+    Map map;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_DECODER_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Tue Feb  3 21:28:14 2009
@@ -49,7 +49,12 @@
     EventType getType() const { return type; }
     ConnectionId getConnectionId() const { return connectionId; }
     MemberId getMemberId() const { return connectionId.getMember(); }
+
+    /** Size of payload data, excluding header. */
     size_t getSize() const { return size; }
+    /** Size of header + payload. */ 
+    size_t getStoreSize() { return size + HEADER_SIZE; }
+
     uint64_t getSequence() const { return sequence; }
     void setSequence(uint64_t n) { sequence = n; }
 
@@ -88,7 +93,6 @@
     // Store including header
     char* getStore() { return store; }
     const char* getStore() const { return store; }
-    size_t getStoreSize() { return size + HEADER_SIZE; }
     
     operator framing::Buffer() const;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp Tue Feb  3 21:28:14 2009
@@ -26,21 +26,14 @@
 
 EventFrame::EventFrame() : sequence(0) {}
 
-EventFrame::EventFrame(
-    const boost::intrusive_ptr<Connection>& c, const Event& e,
-    const framing::AMQFrame& f, int rc
-) : connection(c), member(e.getMemberId()), frame(f),
-    sequence(e.getSequence()), readCredit(rc)
+EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc)
+    : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc)
 {
     QPID_LATENCY_INIT(frame);
 }
 
 std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
-    if (e.connection)
-        o << e.connection->getId();
-    else
-        o << e.member;
-    return o  << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit;
+    return o << e.connectionId << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit;
 }
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Tue Feb  3 21:28:14 2009
@@ -32,22 +32,21 @@
 namespace qpid {
 namespace cluster {
 
-class Connection;
-
 /**
  * A frame decoded from an Event.
  */
 struct EventFrame
 {
+  public:
     EventFrame();
 
-    EventFrame(const boost::intrusive_ptr<Connection>& c, const Event& e,
-               const framing::AMQFrame& f, int rc=0);
+    EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc=0);
 
-    bool isCluster() const { return !connection; }
-    bool isConnection() const { return connection; }
+    bool isCluster() const { return !connectionId.getPointer(); }
+    bool isConnection() const { return connectionId.getPointer(); }
     bool isLastInEvent() const { return readCredit; }
 
+
     // True if this frame follows immediately after frame e. 
     bool follows(const EventFrame& e) const {
         return sequence == e.sequence || (sequence == e.sequence+1 && e.readCredit);
@@ -55,8 +54,7 @@
 
     bool operator<(const EventFrame& e) const { return sequence < e.sequence; }
     
-    boost::intrusive_ptr<Connection> connection;
-    MemberId member;
+    ConnectionId connectionId;
     framing::AMQFrame frame;   
     uint64_t sequence;
     int readCredit;             // last frame in an event, give credit when processed.

Copied: qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (from r740374, qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?p2=qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h&p1=qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h&r1=740374&r2=740459&rev=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Tue Feb  3 21:28:14 2009
@@ -1,5 +1,5 @@
-#ifndef QPID_CLUSTER_CLUSTERQUEUEHANDLER_H
-#define QPID_CLUSTER_CLUSTERQUEUEHANDLER_H
+#ifndef QPID_CLUSTER_POLLABLEQUEUE_H
+#define QPID_CLUSTER_POLLABLEQUEUE_H
 
 /*
  *
@@ -22,35 +22,47 @@
  *
  */
 
-#include "Cluster.h"
 #include "qpid/sys/PollableQueue.h"
 #include  <qpid/log/Statement.h>
 
 namespace qpid {
 namespace cluster {
 
-/** Convenience functor for PollableQueue callbacks. */
-template <class T> struct ClusterQueueHandler {
-    ClusterQueueHandler(Cluster& c, boost::function<void (const T&)> f, const std::string& n) : cluster(c), callback(f), name(n) {}
-    ClusterQueueHandler(const Cluster* c, boost::function<void (const T&)> f, const std::string& n) : cluster(*const_cast<Cluster*>(c)), callback(f), name(n) {}
+/**
+ * More convenient version of PollableQueue that handles iterating
+ * over the batch and error handling.
+ */
+template <class T> class PollableQueue : public sys::PollableQueue<T> {
+  public:
+    typedef boost::function<void (const T&)> Callback;
+    typedef boost::function<void()> ErrorCallback;
+
+    PollableQueue(Callback f, ErrorCallback err, const std::string& msg, const boost::shared_ptr<sys::Poller>& poller)
+        : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1), poller),
+          callback(f), error(err), message(msg) {}
 
-    void operator()(typename sys::PollableQueue<T>::Queue& values) {
+    void handleBatch(typename sys::PollableQueue<T>::Queue& values) {
         try {
-            std::for_each(values.begin(), values.end(), callback);
-            values.clear();
+            typename sys::PollableQueue<T>::Queue::iterator i = values.begin();
+            while (i != values.end() && !this->isStopped()) {
+                callback(*i);
+                ++i;
+            }
+            values.erase(values.begin(), i);
         }
         catch (const std::exception& e) {
-            QPID_LOG(error, "Error on " << name << ": " << e.what());
-            cluster.leave();
+            QPID_LOG(error, message << ": " << e.what());
+            error();
         }
     }
 
-    Cluster& cluster;
-    boost::function<void (const T&)> callback;
-    std::string name;
+  private:
+    Callback callback;
+    ErrorCallback error;
+    std::string message;
 };
 
     
 }} // namespace qpid::cluster
 
-#endif  /*!QPID_CLUSTER_CLUSTERQUEUEHANDLER_H*/
+#endif  /*!QPID_CLUSTER_POLLABLEQUEUE_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp Tue Feb  3 21:28:14 2009
@@ -35,7 +35,7 @@
     enable = true;
     cman = cman_init(0);
     if (cman == 0) throw ErrnoException("Can't connect to cman service");
-    // FIXME aconway 2008-11-13: configurable max wait.
+    // TODO aconway 2008-11-13: configurable max wait.
     for (int retry = 0;  !cman_is_quorate(cman) && retry < 30; retry++) {
         QPID_LOG(info, "Waiting for cluster quorum: " << sys::strError(errno));
         sys::sleep(1);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Feb  3 21:28:14 2009
@@ -117,7 +117,10 @@
     session.close();
 
     std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
-    AMQFrame frame(map.asMethodBody());
+
+    ClusterConnectionMembershipBody membership;
+    map.toMethodBody(membership);
+    AMQFrame frame(membership);
     client::ConnectionAccess::getImpl(connection)->handle(frame);
     connection.close();
     QPID_LOG(debug,  updaterId << " updated state to " << updateeId << " at " << updateeUrl);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Tue Feb  3 21:28:14 2009
@@ -24,11 +24,10 @@
 
 #include "config.h"
 #include "qpid/Url.h"
-
+#include <boost/intrusive_ptr.hpp>
 #include <utility>
 #include <iosfwd>
 #include <string>
-
 #include <stdint.h>
 
 extern "C" {
@@ -45,6 +44,7 @@
 namespace cluster {
 
 class Connection;
+typedef boost::intrusive_ptr<Connection> ConnectionPtr;
 
 /** Types of cluster event. */
 enum EventType { DATA, CONTROL };

Modified: qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/ForkedBroker.h Tue Feb  3 21:28:14 2009
@@ -108,6 +108,7 @@
             std::vector<const char*> args2(args);
             args2.push_back("--port=0");
             args2.push_back("--mgmt-enable=no"); // TODO aconway 2008-07-16: why does mgmt cause problems?
+            if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE"))
             args2.push_back("--log-enable=error+"); // Keep quiet except for errors.
             args2.push_back(0);
             execv(prog, const_cast<char* const*>(&args2[0]));

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Feb  3 21:28:14 2009
@@ -219,7 +219,7 @@
     ClusterFixture cluster(1);
     Client c0(cluster[0], "c0");
     FieldTable args;
-    args.setInt("qpid.msg_sequence", 1); // FIXME aconway 2008-11-11: works with "qpid.sequence_counter"??
+    args.setInt("qpid.msg_sequence", 1); 
     c0.session.queueDeclare(arg::queue="q");
     c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args);
     c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
@@ -452,7 +452,7 @@
     BOOST_CHECK_EQUAL(kb0, kb2);
 }
 
-QPID_AUTO_TEST_CASE(UpdateConsumers) {
+QPID_AUTO_TEST_CASE(testUpdateConsumers) {
     ClusterFixture cluster(1, 1);  
 
     Client c0(cluster[0], "c0"); 

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=740459&r1=740458&r2=740459&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Feb  3 21:28:14 2009
@@ -36,7 +36,7 @@
       <field name="cluster-id" type="uuid"/>
     </control>
 
-Min     <control name="ready" code="0x10" label="New member is ready.">
+    <control name="ready" code="0x10" label="New member is ready.">
       <field name="url" type="str16"/>
     </control>
 
@@ -45,6 +45,7 @@
     </control>
     
     <control name="shutdown" code="0x20" label="Shut down entire cluster"/>
+
   </class>
 
   <!-- TODO aconway 2008-09-10: support for un-attached connections. -->
@@ -53,8 +54,7 @@
 
   <class name="cluster-connection" code="0x81" label="Qpid clustering extensions.">
 
-    <control name="deliver-close" code="0x2">
-    </control>
+    <control name="deliver-close" code="0x2"/>
 
     <control name="deliver-do-output" code="0x3">
       <field name="bytes" type="uint32"/>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message