qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r743114 - in /qpid/trunk/qpid/cpp/src: qpid/amqp_0_10/ qpid/cluster/ tests/
Date Tue, 10 Feb 2009 21:42:10 GMT
Author: aconway
Date: Tue Feb 10 21:42:10 2009
New Revision: 743114

URL: http://svn.apache.org/viewvc?rev=743114&view=rev
Log:
Fix cluster flow control bug: hang with large messages (>frame-max) and low --cluster-read-max.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
    qpid/trunk/qpid/cpp/src/tests/latencytest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/UnknownType.h Tue Feb 10 21:42:10 2009
@@ -21,9 +21,9 @@
  * under the License.
  *
  */
+#include "qpid/sys/IntegerTypes.h"
 #include <vector>
 #include <iosfwd>
-#include <stdint.h>
 
 namespace qpid {
 namespace amqp_0_10 {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Feb 10 21:42:10 2009
@@ -103,7 +103,7 @@
                       "Error delivering frames",
                       poller),
     connections(*this),
-    decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)),
+    decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1), connections),
     expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())),
     frameId(0),
     initialized(false),

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Feb 10 21:42:10 2009
@@ -62,14 +62,14 @@
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& wrappedId, ConnectionId myId)
     : cluster(c), self(myId), catchUp(false), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId), readCredit(0), expectProtocolHeader(false)
+      connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false)
 { init(); }
 
 // Local connections
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
     : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), readCredit(0),
+      connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0),
       expectProtocolHeader(isLink)
 { init(); }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Feb 10 21:42:10 2009
@@ -146,6 +146,8 @@
     // Encoded queue/exchange replication.
     void queue(const std::string& encoded);
     void exchange(const std::string& encoded);
+
+    void giveReadCredit(int credit) { output.giveReadCredit(credit); }
     
   private:
     void init();
@@ -171,7 +173,6 @@
     framing::SequenceNumber deliverSeq;
     framing::ChannelId currentChannel;
     boost::shared_ptr<broker::TxBuffer> txBuffer;
-    int readCredit;
     bool expectProtocolHeader;
 
     static qpid::sys::AtomicValue<uint64_t> catchUpId;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp Tue Feb 10 21:42:10 2009
@@ -21,28 +21,34 @@
 
 #include "ConnectionDecoder.h"
 #include "EventFrame.h"
