qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r889813 - in /qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/src/qpid/framing/ cpp/src/tests/ python/qpid/
Date Fri, 11 Dec 2009 20:55:47 GMT
Author: aconway
Date: Fri Dec 11 20:55:45 2009
New Revision: 889813

URL: http://svn.apache.org/viewvc?rev=889813&view=rev
Log:
QPID-2266:  error sending update: Enqueue capacity threshold exceeded

Fix for the problem with a test to verify that messages going to the store
have the same headers and content-size for an updatee or a broker that
receives the publish directly.

Added:
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp
      - copied, changed from r889736, qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h
Modified:
    qpid/trunk/qpid/cpp/src/cluster.cmake
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h
    qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
    qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/src/tests/test_store.cpp
    qpid/trunk/qpid/python/qpid/brokertest.py

Modified: qpid/trunk/qpid/cpp/src/cluster.cmake
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.cmake?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.cmake (original)
+++ qpid/trunk/qpid/cpp/src/cluster.cmake Fri Dec 11 20:55:45 2009
@@ -109,6 +109,7 @@
        qpid/cluster/ExpiryPolicy.cpp
        qpid/cluster/FailoverExchange.cpp
        qpid/cluster/FailoverExchange.h
+       qpid/cluster/UpdateExchange.cpp
        qpid/cluster/UpdateExchange.h
        qpid/cluster/UpdateReceiver.h
        qpid/cluster/LockedConnectionMap.h

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Fri Dec 11 20:55:45 2009
@@ -69,6 +69,7 @@
   qpid/cluster/FailoverExchange.cpp		\
   qpid/cluster/FailoverExchange.h		\
   qpid/cluster/UpdateExchange.h			\
+  qpid/cluster/UpdateExchange.cpp		\
   qpid/cluster/UpdateReceiver.h			\
   qpid/cluster/LockedConnectionMap.h		\
   qpid/cluster/Multicaster.cpp			\

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri Dec 11 20:55:45 2009
@@ -154,6 +154,7 @@
     queueCleaner(queues, timer),
     queueEvents(poller,!conf.asyncQueueEvents), 
     recovery(true),
+    clusterUpdatee(false),
     expiryPolicy(new ExpiryPolicy),
     connectionCounter(conf.maxConnections),
     getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Fri Dec 11 20:55:45 2009
@@ -168,8 +168,10 @@
     std::vector<Url> getKnownBrokersImpl();
     std::string federationTag;
     bool recovery;
+    bool clusterUpdatee;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
     ConnectionCounter connectionCounter;
+    
   public:
     virtual ~Broker();
 
@@ -259,6 +261,9 @@
     void setRecovery(bool set) { recovery = set; }
     bool getRecovery() const { return recovery; }
 
+    void setClusterUpdatee(bool set) { clusterUpdatee = set; }
+    bool isClusterUpdatee() const { return clusterUpdatee; }
+
     management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
     
     ConnectionCounter& getConnectionCounter() {return connectionCounter;}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Dec 11 20:55:45 2009
@@ -425,19 +425,4 @@
     return getProperties<MessageProperties>()->getApplicationHeaders();
 }
 
-
-void Message::setUpdateDestination(const std::string& d)
-{
-    updateDestination = d;
-}
-
-
-bool Message::isUpdateMessage()
-{
-    return updateDestination.size() && isA<MessageTransferBody>() 
-        && getMethod<MessageTransferBody>()->getDestination() == updateDestination;
-}
-
-std::string Message::updateDestination;
-
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Dec 11 20:55:45 2009
@@ -104,6 +104,10 @@
         return frames.as<T>();
     }
 
+    template <class T> T* getMethod() {
+        return frames.as<T>();
+    }
+
     template <class T> bool isA() const {
         return frames.isA<T>();
     }
@@ -157,9 +161,6 @@
     void setDequeueCompleteCallback(MessageCallback& cb);
     void resetDequeueCompleteCallback();
 
-    bool isUpdateMessage();
-    static void setUpdateDestination(const std::string&);
-
   private:
     typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
 
@@ -190,7 +191,6 @@
     MessageCallback* dequeueCallback;
 
     uint32_t requiredCredit;
-    static std::string updateDestination;
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Dec 11 20:55:45 2009
@@ -598,7 +598,7 @@
             string key = ft->getAsString(qpidVQMatchProperty);
 
             i = lvq.find(key);
