qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1061308 - in /qpid/trunk/qpid/cpp: include/qpid/framing/ResizableBuffer.h src/Makefile.am src/qpid/management/ManagementAgent.cpp src/qpid/management/ManagementAgent.h src/tests/cluster_test_logs.py src/tests/cluster_tests.py
Date Thu, 20 Jan 2011 14:13:08 GMT
Author: aconway
Date: Thu Jan 20 14:13:08 2011
New Revision: 1061308

URL: http://svn.apache.org/viewvc?rev=1061308&view=rev
Log:
Bug 654872, QPID-3007: Batch management messages by count, not size.

QMF V1 management messages were being batched by accumulating up to a
certain total size of data. Since management messages may have
different sizes on brokers in a cluster, this was leading to
inconsistencies.

This patch batches V1 messages by count rather than by size, similar
to V2 messages.

Added:
    qpid/trunk/qpid/cpp/include/qpid/framing/ResizableBuffer.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
    qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py

Added: qpid/trunk/qpid/cpp/include/qpid/framing/ResizableBuffer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/framing/ResizableBuffer.h?rev=1061308&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/framing/ResizableBuffer.h (added)
+++ qpid/trunk/qpid/cpp/include/qpid/framing/ResizableBuffer.h Thu Jan 20 14:13:08 2011
@@ -0,0 +1,60 @@
+#ifndef QPID_FRAMING_RESIZABLEBUFFER_H
+#define QPID_FRAMING_RESIZABLEBUFFER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Buffer.h"
+#include <vector>
+
+namespace qpid {
+namespace framing {
+
+/**
+ * A buffer that maintains its own storage and can be resized, 
+ * keeping any data already written to the buffer. 
+ */
+class ResizableBuffer : public Buffer
+{
+  public:
+    ResizableBuffer(size_t initialSize) : store(initialSize) {
+        static_cast<Buffer&>(*this) = Buffer(&store[0], store.size());
+    }
+    
+    void resize(size_t newSize) {
+        size_t oldPos =  getPosition();
+        store.resize(newSize);
+        static_cast<Buffer&>(*this) = Buffer(&store[0], store.size());
+        setPosition(oldPos);
+    }
+
+    /** Make sure at least n bytes are available */
+    void makeAvailable(size_t n) {
+        if (n > available())
+            resize(getSize() + n - available());
+    }
+    
+  private:
+    std::vector<char> store;
+};
+}} // namespace qpid::framing
+
+#endif  /*!QPID_FRAMING_RESIZABLEBUFFER_H*/

Propchange: qpid/trunk/qpid/cpp/include/qpid/framing/ResizableBuffer.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/include/qpid/framing/ResizableBuffer.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1061308&r1=1061307&r2=1061308&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Thu Jan 20 14:13:08 2011
@@ -373,6 +373,7 @@ libqpidcommon_la_SOURCES +=			\
   qpid/framing/BodyHandler.cpp			\
   qpid/framing/BodyHandler.h			\
   qpid/framing/Buffer.cpp			\
