qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r747528 - in /qpid/trunk/qpid/cpp/src: ./ qpid/broker/ qpid/cluster/ qpid/sys/ tests/
Date Tue, 24 Feb 2009 19:48:55 GMT
Author: aconway
Date: Tue Feb 24 19:48:54 2009
New Revision: 747528

URL: http://svn.apache.org/viewvc?rev=747528&view=rev
Log:
Fixed issue with producer flow control in a cluster.

Producer flow control uses a Timer and other clock-based calculations to send flow control
commands.
These commands are not predictably ordered from the clusters point of view.

Added getClusterOrderProxy() to SessionState. In a cluster it returns
a proxy that defers sending a command to the client until it is
multicast to the cluster.  In a stand alone broker it is just the
normal proxy. Updated producer flow control to use this proxy.

Cluster flow control is turned off in shadow connections. Only the
directly connected node does flow control calculations and multicasts
the commands to send. All nodes sending of the commands thru SessionState
to ensure consistent session state (e.g. command numbering.)

Added:
    qpid/trunk/qpid/cpp/src/qpid/cluster/McastFrameHandler.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
    qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
    qpid/trunk/qpid/cpp/src/tests/federated_cluster_test
    qpid/trunk/qpid/cpp/src/tests/ssl_test
    qpid/trunk/qpid/cpp/src/tests/start_cluster

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Tue Feb 24 19:48:54 2009
@@ -70,6 +70,7 @@
   qpid/cluster/FailoverExchange.h		\
   qpid/cluster/Multicaster.cpp			\
   qpid/cluster/Multicaster.h			\
+  qpid/cluster/MulticastFrameHandler.h		\
   qpid/cluster/NoOpConnectionOutputHandler.h	\
   qpid/cluster/OutputInterceptor.cpp		\
   qpid/cluster/OutputInterceptor.h		\

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h Tue Feb 24 19:48:54 2009
@@ -48,8 +48,9 @@
         heartbeatmax(120),
         stagingThreshold(broker.getStagingThreshold()),
         federationLink(true),
-        clientSupportsThrottling(false)
-        {}
+        clientSupportsThrottling(false),
+        clusterOrderOut(0)
+    {}
 
     virtual ~ConnectionState () {}
 
@@ -75,7 +76,7 @@
     const string& getFederationPeerTag() const { return federationPeerTag; }
     std::vector<Url>& getKnownHosts() { return knownHosts; }
     
-    void setClientThrottling() { clientSupportsThrottling = true; }
+    void setClientThrottling(bool set=true) { clientSupportsThrottling = set; }
     bool getClientThrottling() const { return clientSupportsThrottling; }
 
     Broker& getBroker() { return broker; }
@@ -86,11 +87,20 @@
     //contained output tasks
     sys::AggregateOutput outputTasks;
 
-    sys::ConnectionOutputHandlerPtr& getOutput() { return out; }
+    sys::ConnectionOutputHandler& getOutput() { return out; }
     framing::ProtocolVersion getVersion() const { return version; }
-
     void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out.set(o); }
 
+    /**
+     * If the broker is part of a cluster, this is a handler provided
+     * by cluster code. It ensures consistent ordering of commands
+     * that are sent based on criteria that are not predictably
+     * ordered cluster-wide, e.g. a timer firing.
+     */
+    framing::FrameHandler* getClusterOrderOutput() { return clusterOrderOut; }
+    void setClusterOrderOutput(framing::FrameHandler& fh) { clusterOrderOut = &fh;
}
+
+
   protected:
     framing::ProtocolVersion version;
     uint32_t framemax;
@@ -103,6 +113,7 @@
     string federationPeerTag;
     std::vector<Url> knownHosts;
     bool clientSupportsThrottling;
+    framing::FrameHandler* clusterOrderOut;
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Tue Feb 24 19:48:54 2009
@@ -34,7 +34,8 @@
 SessionHandler::SessionHandler(Connection& c, ChannelId ch)
     : amqp_0_10::SessionHandler(&c.getOutput(), ch),
       connection(c), 