-            if (i == lvq.end() || msg->isUpdateMessage()){
+            if (i == lvq.end() || (broker && broker->isClusterUpdatee())) {
                 messages.push_back(qm);
                 listeners.populate(copy);
                 lvq[key] = msg; 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Dec 11 20:55:45 2009
@@ -619,6 +619,7 @@
 
         if (initMap.isUpdateNeeded())  { // Joining established cluster.
             broker.setRecovery(false); // Ditch my current store.
+            broker.setClusterUpdatee(true);
             state = JOINER;
         }
         else {                  // I can go ready.
@@ -813,6 +814,7 @@
         memberUpdate(l);
         mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
         state = CATCHUP;
+        broker.setClusterUpdatee(false);
         discarding = false;     // ok to set, we're stalled for update.
         QPID_LOG(notice, *this << " update complete, starting catch-up.");
         deliverEventQueue.start();

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Fri Dec 11 20:55:45 2009
@@ -139,7 +139,6 @@
         broker->setConnectionFactory(
             boost::shared_ptr<sys::ConnectionCodec::Factory>(
                 new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
-        broker::Message::setUpdateDestination(UpdateClient::UPDATE);
         ManagementAgent* mgmt = broker->getManagementAgent();
         if (mgmt) {
             std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator());

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Fri Dec 11 20:55:45 2009
@@ -217,10 +217,11 @@
         // Disable client code that clears the delivery-properties.exchange
         sb.get()->setDoClearDeliveryPropertiesExchange(false);
         framing::MessageTransferBody transfer(
-            framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE,
-            message::ACQUIRE_MODE_PRE_ACQUIRED);
+            *message.payload->getFrames().as<framing::MessageTransferBody>());
+        transfer.setDestination(UpdateClient::UPDATE);
         
-        sb.get()->send(transfer, message.payload->getFrames(), !message.payload->isContentReleased());
+        sb.get()->send(transfer, message.payload->getFrames(),
+                       !message.payload->isContentReleased());
         if (message.payload->isContentReleased()){
             uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize;
             uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();

Copied: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp (from r889736, qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp?p2=qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp&p1=qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h&r1=889736&r2=889813&rev=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp Fri Dec 11 20:55:45 2009
@@ -1,6 +1,3 @@
-#ifndef QPID_CLUSTER_UPDATEEXCHANGE_H
-#define QPID_CLUSTER_UPDATEEXCHANGE_H
-
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -21,25 +18,30 @@
  * under the License.
  *
  */
-
-#include "qpid/cluster/UpdateClient.h"
-#include "qpid/broker/FanOutExchange.h"
-
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/broker/Message.h"
+#include "UpdateExchange.h"
 
 namespace qpid {
 namespace cluster {
 
-/**
- * A keyless exchange (like fanout exchange) that does not modify delivery-properties.exchange
- * on messages.
- */
-class UpdateExchange : public broker::FanOutExchange
-{
-  public:
-    UpdateExchange(management::Manageable* parent) : broker::Exchange(UpdateClient::UPDATE,
parent), broker::FanOutExchange(UpdateClient::UPDATE, parent) {}
-    void setProperties(const boost::intrusive_ptr<broker::Message>&) {}
-};
+using framing::MessageTransferBody;
+using framing::DeliveryProperties;
 
-}} // namespace qpid::cluster
+UpdateExchange::UpdateExchange(management::Manageable* parent)
+    : broker::Exchange(UpdateClient::UPDATE, parent),
+      broker::FanOutExchange(UpdateClient::UPDATE, parent) {}
+
+
+void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>&
msg) {
+    MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>();
+    assert(transfer);
+    const DeliveryProperties* props = msg->getProperties<DeliveryProperties>();
+    assert(props);
+    if (props->hasExchange())
+        transfer->setDestination(props->getExchange());
+    else
+        transfer->clearDestinationFlag();
+}
 
-#endif  /*!QPID_CLUSTER_UPDATEEXCHANGE_H*/
+}} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateExchange.h Fri Dec 11 20:55:45 2009
@@ -30,14 +30,14 @@
 namespace cluster {
 
 /**
- * A keyless exchange (like fanout exchange) that does not modify delivery-properties.exchange
- * on messages.
+ * A keyless exchange (like fanout exchange) that does not modify
+ * delivery-properties.exchange but copies it to the MessageTransfer.
  */
 class UpdateExchange : public broker::FanOutExchange
 {
   public:
-    UpdateExchange(management::Manageable* parent) : broker::Exchange(UpdateClient::UPDATE,
parent), broker::FanOutExchange(UpdateClient::UPDATE, parent) {}
-    void setProperties(const boost::intrusive_ptr<broker::Message>&) {}
+    UpdateExchange(management::Manageable* parent);
+    void setProperties(const boost::intrusive_ptr<broker::Message>&);
 };
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.cpp Fri Dec 11 20:55:45 2009
@@ -52,6 +52,11 @@
     return parts.empty() ? 0 : parts[0].getMethod();
 }
 
+AMQMethodBody* FrameSet::getMethod() 
+{
+    return parts.empty() ? 0 : parts[0].getMethod();
+}
+
 const AMQHeaderBody* FrameSet::getHeaders() const
 {
     return parts.size() < 2 ? 0 : parts[1].castBody<AMQHeaderBody>();

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/FrameSet.h Fri Dec 11 20:55:45 2009
@@ -57,6 +57,7 @@
     bool isContentBearing() const;
 
     QPID_COMMON_EXTERN const AMQMethodBody* getMethod() const;
+    QPID_COMMON_EXTERN AMQMethodBody* getMethod();
     QPID_COMMON_EXTERN const AMQHeaderBody* getHeaders() const;
     QPID_COMMON_EXTERN AMQHeaderBody* getHeaders();
      
@@ -70,6 +71,11 @@
         return (method && method->isA<T>()) ? dynamic_cast<const T*>(method)
: 0;
     }    
 
+    template <class T>  T* as()  {
+        AMQMethodBody* method = getMethod();
+        return (method && method->isA<T>()) ? dynamic_cast<T*>(method)
: 0;
+    }    
+
     template <class T> const T* getHeaderProperties() const {
         const AMQHeaderBody* header = getHeaders();
         return header ? header->get<T>() : 0;

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Fri Dec 11 20:55:45 2009
@@ -57,6 +57,34 @@
         self.assertEqual("y", m.content)
         s2.connection.close()
 
+    def test_store_direct_update_match(self):
+        """Verify that brokers stores an identical message whether they receive it
+        direct from clients or during an update, no header or other differences"""
+        cluster = self.cluster(0, args=["--load-module", self.test_store_lib])
+        cluster.start(args=["--test-store-dump", "direct.dump"])
+        # Try messages with various headers
+        cluster[0].send_message("q", Message(durable=True, content="foobar",
+                                             subject="subject",
+                                             reply_to="reply_to",
+                                             properties={"n":10}))
+        # Try messages of different sizes
+        for size in range(0,10000,100):
+            cluster[0].send_message("q", Message(content="x"*size, durable=True))
+        # Try sending via named exchange
+        c = cluster[0].connect_old()
+        s = c.session(str(qpid.datatypes.uuid4()))
+        s.exchange_bind(exchange="amq.direct", binding_key="foo", queue="q")
+        props = s.delivery_properties(routing_key="foo", delivery_mode=2)
+        s.message_transfer(
+            destination="amq.direct",
+            message=qpid.datatypes.Message(props, "content"))
+
+        # Now update a new member and compare their dumps.
+        cluster.start(args=["--test-store-dump", "updatee.dump"])
+        assert file("direct.dump").read() == file("updatee.dump").read()
+        os.remove("direct.dump")
+        os.remove("updatee.dump")
+        
 class LongTests(BrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
     def duration(self):

Modified: qpid/trunk/qpid/cpp/src/tests/test_store.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/test_store.cpp?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/test_store.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/test_store.cpp Fri Dec 11 20:55:45 2009
@@ -34,11 +34,14 @@
 
 #include "qpid/broker/NullMessageStore.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/framing/AMQFrame.h"
 #include "qpid/log/Statement.h"
 #include "qpid/Plugin.h"
 #include "qpid/Options.h"
 #include <boost/cast.hpp>
 #include <boost/lexical_cast.hpp>
+#include <memory>
+#include <fstream>
 
 using namespace qpid;
 using namespace broker;
@@ -51,10 +54,13 @@
 struct TestStoreOptions : public Options {
 
     string name;
+    string dump;
 
     TestStoreOptions() : Options("Test Store Options") {
         addOptions()
-            ("test-store-name", optValue(name, "NAME"), "Name to identify test store instance.");
+            ("test-store-name", optValue(name, "NAME"), "Name of test store instance.")
+            ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.")
+            ;
     }
 };
 
@@ -71,19 +77,38 @@
 
 class TestStore : public NullMessageStore {
   public:
-    TestStore(const string& name_, Broker& broker_) : name(name_), broker(broker_)
{}
+    TestStore(const TestStoreOptions& opts, Broker& broker_)
+        : options(opts), name(opts.name), broker(broker_)
+    {
+        QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump);
+        if (!options.dump.empty()) 
+            dump.reset(new ofstream(options.dump.c_str()));
+    }
 
     ~TestStore() {
         for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1));
     }
 
+    virtual bool isNull() const { return false; }
+    
     void enqueue(TransactionContext* ,
-                 const boost::intrusive_ptr<PersistableMessage>& msg,
+                 const boost::intrusive_ptr<PersistableMessage>& pmsg,
                  const PersistableQueue& )
     {
-        string data = boost::polymorphic_downcast<Message*>(msg.get())->getFrames().getContent();
+        Message* msg = dynamic_cast<Message*>(pmsg.get());
+        assert(msg);
+
+        // Dump the message if there is a dump file.
+        if (dump.get()) {
+            msg->getFrames().getMethod()->print(*dump);
+            *dump  << endl << "  ";
+            msg->getFrames().getHeaders()->print(*dump);
+            *dump << endl << "  ";
+            *dump << msg->getFrames().getContentSize() << endl;
+        }
 
         // Check the message for special instructions.
+        string data = msg->getFrames().getContent();
         size_t i = string::npos;
         size_t j = string::npos;
         if (strncmp(data.c_str(), TEST_STORE_DO.c_str(), strlen(TEST_STORE_DO.c_str())) ==
0
@@ -119,9 +144,11 @@
 
   private:
     static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC;
+    TestStoreOptions options;
     string name;
     Broker& broker;
     vector<Thread> threads;
+    std::auto_ptr<ofstream> dump;
 };
 
 const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: ";
@@ -139,7 +166,7 @@
     {
         Broker* broker = dynamic_cast<Broker*>(&target);
         if (!broker) return;
-        boost::shared_ptr<MessageStore> p(new TestStore(options.name, *broker));
+        boost::shared_ptr<MessageStore> p(new TestStore(options, *broker));
         broker->setStore (p);
     }
 

Modified: qpid/trunk/qpid/python/qpid/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=889813&r1=889812&r2=889813&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/brokertest.py (original)
+++ qpid/trunk/qpid/python/qpid/brokertest.py Fri Dec 11 20:55:45 2009
@@ -259,15 +259,15 @@
         self.args += [ "--load-module", BrokerTest.cluster_lib ]
         self.start_n(count, expect=expect, wait=wait)
 
-    def start(self, name=None, expect=EXPECT_RUNNING, wait=True):
+    def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[]):
         """Add a broker to the cluster. Returns the index of the new broker."""
         if not name: name="%s-%d" % (self.name, len(self._brokers))
         log.debug("Cluster %s starting member %s" % (self.name, name))
-        self._brokers.append(self.test.broker(self.args, name, expect, wait))
+        self._brokers.append(self.test.broker(self.args+args, name, expect, wait))
         return self._brokers[-1]
 
-    def start_n(self, count, expect=EXPECT_RUNNING, wait=True):
-        for i in range(count): self.start(expect=expect, wait=wait)
+    def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]):
+        for i in range(count): self.start(expect=expect, wait=wait, args=args)
 
     # Behave like a list of brokers.
     def __len__(self): return len(self._brokers)
@@ -289,6 +289,7 @@
     receiver_exec = os.getenv("RECEIVER_EXEC")
     sender_exec = os.getenv("SENDER_EXEC")
     store_lib = os.getenv("STORE_LIB")
+    test_store_lib = os.getenv("TEST_STORE_LIB")
     rootdir = os.getcwd()
 
     def configure(self, config): self.config=config



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message