+#include "ConnectionMap.h"
 
 namespace qpid {
 namespace cluster {
 
 using namespace framing;
 
-ConnectionDecoder::ConnectionDecoder(const Handler& h) : handler(h), readCredit(0) {}
+ConnectionDecoder::ConnectionDecoder(const Handler& h) : handler(h) {}
 
-void ConnectionDecoder::decode(const EventHeader& eh, const void* data) {
+void ConnectionDecoder::decode(const EventHeader& eh, const void* data, ConnectionMap& map) {
     assert(eh.getType() == DATA); // Only handle connection data events.
     const char* cp = static_cast<const char*>(data);
     Buffer buf(const_cast<char*>(cp), eh.getSize());
-    // Set read credit on the last frame in the event.
-    ++readCredit;               // One credit per event = connection read buffer.
-    if (decoder.decode(buf)) { // Decoded a frame
+    if (decoder.decode(buf)) {  // Decoded a frame
         AMQFrame frame(decoder.frame);
         while (decoder.decode(buf)) {
             handler(EventFrame(eh, frame));
             frame = decoder.frame;
         }
-        handler(EventFrame(eh, frame, readCredit));
-        readCredit = 0;         // Reset credit for next event.
+        handler(EventFrame(eh, frame, 1)); // Set read-credit on the last frame.
+    }
+    else {
+        // We must give 1 unit read credit per event.
+        // This event does not contain any complete frames so 
+        // we must give read credit directly.
+        ConnectionPtr connection = map.getLocal(eh.getConnectionId());
+        if (connection)
+            connection->giveReadCredit(1);
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h Tue Feb 10 21:42:10 2009
@@ -30,6 +30,8 @@
 
 class EventHeader;
 class EventFrame;
+class ConnectionMap;
+
 /**
  * Decodes delivered connection data Event's as EventFrame's for a
  * connection replica, local or shadow. Manages state for frame
@@ -47,12 +49,11 @@
     /** Takes EventHeader + data rather than Event so that the caller can
      * pass a pointer to connection data or a CPG buffer directly without copy.
      */
-    void decode(const EventHeader& eh, const void* data);
+    void decode(const EventHeader& eh, const void* data, ConnectionMap& connections);
 
   private:
     Handler handler;
     framing::FrameDecoder decoder;
-    int readCredit;
 };
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp Tue Feb 10 21:42:10 2009
@@ -62,6 +62,12 @@
     return i->second;
 }
 
+ConnectionMap::ConnectionPtr ConnectionMap::getLocal(const ConnectionId& id) {
+    if (id.getMember() != cluster.getId()) return 0;
+    Map::const_iterator i = map.find(id);
+    return i == map.end() ? 0 : i->second;
+}
+
 ConnectionMap::Vector ConnectionMap::values() const {
     Vector result(map.size());
     std::transform(map.begin(), map.end(), result.begin(),

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h Tue Feb 10 21:42:10 2009
@@ -60,6 +60,9 @@
      */ 
     ConnectionPtr get(const ConnectionId& id);
 
+    /** If ID is a local connection and in the map return it, else return 0 */
+    ConnectionPtr getLocal(const ConnectionId& id);
+        
     /** Get connections for sending an update. */
     Vector values() const;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp Tue Feb 10 21:42:10 2009
@@ -29,12 +29,12 @@
 
 using namespace framing;
 
-Decoder::Decoder(const Handler& h) : handler(h) {}
+Decoder::Decoder(const Handler& h, ConnectionMap& cm) : handler(h), connections(cm) {}
 
 void Decoder::decode(const EventHeader& eh, const void* data) {
     ConnectionId id = eh.getConnectionId();
     std::pair<Map::iterator, bool> ib = map.insert(id, new ConnectionDecoder(handler));
-    ptr_map_ptr(ib.first)->decode(eh, data);
+    ptr_map_ptr(ib.first)->decode(eh, data, connections);
 }
 
 void Decoder::erase(const ConnectionId& c) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h Tue Feb 10 21:42:10 2009
@@ -30,6 +30,7 @@
 namespace cluster {
 
 class EventHeader;
+class ConnectionMap;
 
 /**
  * Holds a map of ConnectionDecoders. Decodes Events into EventFrames
@@ -42,7 +43,7 @@
   public:
     typedef boost::function<void(const EventFrame&)> Handler;
 
-    Decoder(const Handler& h);
+    Decoder(const Handler& h, ConnectionMap&);
 
     /** Takes EventHeader + data rather than Event so that the caller can
      * pass a pointer to connection data or a CPG buffer directly without copy.
@@ -56,7 +57,9 @@
     typedef boost::ptr_map<ConnectionId, ConnectionDecoder> Map;
     Handler handler;
     Map map;
+    ConnectionMap& connections;
 };
+
 }} // namespace qpid::cluster
 
 #endif  /*!QPID_CLUSTER_DECODER_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Tue Feb 10 21:42:10 2009
@@ -24,11 +24,12 @@
 
 #include "config.h"
 #include "qpid/Url.h"
+#include "qpid/sys/IntegerTypes.h"
 #include <boost/intrusive_ptr.hpp>
 #include <utility>
 #include <iosfwd>
 #include <string>
-#include <stdint.h>
+
 
 extern "C" {
 #if defined (HAVE_OPENAIS_CPG_H)

Modified: qpid/trunk/qpid/cpp/src/tests/latencytest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/latencytest.cpp?rev=743114&r1=743113&r2=743114&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/latencytest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/latencytest.cpp Tue Feb 10 21:42:10 2009
@@ -223,6 +223,7 @@
     if (msgCount) {
         std::cout << "Warning: found " << msgCount << " msgs on " << queue << ". Purging..." << std::endl;
         session.queuePurge(arg::queue=queue);
+        session.sync();
     }
     SubscriptionSettings settings;
     if (opts.prefetch) {
@@ -245,10 +246,8 @@
 {
     ++count;
     uint64_t sentAt = msg.getDeliveryProperties().getTimestamp();
-    //uint64_t sentAt = msg.getHeaders().getTimestamp("sent-at");// TODO: add support for uint64_t as a field table type
     uint64_t receivedAt = current_time();
 
-    //std::cerr << "Latency: " << (receivedAt - sentAt) << std::endl;
     stats.update(((double) (receivedAt - sentAt)) / TIME_MSEC);
 
     if (!opts.rate && count >= opts.count) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message