-      proxy(out)
+      proxy(out),
+      clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput())
: 0)
 {}
 
 SessionHandler::~SessionHandler() {}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.h Tue Feb 24 19:48:54 2009
@@ -54,6 +54,17 @@
     framing::AMQP_ClientProxy& getProxy() { return proxy; }
     const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
 
+    /**
+     * If commands are sent based on the local time (e.g. in timers), they don't have
+     * a well-defined ordering across cluster nodes.
+     * This proxy is for sending such commands. In a clustered broker it will take steps
+     * to synchronize command order across the cluster. In a stand-alone broker
+     * it is just a synonym for getProxy()
+     */  
+    framing::AMQP_ClientProxy& getClusterOrderProxy() {
+        return clusterOrderProxy.get() ? *clusterOrderProxy : proxy;
+    }
+
     virtual void handleDetach();
     
     // Overrides
@@ -69,9 +80,16 @@
     virtual void readyToSend();
 
   private:
+    struct SetChannelProxy : public framing::AMQP_ClientProxy { // Proxy that sets the channel.
+        framing::ChannelHandler setChannel;
+        SetChannelProxy(uint16_t ch, framing::FrameHandler* out)
+            : framing::AMQP_ClientProxy(setChannel), setChannel(ch, out) {}
+    };
+
     Connection& connection;
     framing::AMQP_ClientProxy proxy;
     std::auto_ptr<SessionState> session;
+    std::auto_ptr<SetChannelProxy> clusterOrderProxy;
 };
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Feb 24 19:48:54 2009
@@ -66,7 +66,7 @@
     uint32_t maxRate = broker.getOptions().maxSessionRate;
     if (maxRate) {
         if (handler->getConnection().getClientThrottling()) {
-            rateFlowcontrol = new RateFlowcontrol(maxRate);
+            rateFlowcontrol.reset(new RateFlowcontrol(maxRate));
         } else {
             QPID_LOG(warning, getId() << ": Unable to flow control client - client
doesn't support");
         }
@@ -210,7 +210,6 @@
     {}
 
     void fire() {
-        QPID_LOG(critical, "ScheduledCreditTask fired"); // FIXME aconway 2009-02-23: REMOVE
         // This is the best we can currently do to avoid a destruction/fire race
         if (!isCancelled()) {
             if ( !sessionState.processSendCredit(0) ) {
@@ -275,7 +274,8 @@
     if ( msgs > 0 && rateFlowcontrol->flowStopped() ) {
         QPID_LOG(warning, getId() << ": producer throttling violation");
         // TODO: Probably do message.stop("") first time then disconnect
-        getProxy().getMessage().stop("");
+        // See comment on getClusterOrderProxy() in .h file
+        getClusterOrderProxy().getMessage().stop("");
         return true;
     }
     AbsTime now = AbsTime::now();
@@ -283,7 +283,7 @@
     if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
     if ( sendCredit>0 ) {
         QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
-        getProxy().getMessage().flow("", 0, sendCredit);
+        getClusterOrderProxy().getMessage().flow("", 0, sendCredit);
         rateFlowcontrol->sentCredit(now, sendCredit);
         if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
         return true;
@@ -364,8 +364,9 @@
         // Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs
worth
         uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U);
         QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit);
-        getProxy().getMessage().setFlowMode("", 0);
-        getProxy().getMessage().flow("", 0, credit);
+        // See comment on getClusterOrderProxy() in .h file
+        getClusterOrderProxy().getMessage().setFlowMode("", 0);
+        getClusterOrderProxy().getMessage().flow("", 0, credit);
         rateFlowcontrol->sentCredit(AbsTime::now(), credit);
         if (mgmtObject) mgmtObject->inc_clientCredit(credit);
     }
@@ -373,4 +374,8 @@
 
 Broker& SessionState::getBroker() { return broker; }
 
+framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() {
+    return handler->getClusterOrderProxy();
+}
+
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Feb 24 19:48:54 2009
@@ -125,6 +125,15 @@
 
     void sendAcceptAndCompletion();
 
+    /**
+     * If commands are sent based on the local time (e.g. in timers), they don't have
+     * a well-defined ordering across cluster nodes.
+     * This proxy is for sending such commands. In a clustered broker it will take steps
+     * to synchronize command order across the cluster. In a stand-alone broker
+     * it is just a synonym for getProxy()
+     */  
+    framing::AMQP_ClientProxy& getClusterOrderProxy();
+    
     Broker& broker;
     SessionHandler* handler;
     sys::AbsTime expiry;        // Used by SessionManager.
@@ -138,7 +147,7 @@
 
     // State used for producer flow control (rate limited)
     qpid::sys::Mutex rateLock;
-    RateFlowcontrol* rateFlowcontrol;
+    boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
     boost::intrusive_ptr<TimerTask> flowControlTimer;
 
     friend class SessionManager;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Feb 24 19:48:54 2009
@@ -62,7 +62,8 @@
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& wrappedId, ConnectionId myId)
     : cluster(c), self(myId), catchUp(false), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false)
+      connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false),
+      mcastFrameHandler(cluster.getMulticast(), self)
 { init(); }
 
 // Local connections