+  qpid/framing/ResizableBuffer.h		\
   qpid/framing/ChannelHandler.h			\
   qpid/framing/Endian.cpp			\
   qpid/framing/Endian.h				\

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1061308&r1=1061307&r2=1061308&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Thu Jan 20 14:13:08 2011
@@ -106,7 +106,8 @@ ManagementAgent::ManagementAgent (const 
     startTime(sys::now()),
     suppressed(false), disallowAllV1Methods(false),
     vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
-    qmf1Support(qmfV1), qmf2Support(qmfV2), maxV2ReplyObjs(100)
+    qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100),
+    msgBuffer(MA_BUFFER_SIZE)
 {
     nextObjectId   = 1;
     brokerBank     = 1;
@@ -663,7 +664,6 @@ void ManagementAgent::periodicProcessing
 #define HEADROOM  4096
     debugSnapshot("Management agent periodic processing");
     sys::Mutex::ScopedLock lock (userLock);
-    char                msgChars[BUFSIZE];
     uint32_t            contentSize;
     string              routingKey;
     string sBuf;
@@ -704,7 +704,7 @@ void ManagementAgent::periodicProcessing
         for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++)
{
             std::string packageName;
             std::string className;
-            Buffer msgBuffer(msgChars, BUFSIZE);
+            msgBuffer.reset();
             uint32_t v1Objs = 0;
             uint32_t v2Objs = 0;
             Variant::List list_;
@@ -715,6 +715,7 @@ void ManagementAgent::periodicProcessing
 
             for (DeletedObjectList::iterator lIter = mIter->second.begin();
                  lIter != mIter->second.end(); lIter++) {
+                msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space.
                 std::string oid = (*lIter)->objectId;
                 if (!(*lIter)->encodedV1Config.empty()) {
                     encodeHeader(msgBuffer, 'c');
@@ -730,9 +731,9 @@ void ManagementAgent::periodicProcessing
                              << " len=" <<  (*lIter)->encodedV1Inst.size());
                     v1Objs++;
                 }
-                if (v1Objs && msgBuffer.available() < HEADROOM) {
+                if (v1Objs >= maxReplyObjs) {
                     v1Objs = 0;
-                    contentSize = BUFSIZE - msgBuffer.available();
+                    contentSize = msgBuffer.getSize();
                     stringstream key;
                     key << "console.obj.1.0." << packageName << "." <<
className;
                     msgBuffer.reset();
@@ -744,7 +745,7 @@ void ManagementAgent::periodicProcessing
                 if (!(*lIter)->encodedV2.empty()) {
                     QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2);
                     list_.push_back((*lIter)->encodedV2);
-                    if (++v2Objs >= maxV2ReplyObjs) {
+                    if (++v2Objs >= maxReplyObjs) {
                         v2Objs = 0;
 
                         string content;
@@ -815,11 +816,11 @@ void ManagementAgent::periodicProcessing
     // sendBuffer() call, so always restart the search after a sendBuffer() call
     //
     while (1) {
-        Buffer msgBuffer(msgChars, BUFSIZE);
+        msgBuffer.reset();
         Variant::List list_;
         uint32_t pcount;
         uint32_t scount;
-        uint32_t v2Objs;
+        uint32_t v1Objs, v2Objs;
         ManagementObjectMap::iterator baseIter;
         std::string packageName;
         std::string className;
@@ -842,6 +843,7 @@ void ManagementAgent::periodicProcessing
             break;  // done - all objects processed
 
         pcount = scount = 0;
+        v1Objs = 0;
         v2Objs = 0;
         list_.clear();
         msgBuffer.reset();
@@ -849,6 +851,7 @@ void ManagementAgent::periodicProcessing
         for (ManagementObjectMap::iterator iter = baseIter;
              iter != managementObjects.end();
              iter++) {
+            msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space
             ManagementObject* baseObject = baseIter->second;
             ManagementObject* object = iter->second;
             bool send_stats, send_props;
@@ -875,6 +878,7 @@ void ManagementAgent::periodicProcessing
                     QPID_LOG(trace, "Changed V1 properties "
                              << object->getObjectId().getV2Key()
                              << " len=" << msgBuffer.getPosition()-pos);
+                    ++v1Objs;
                 }
 
                 if (send_stats && qmf1Support) {
@@ -886,7 +890,7 @@ void ManagementAgent::periodicProcessing
                     QPID_LOG(trace, "Changed V1 statistics "
                              << object->getObjectId().getV2Key()
                              << " len=" << msgBuffer.getPosition()-pos);
-
+                    ++v1Objs;
                 }
 
                 if ((send_stats || send_props) && qmf2Support) {
@@ -916,8 +920,8 @@ void ManagementAgent::periodicProcessing
 
                 object->setForcePublish(false);
 
-                if ((qmf1Support && (msgBuffer.available() < HEADROOM)) ||
-                    (qmf2Support && (v2Objs >= maxV2ReplyObjs)))
+                if ((qmf1Support && (v1Objs >= maxReplyObjs)) ||
+                    (qmf2Support && (v2Objs >= maxReplyObjs)))
                     break;  // have enough objects, send an indication...
             }
         }
@@ -1967,7 +1971,7 @@ void ManagementAgent::handleGetQueryLH(c
                                                            "_data",
                                                            object->getMd5Sum());
                     _subList.push_back(map_);
-                    if (++objCount >= maxV2ReplyObjs) {
+                    if (++objCount >= maxReplyObjs) {
                         objCount = 0;
                         _list.push_back(_subList);
                         _subList.clear();

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=1061308&r1=1061307&r2=1061308&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Thu Jan 20 14:13:08 2011
@@ -35,6 +35,7 @@
 #include "qpid/types/Variant.h"
 #include <qpid/framing/AMQFrame.h>
 #include <qpid/framing/FieldValue.h>
+#include <qpid/framing/ResizableBuffer.h>
 #include <memory>
 #include <string>
 #include <map>
@@ -330,7 +331,7 @@ private:
 
     // Maximum # of objects allowed in a single V2 response
     // message.
-    uint32_t maxV2ReplyObjs;
+    uint32_t maxReplyObjs;
 
     // list of objects that have been deleted, but have yet to be published
     // one final time.
@@ -343,6 +344,7 @@ private:
     char inputBuffer[MA_BUFFER_SIZE];
     char outputBuffer[MA_BUFFER_SIZE];
     char eventBuffer[MA_BUFFER_SIZE];
+    framing::ResizableBuffer msgBuffer;
 
     void writeData ();
     void periodicProcessing (void);

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py?rev=1061308&r1=1061307&r2=1061308&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test_logs.py Thu Jan 20 14:13:08 2011
@@ -59,7 +59,8 @@ def filter_log(log):
             'task late',
             'task overran',
             'warning CLOSING .* unsent data',
-            'Inter-broker link '
+            'Inter-broker link ',
+            'Running in a cluster, marking store'
             ])
         if re.compile(skip).search(l): continue
 
@@ -85,7 +86,7 @@ def filter_log(log):
         out.write(l)
     out.close()
 
-def verify_logs(logs):
+def verify_logs():
     """Compare log files from cluster brokers, verify that they correspond correctly."""
     # FIXME aconway 2011-01-19: disable when called from unit tests
     # Causing sporadic failures, see https://issues.apache.org/jira/browse/QPID-3007
@@ -110,4 +111,4 @@ def verify_logs(logs):
 
 # Can be run as a script.
 if __name__ == "__main__":
-    verify_logs(glob.glob("*.log"))
+    verify_logs()

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1061308&r1=1061307&r2=1061308&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Thu Jan 20 14:13:08 2011
@@ -302,7 +302,7 @@ acl allow all all
             scanner.join()
         assert scanner.found
         # Verify logs are consistent
-        cluster_test_logs.verify_logs(glob.glob("*.log"))
+        cluster_test_logs.verify_logs()
 
 class LongTests(BrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
@@ -448,11 +448,20 @@ class LongTests(BrokerTest):
             c.stop()
 
         # Verify that logs are consistent
-        cluster_test_logs.verify_logs(glob.glob("*.log"))
+        cluster_test_logs.verify_logs()
 
     def test_management_qmf2(self):
         self.test_management(args=["--mgmt-qmf2=yes"])
 
+    def test_connect_consistent(self):   # FIXME aconway 2011-01-18:
+        args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
+        cluster = self.cluster(2, args=args)
+        end = time.time() + self.duration()
+        while (time.time() < end):  # Get a management interval
+            for i in xrange(1000): cluster[0].connect().close()
+            cluster_test_logs.verify_logs()
+        
+
 class StoreTests(BrokerTest):
     """
     Cluster tests that can only be run if there is a store available.



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message