@@ -70,15 +71,20 @@
                        const std::string& wrappedId, MemberId myId, bool isCatchUp, bool
isLink)
     : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
       connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId
: 0),
-      expectProtocolHeader(isLink)
+      expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self)
 { init(); }
 
 void Connection::init() {
     QPID_LOG(debug, cluster << " new connection: " << *this);
-    if (isLocalClient()) {
+    if (isLocalClient()) {  
+        connection.setClusterOrderOutput(mcastFrameHandler); // Actively send cluster-order
frames from local node
         cluster.addLocalConnection(this);
         giveReadCredit(cluster.getReadMax());
     }
+    else {                                                  // Shadow or catch-up connection
+        connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order
frames
+        connection.setClientThrottling(false);              // Disable client throttling,
done by active node.
+    }
 }
 
 void Connection::giveReadCredit(int credit) {
@@ -143,7 +149,15 @@
     if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
         && !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
     {
-        connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker
connection.
+        // FIXME aconway 2009-02-24: Using the DATA/CONTROL
+        // distinction to distinguish incoming vs. outgoing frames is
+        // very unclear.
+        if (f.type == DATA) // incoming data frames to broker::Connection
+            connection.received(const_cast<AMQFrame&>(f.frame)); 
+        else {                    // outgoing data frame, send via SessionState
+            broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
+            if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
+        }
     }
     giveReadCredit(f.readCredit);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Feb 24 19:48:54 2009
@@ -27,6 +27,7 @@
 #include "OutputInterceptor.h"
 #include "NoOpConnectionOutputHandler.h"
 #include "EventFrame.h"
+#include "McastFrameHandler.h"
 
 #include "qpid/broker/Connection.h"
 #include "qpid/amqp_0_10/Connection.h"
@@ -150,6 +151,10 @@
     void giveReadCredit(int credit);
     
   private:
+    struct NullFrameHandler : public framing::FrameHandler {
+        void handle(framing::AMQFrame&) {}
+    };
+    
     void init();
     bool checkUnsupported(const framing::AMQBody& body);
     void deliverClose();
@@ -174,6 +179,8 @@
     framing::ChannelId currentChannel;
     boost::shared_ptr<broker::TxBuffer> txBuffer;
     bool expectProtocolHeader;
+    McastFrameHandler mcastFrameHandler;
+    NullFrameHandler nullFrameHandler;
 
     static qpid::sys::AtomicValue<uint64_t> catchUpId;
     

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Tue Feb 24 19:48:54 2009
@@ -74,14 +74,17 @@
     return e;
 }
 
-Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
-    framing::AMQFrame f(body);
+Event Event::control(const framing::AMQFrame& f, const ConnectionId& cid) {
     Event e(CONTROL, cid, f.encodedSize());
     Buffer buf(e);
     f.encode(buf);
     return e;
 }
 
+Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) {
+    return control(framing::AMQFrame(body), cid);
+}
+
 iovec Event::toIovec() {
     encodeHeader();
     iovec iov = { const_cast<char*>(getStore()), getStoreSize() };
@@ -110,10 +113,13 @@
 
 static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
 
+std::ostream& operator << (std::ostream& o, EventType t) {
+    return o << EVENT_TYPE_NAMES[t];
+}
+
 std::ostream& operator << (std::ostream& o, const EventHeader& e) {
     o << "[event " << e.getConnectionId()  << "/" << e.getSequence()
-      << " " << EVENT_TYPE_NAMES[e.getType()]
-      << " " << e.getSize() << " bytes]";
+      << " " << e.getType() << " " << e.getSize() << " bytes]";
     return o;
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Tue Feb 24 19:48:54 2009
@@ -34,6 +34,7 @@
 
 namespace framing {
 class AMQBody;
+class AMQFrame;
 class Buffer;
 }
 
@@ -83,8 +84,11 @@
     /** Create an event copied from delivered data. */
     static Event decodeCopy(const MemberId& m, framing::Buffer&);
 
-    /** Create an event containing a control */
+    /** Create a control event. */
     static Event control(const framing::AMQBody&, const ConnectionId&);
+
+    /** Create a control event. */
+    static Event control(const framing::AMQFrame&, const ConnectionId&);
     
     // Data excluding header.
     char* getData() { return store + HEADER_SIZE; }
@@ -105,6 +109,7 @@
 };
 
 std::ostream& operator << (std::ostream&, const EventHeader&);
+
 }} // namespace qpid::cluster
 
 #endif  /*!QPID_CLUSTER_EVENT_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.cpp Tue Feb 24 19:48:54 2009
@@ -27,13 +27,13 @@
 EventFrame::EventFrame() : sequence(0) {}
 
 EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc)
-    : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc)
+    : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc),
type(e.getType())
 {
     QPID_LATENCY_INIT(frame);
 }
 
 std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
-    return o << e.connectionId << "/" << e.sequence << " " <<
e.frame << " rc=" << e.readCredit;
+    return o << e.connectionId << "/" << e.sequence << " " <<
e.frame << " rc=" << e.readCredit << " type=" << e.type;
 }
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Tue Feb 24 19:48:54 2009
@@ -57,7 +57,8 @@
     ConnectionId connectionId;
     framing::AMQFrame frame;   
     uint64_t sequence;
-    int readCredit;             // last frame in an event, give credit when processed.
+    int readCredit; ///< last frame in an event, give credit when processed.
+    EventType type;
 };
 
 std::ostream& operator<<(std::ostream& o, const EventFrame& e);

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/McastFrameHandler.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/McastFrameHandler.h?rev=747528&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/McastFrameHandler.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/McastFrameHandler.h Tue Feb 24 19:48:54 2009
@@ -0,0 +1,46 @@
+#ifndef QPID_CLUSTER_MCASTFRAMEHANDLER_H
+#define QPID_CLUSTER_MCASTFRAMEHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "Multicaster.h"
+#include "qpid/framing/FrameHandler.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * A frame handler that multicasts frames as CONTROL events.
+ */
+class McastFrameHandler : public framing::FrameHandler
+{
+  public:
+    McastFrameHandler(Multicaster& m, const ConnectionId& cid) : mcast(m), connection(cid)
{}
+    void handle(framing::AMQFrame& frame) { mcast.mcastControl(frame, connection); }
+  private:
+    Multicaster& mcast;
+    ConnectionId connection;
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_MCASTFRAMEHANDLER_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/McastFrameHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/McastFrameHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Tue Feb 24 19:48:54 2009
@@ -24,6 +24,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/sys/LatencyMetric.h"
 #include "qpid/framing/AMQBody.h"
+#include "qpid/framing/AMQFrame.h"
 
 namespace qpid {
 namespace cluster {
@@ -43,6 +44,11 @@
     mcast(Event::control(body, id));
 }
 
+void Multicaster::mcastControl(const framing::AMQFrame& frame, const ConnectionId&
id) {
+    QPID_LOG(trace, "MCAST " << id << ": " << frame);
+    mcast(Event::control(frame, id));
+}
+
 void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id)
{
     Event e(DATA, id, size);
     memcpy(e.getData(), data, size);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Tue Feb 24 19:48:54 2009
@@ -50,6 +50,7 @@
                 boost::function<void()> onError
     );
     void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&);
+    void mcastControl(const framing::AMQFrame& controlFrame, const ConnectionId&);
     void mcastBuffer(const char*, size_t, const ConnectionId&);
     void mcast(const Event& e);
     /** End holding mode, held events are mcast */

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Tue Feb 24 19:48:54 2009
@@ -78,6 +78,8 @@
 
 std::ostream& operator<<(std::ostream&, const ConnectionId&);
 
+std::ostream& operator << (std::ostream&, EventType);
+
 }} // namespace qpid::cluster
 
 #endif  /*!QPID_CLUSTER_TYPES_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h Tue Feb 24 19:48:54 2009
@@ -30,7 +30,7 @@
 /**
  * A ConnectionOutputHandler that delegates to another
  * ConnectionOutputHandler.  Allows the "real" ConnectionOutputHandler
- * to be changed modified without updating all the pointers/references
+ * to be changed without updating all the pointers/references
  * using the ConnectionOutputHandlerPtr
  */
 class ConnectionOutputHandlerPtr : public ConnectionOutputHandler

Modified: qpid/trunk/qpid/cpp/src/tests/federated_cluster_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/federated_cluster_test?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/federated_cluster_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/federated_cluster_test Tue Feb 24 19:48:54 2009
@@ -22,7 +22,7 @@
 # Test reliability of the replication feature in the face of link
 # failures:
 srcdir=`dirname $0`
-PYTHON_DIR=${srcdir}/../../../python
+PYTHON_DIR=$srcdir/../../../python
 
 trap stop_brokers EXIT
 
@@ -37,7 +37,7 @@
         unset BROKER_A
     fi      
     if [[ $NODE_1 ]] ; then
-        ./stop_cluster
+        $srcdir/stop_cluster
         unset NODE_1
     fi
 }

Modified: qpid/trunk/qpid/cpp/src/tests/ssl_test
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ssl_test?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ssl_test (original)
+++ qpid/trunk/qpid/cpp/src/tests/ssl_test Tue Feb 24 19:48:54 2009
@@ -39,8 +39,7 @@
 }
 
 start_broker() {
-    ../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir
--auth no --config $CONFIG --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file
$CERT_PW_FILE > qpidd.port
-    PORT=`cat qpidd.port`
+    PORT=`../qpidd --daemon --transport ssl --port 0 --ssl-port 0 --no-data-dir --no-module-dir
--auth no --config $CONFIG --load-module ../.libs/ssl.so --ssl-cert-db $CERT_DIR --ssl-cert-password-file
$CERT_PW_FILE`
 }
 
 stop_broker() {

Modified: qpid/trunk/qpid/cpp/src/tests/start_cluster
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/start_cluster?rev=747528&r1=747527&r2=747528&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/start_cluster (original)
+++ qpid/trunk/qpid/cpp/src/tests/start_cluster Tue Feb 24 19:48:54 2009
@@ -32,7 +32,7 @@
 rm -f cluster*.log
 SIZE=${1:-1}; shift
 CLUSTER=`pwd`		# Cluster name=pwd, avoid clashes.
-OPTS="-d --no-module-dir --load-module ../.libs/cluster.so  --cluster-name=$CLUSTER --no-data-dir
--auth=no $*"
+OPTS="-d --no-module-dir --load-module ../.libs/cluster.so  --cluster-name=$CLUSTER --no-data-dir
--auth=no $@"
 
 for (( i=0; i<SIZE; ++i )); do
     PORT=`with_ais_group ../qpidd  -p0 --log-to-file=cluster$i.log $OPTS`  || exit 1



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message