qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1509423 [2/2] - in /qpid/trunk/qpid/cpp: include/qpid/framing/ src/ src/qpid/broker/ src/qpid/ha/ src/tests/
Date Thu, 01 Aug 2013 20:27:27 GMT
Modified: qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h?rev=1509423&r1=1509422&r2=1509423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingFixture.h Thu Aug  1 20:27:26 2013
@@ -115,6 +115,7 @@ struct MessagingFixture : public BrokerF
             (boost::format("amqp:tcp:localhost:%1%") % (port)).str());
         connection.open();
         return connection;
+
     }
 
     /** Open a connection to the broker. */
@@ -231,9 +232,10 @@ inline void receive(messaging::Receiver&
 class MethodInvoker
 {
   public:
-    MethodInvoker(messaging::Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
-                                      sender(session.createSender("qmf.default.direct/broker")),
-                                      receiver(session.createReceiver(replyTo)) {}
+    MethodInvoker(messaging::Session session) :
+        replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
+        sender(session.createSender("qmf.default.direct/broker")),
+        receiver(session.createReceiver(replyTo)) {}
 
     void createExchange(const std::string& name, const std::string& type, bool durable=false)
     {
@@ -292,11 +294,14 @@ class MethodInvoker
         methodRequest("delete", params);
     }
 
-    void methodRequest(const std::string& method, const Variant::Map& inParams, Variant::Map* outParams = 0)
+    void methodRequest(
+        const std::string& method,
+        const Variant::Map& inParams, Variant::Map* outParams = 0,
+        const std::string& objectName="org.apache.qpid.broker:broker:amqp-broker")
     {
         Variant::Map content;
         Variant::Map objectId;
-        objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
+        objectId["_object_name"] = objectName;;
         content["_object_id"] = objectId;
         content["_method_name"] = method;
         content["_arguments"] = inParams;

Modified: qpid/trunk/qpid/cpp/src/tests/TransactionObserverTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/TransactionObserverTest.cpp?rev=1509423&r1=1509422&r2=1509423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/TransactionObserverTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/TransactionObserverTest.cpp Thu Aug  1 20:27:26 2013
@@ -89,6 +89,7 @@ Session simpleTxTransaction(MessagingFix
     // Transaction with 1 enqueue and 1 dequeue.
     Session txSession = fix.connection.createTransactionalSession();
     BOOST_CHECK_EQUAL("foo", txSession.createReceiver("q1").fetch().getContent());
+    txSession.acknowledge();
     txSession.createSender("q2;{create:always}").send(msg("bar"));
     return txSession;
 }

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1509423&r1=1509422&r2=1509423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Thu Aug  1 20:27:26 2013
@@ -244,14 +244,10 @@ class Broker(Popen):
 
     def __str__(self): return "Broker<%s %s :%d>"%(self.log, self.pname, self.port())
 
-    def find_log(self):
-        self.log = "%03d:%s.log" % (Broker._log_count, self.name)
-        Broker._log_count += 1
-
     def get_log(self):
         return os.path.abspath(self.log)
 
-    def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False):
+    def __init__(self, test, args=[], test_store=False, name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False):
         """Start a broker daemon. name determines the data-dir and log
         file names."""
 
@@ -273,11 +269,18 @@ class Broker(Popen):
         else:
             self.name = "broker%d" % Broker._broker_count
             Broker._broker_count += 1
-        self.find_log()
+
+        self.log = "%03d:%s.log" % (Broker._log_count, self.name)
+        self.store_log = "%03d:%s.store.log" % (Broker._log_count, self.name)
+        Broker._log_count += 1
+
         cmd += ["--log-to-file", self.log]
         cmd += ["--log-to-stderr=no"]
         cmd += ["--log-enable=%s"%(log_level or "info+") ]
 
+        if test_store: cmd += ["--load-module", BrokerTest.test_store_lib,
+                               "--test-store-events", self.store_log]
+
         self.datadir = self.name
         cmd += ["--data-dir", self.datadir]
         if show_cmd: print cmd

Added: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=1509423&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Aug  1 20:27:26 2013
@@ -0,0 +1,1231 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "test_tools.h"
+#include "unit_test.h"
+#include "ForkedBroker.h"
+#include "BrokerFixture.h"
+#include "ClusterFixture.h"
+
+#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/client/ConnectionAccess.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/FailoverListener.h"
+#include "qpid/client/FailoverManager.h"
+#include "qpid/client/QueueOptions.h"
+#include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/UpdateClient.h"
+#include "qpid/framing/AMQBody.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/enum.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Thread.h"
+
+#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/assign.hpp>
+
+#include <string>
+#include <iostream>
+#include <fstream>
+#include <iterator>
+#include <vector>
+#include <set>
+#include <algorithm>
+#include <iterator>
+
+
+using namespace std;
+using namespace qpid;
+using namespace qpid::cluster;
+using namespace qpid::framing;
+using namespace qpid::client;
+using namespace boost::assign;
+using broker::Broker;
+using boost::shared_ptr;
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(cluster_test)
+
+bool durableFlag = std::getenv("STORE_LIB") != 0;
+
+void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) {
+    ostringstream clusterLib;
+    clusterLib << getLibPath("CLUSTER_LIB");
+    args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str();
+    if (durableFlag)
+        args += "--load-module", getLibPath("STORE_LIB"), "TMP_DATA_DIR";
+    else
+        args += "--no-data-dir";
+}
+
+ClusterFixture::Args prepareArgs(const bool durableFlag = false) {
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    return args;
+}
+
+// Timeout for tests that wait for messages
+const sys::Duration TIMEOUT=2*sys::TIME_SEC;
+
+
+ostream& operator<<(ostream& o, const cpg_name* n) {
+    return o << Cpg::str(*n);
+}
+
+ostream& operator<<(ostream& o, const cpg_address& a) {
+    return o << "(" << a.nodeid <<","<<a.pid<<","<<a.reason<<")";
+}
+
+template <class T>
+ostream& operator<<(ostream& o, const pair<T*, int>& array) {
+    o << "{ ";
+    ostream_iterator<cpg_address> i(o, " ");
+    copy(array.first, array.first+array.second, i);
+    o << "}";
+    return o;
+}
+
+template <class C> set<int> makeSet(const C& c) {
+    set<int> s;
+    copy(c.begin(), c.end(), inserter(s, s.begin()));
+    return s;
+}
+
+class Sender {
+  public:
+    Sender(boost::shared_ptr<ConnectionImpl> ci, uint16_t ch) : connection(ci), channel(ch) {}
+    void send(const AMQBody& body, bool firstSeg, bool lastSeg, bool firstFrame, bool lastFrame) {
+        AMQFrame f(body);
+        f.setChannel(channel);
+        f.setFirstSegment(firstSeg);
+        f.setLastSegment(lastSeg);
+        f.setFirstFrame(firstFrame);
+        f.setLastFrame(lastFrame);
+        connection->expand(f.encodedSize(), false);
+        connection->handle(f);
+    }
+
+  private:
+    boost::shared_ptr<ConnectionImpl> connection;
+    uint16_t channel;
+};
+
+int64_t getMsgSequence(const Message& m) {
+    return m.getMessageProperties().getApplicationHeaders().getAsInt64("qpid.msg_sequence");
+}
+
+Message ttlMessage(const string& data, const string& key, uint64_t ttl, bool durable = false) {
+    Message m(data, key);
+    m.getDeliveryProperties().setTtl(ttl);
+    if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+    return m;
+}
+
+Message makeMessage(const string& data, const string& key, bool durable = false) {
+    Message m(data, key);
+    if (durable) m.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+    return m;
+}
+
+vector<string> browse(Client& c, const string& q, int n) {
+    SubscriptionSettings browseSettings(
+        FlowControl::messageCredit(n),
+        ACCEPT_MODE_NONE,
+        ACQUIRE_MODE_NOT_ACQUIRED,
+        0                       // No auto-ack.
+    );
+    LocalQueue lq;
+    c.subs.subscribe(lq, q, browseSettings);
+    c.session.messageFlush(q);
+    vector<string> result;
+    for (int i = 0; i < n; ++i) {
+        Message m;
+        if (!lq.get(m, TIMEOUT))
+            break;
+        result.push_back(m.getData());
+    }
+    c.subs.getSubscription(q).cancel();
+    return result;
+}
+
+ConnectionSettings aclSettings(int port, const std::string& id) {
+    ConnectionSettings settings;
+    settings.port = port;
+    settings.mechanism = "PLAIN";
+    settings.username = id;
+    settings.password = id;
+    return settings;
+}
+
+// An illegal frame body
+struct PoisonPill : public AMQBody {
+    virtual uint8_t type() const { return 0xFF; }
+    virtual void encode(Buffer& ) const {}
+    virtual void decode(Buffer& , uint32_t=0) {}
+    virtual uint32_t encodedSize() const { return 0; }
+
+    virtual void print(std::ostream&) const {};
+    virtual void accept(AMQBodyConstVisitor&) const {};
+
+    virtual AMQMethodBody* getMethod() { return 0; }
+    virtual const AMQMethodBody* getMethod() const { return 0; }
+
+    /** Match if same type and same class/method ID for methods */
+    static bool match(const AMQBody& , const AMQBody& ) { return false; }
+    virtual boost::intrusive_ptr<AMQBody> clone() const { return new PoisonPill; }
+};
+
+QPID_AUTO_TEST_CASE(testBadClientData) {
+    // Ensure that bad data on a client connection closes the
+    // connection but does not stop the broker.
+    ClusterFixture::Args args;
+    prepareArgs(args, false);
+    args += "--log-enable=critical"; // Supress expected errors
+    ClusterFixture cluster(2, args, -1);
+    Client c0(cluster[0]);
+    Client c1(cluster[1]);
+    boost::shared_ptr<client::ConnectionImpl> ci =
+        client::ConnectionAccess::getImpl(c0.connection);
+    AMQFrame poison(boost::intrusive_ptr<AMQBody>(new PoisonPill));
+    ci->expand(poison.encodedSize(), false);
+    ci->handle(poison);
+    {
+        ScopedSuppressLogging sl;
+        BOOST_CHECK_THROW(c0.session.queueQuery("q0"), Exception);
+    }
+    Client c00(cluster[0]);
+    BOOST_CHECK_EQUAL(c00.session.queueQuery("q00").getQueue(), "");
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getQueue(), "");
+}
+
+QPID_AUTO_TEST_CASE(testAcl) {
+    ofstream policyFile("cluster_test.acl");
+    policyFile << "acl allow foo@QPID create queue name=foo" << endl
+               << "acl allow foo@QPID create queue name=foo2" << endl
+               << "acl deny foo@QPID create queue name=bar" << endl
+               << "acl allow all all" << endl;
+    policyFile.close();
+    char cwd[1024];
+    BOOST_CHECK(::getcwd(cwd, sizeof(cwd)));
+    ostringstream aclLib;
+    aclLib << getLibPath("ACL_LIB");
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    args += "--log-enable=critical"; // Supress expected errors
+    args += "--acl-file", string(cwd) + "/cluster_test.acl",
+            "--cluster-mechanism", "PLAIN",
+            "--cluster-username", "cluster",
+            "--cluster-password", "cluster",
+            "--load-module", aclLib.str();
+    ClusterFixture cluster(2, args, -1);
+
+    Client c0(aclSettings(cluster[0], "c0"), "c0");
+    Client c1(aclSettings(cluster[1], "c1"), "c1");
+    Client foo(aclSettings(cluster[1], "foo"), "foo");
+
+    foo.session.queueDeclare("foo", arg::durable=durableFlag);
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("foo").getQueue(), "foo");
+
+    {
+        ScopedSuppressLogging sl;
+        BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::UnauthorizedAccessException);
+    }
+    BOOST_CHECK(c0.session.queueQuery("bar").getQueue().empty());
+    BOOST_CHECK(c1.session.queueQuery("bar").getQueue().empty());
+
+    cluster.add();
+    Client c2(aclSettings(cluster[2], "c2"), "c2");
+    {
+        ScopedSuppressLogging sl;
+        BOOST_CHECK_THROW(foo.session.queueDeclare("bar", arg::durable=durableFlag), framing::UnauthorizedAccessException);
+    }
+    BOOST_CHECK(c2.session.queueQuery("bar").getQueue().empty());
+}
+
+QPID_AUTO_TEST_CASE(testMessageTimeToLive) {
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(2, args, -1);
+    Client c0(cluster[0], "c0");
+    Client c1(cluster[1], "c1");
+    c0.session.queueDeclare("p", arg::durable=durableFlag);
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200, durableFlag));
+    c0.session.messageTransfer(arg::content=makeMessage("b", "q", durableFlag));
+    c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 100000, durableFlag));
+    c0.session.messageTransfer(arg::content=makeMessage("y", "p", durableFlag));
+    cluster.add();
+    Client c2(cluster[1], "c2");
+
+    BOOST_CHECK_EQUAL(browse(c0, "p", 1), list_of<string>("x"));
+    BOOST_CHECK_EQUAL(browse(c1, "p", 1), list_of<string>("x"));
+    BOOST_CHECK_EQUAL(browse(c2, "p", 1), list_of<string>("x"));
+
+    sys::usleep(200*1000);
+    BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<string>("b"));
+    BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<string>("b"));
+    BOOST_CHECK_EQUAL(browse(c2, "q", 1), list_of<string>("b"));
+}
+
+QPID_AUTO_TEST_CASE(testSequenceOptions) {
+    // Make sure the exchange qpid.msg_sequence property is properly replicated.
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c0(cluster[0], "c0");
+    FieldTable ftargs;
+    ftargs.setInt("qpid.msg_sequence", 1);
+    c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
+    c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=ftargs);
+    c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k");
+    c0.session.messageTransfer(arg::content=makeMessage("1", "k", durableFlag), arg::destination="ex");
+    c0.session.messageTransfer(arg::content=makeMessage("2", "k", durableFlag), arg::destination="ex");
+    BOOST_CHECK_EQUAL(1, getMsgSequence(c0.subs.get("q", TIMEOUT)));
+    BOOST_CHECK_EQUAL(2, getMsgSequence(c0.subs.get("q", TIMEOUT)));
+
+    cluster.add();
+    Client c1(cluster[1]);
+    c1.session.messageTransfer(arg::content=makeMessage("3", "k", durableFlag), arg::destination="ex");
+    BOOST_CHECK_EQUAL(3, getMsgSequence(c1.subs.get("q", TIMEOUT)));
+}
+
+QPID_AUTO_TEST_CASE(testTxTransaction) {
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c0(cluster[0], "c0");
+    c0.session.queueDeclare(arg::queue="q", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=makeMessage("A", "q", durableFlag));
+    c0.session.messageTransfer(arg::content=makeMessage("B", "q", durableFlag));
+
+    // Start a transaction that will commit.
+    Session commitSession = c0.connection.newSession("commit");
+    SubscriptionManager commitSubs(commitSession);
+    commitSession.txSelect();
+    commitSession.messageTransfer(arg::content=makeMessage("a", "q", durableFlag));
+    commitSession.messageTransfer(arg::content=makeMessage("b", "q", durableFlag));
+    BOOST_CHECK_EQUAL(commitSubs.get("q", TIMEOUT).getData(), "A");
+
+    // Start a transaction that will roll back.
+    Session rollbackSession = c0.connection.newSession("rollback");
+    SubscriptionManager rollbackSubs(rollbackSession);
+    rollbackSession.txSelect();
+    rollbackSession.messageTransfer(arg::content=makeMessage("1", "q", durableFlag));
+    Message rollbackMessage = rollbackSubs.get("q", TIMEOUT);
+    BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
+
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+    // Add new member mid transaction.
+    cluster.add();
+    Client c1(cluster[1], "c1");
+
+    // More transactional work
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+    rollbackSession.messageTransfer(arg::content=makeMessage("2", "q", durableFlag));
+    commitSession.messageTransfer(arg::content=makeMessage("c", "q", durableFlag));
+    rollbackSession.messageTransfer(arg::content=makeMessage("3", "q", durableFlag));
+
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+
+    // Commit/roll back.
+    commitSession.txCommit();
+    rollbackSession.txRollback();
+    rollbackSession.messageRelease(rollbackMessage.getId());
+
+    // Verify queue status: just the comitted messages and dequeues should remain.
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
+    BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "B");
+    BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "a");
+    BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "b");
+    BOOST_CHECK_EQUAL(c1.subs.get("q", TIMEOUT).getData(), "c");
+
+    commitSession.close();
+    rollbackSession.close();
+}
+
+QPID_AUTO_TEST_CASE(testUnacked) {
+    // Verify replication of unacknowledged messages.
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c0(cluster[0], "c0");
+
+    Message m;
+
+    // Create unacked message: acquired but not accepted.
+    SubscriptionSettings manualAccept(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 0);
+    c0.session.queueDeclare("q1", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=makeMessage("11","q1", durableFlag));
+    LocalQueue q1;
+    c0.subs.subscribe(q1, "q1", manualAccept);
+    BOOST_CHECK_EQUAL(q1.get(TIMEOUT).getData(), "11"); // Acquired but not accepted
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q1").getMessageCount(), 0u); // Gone from queue
+
+    // Create unacked message: not acquired, accepted or completeed.
+    SubscriptionSettings manualAcquire(FlowControl::unlimited(), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_NOT_ACQUIRED, 0);
+    c0.session.queueDeclare("q2", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=makeMessage("21","q2", durableFlag));
+    c0.session.messageTransfer(arg::content=makeMessage("22","q2", durableFlag));
+    LocalQueue q2;
+    c0.subs.subscribe(q2, "q2", manualAcquire);
+    m = q2.get(TIMEOUT);  // Not acquired or accepted, still on queue
+    BOOST_CHECK_EQUAL(m.getData(), "21");
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 2u); // Not removed
+    c0.subs.getSubscription("q2").acquire(m); // Acquire manually
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // Removed
+    BOOST_CHECK_EQUAL(q2.get(TIMEOUT).getData(), "22"); // Not acquired or accepted, still on queue
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q2").getMessageCount(), 1u); // 1 not acquired.
+
+    // Create empty credit record: acquire and accept but don't complete.
+    SubscriptionSettings manualComplete(FlowControl::messageWindow(1), ACCEPT_MODE_EXPLICIT, ACQUIRE_MODE_PRE_ACQUIRED, 1, MANUAL_COMPLETION);
+    c0.session.queueDeclare("q3", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=makeMessage("31", "q3", durableFlag));
+    c0.session.messageTransfer(arg::content=makeMessage("32", "q3", durableFlag));
+    LocalQueue q3;
+    c0.subs.subscribe(q3, "q3", manualComplete);
+    Message m31=q3.get(TIMEOUT);
+    BOOST_CHECK_EQUAL(m31.getData(), "31"); // Automatically acquired & accepted but not completed.
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 1u);
+
+    // Add new member while there are unacked messages.
+    cluster.add();
+    Client c1(cluster[1], "c1");
+
+    // Check queue counts
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 0u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 1u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 1u);
+
+    // Complete the empty credit message, should unblock the message behind it.
+    BOOST_CHECK_THROW(q3.get(0), Exception);
+    c0.session.markCompleted(SequenceSet(m31.getId()), true);
+    BOOST_CHECK_EQUAL(q3.get(TIMEOUT).getData(), "32");
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q3").getMessageCount(), 0u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q3").getMessageCount(), 0u);
+
+    // Close the original session - unacked messages should be requeued.
+    c0.session.close();
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q1").getMessageCount(), 1u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q2").getMessageCount(), 2u);
+
+    BOOST_CHECK_EQUAL(c1.subs.get("q1", TIMEOUT).getData(), "11");
+    BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "21");
+    BOOST_CHECK_EQUAL(c1.subs.get("q2", TIMEOUT).getData(), "22");
+}
+
+// FIXME aconway 2009-06-17: test for unimplemented feature, enable when implemented.
+void testUpdateTxState() {
+    // Verify that we update transaction state correctly to new members.
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c0(cluster[0], "c0");
+
+    // Do work in a transaction.
+    c0.session.txSelect();
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=makeMessage("1","q", durableFlag));
+    c0.session.messageTransfer(arg::content=makeMessage("2","q", durableFlag));
+    Message m;
+    BOOST_CHECK(c0.subs.get(m, "q", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "1");
+
+    // New member, TX not comitted, c1 should see nothing.
+    cluster.add();
+    Client c1(cluster[1], "c1");
+    BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
+
+    // After commit c1 shoudl see results of tx.
+    c0.session.txCommit();
+    BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
+    BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "2");
+
+    // Another transaction with both members active.
+    c0.session.messageTransfer(arg::content=makeMessage("3","q", durableFlag));
+    BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 0u);
+    c0.session.txCommit();
+    BOOST_CHECK_EQUAL(c1.session.queueQuery(arg::queue="q").getMessageCount(), 1u);
+    BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "3");
+}
+
+QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
+    // Verify that we update a partially recieved message to a new member.
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c0(cluster[0], "c0");
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
+    Sender sender(ConnectionAccess::getImpl(c0.connection), c0.session.getChannel());
+
+    // Send first 2 frames of message.
+    MessageTransferBody transfer(
+        ProtocolVersion(), string(), // default exchange.
+        framing::message::ACCEPT_MODE_NONE,
+        framing::message::ACQUIRE_MODE_PRE_ACQUIRED);
+    sender.send(transfer, true, false, true, true);
+    AMQHeaderBody header;
+    header.get<DeliveryProperties>(true)->setRoutingKey("q");
+    if (durableFlag)
+        header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_PERSISTENT);
+    else
+        header.get<DeliveryProperties>(true)->setDeliveryMode(DELIVERY_MODE_NON_PERSISTENT);
+    sender.send(header, false, false, true, true);
+
+    // No reliable way to ensure the partial message has arrived
+    // before we start the new broker, so we sleep.
+    sys::usleep(2500);
+    cluster.add();
+
+    // Send final 2 frames of message.
+    sender.send(AMQContentBody("ab"), false, true, true, false);
+    sender.send(AMQContentBody("cd"), false, true, false, true);
+
+    // Verify message is enqued correctly on second member.
+    Message m;
+    Client c1(cluster[1], "c1");
+    BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "abcd");
+    BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());
+}
+
+QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c0(cluster[0], "c0");
+    set<int> kb0 = knownBrokerPorts(c0.connection, 1);
+    BOOST_CHECK_EQUAL(kb0.size(), 1u);
+    BOOST_CHECK_EQUAL(kb0, makeSet(cluster));
+
+    cluster.add();
+    Client c1(cluster[1], "c1");
+    set<int> kb1 = knownBrokerPorts(c1.connection, 2);
+    kb0 = knownBrokerPorts(c0.connection, 2);
+    BOOST_CHECK_EQUAL(kb1.size(), 2u);
+    BOOST_CHECK_EQUAL(kb1, makeSet(cluster));
+    BOOST_CHECK_EQUAL(kb1,kb0);
+
+    cluster.add();
+    Client c2(cluster[2], "c2");
+    set<int> kb2 = knownBrokerPorts(c2.connection, 3);
+    kb1 = knownBrokerPorts(c1.connection, 3);
+    kb0 = knownBrokerPorts(c0.connection, 3);
+    BOOST_CHECK_EQUAL(kb2.size(), 3u);
+    BOOST_CHECK_EQUAL(kb2, makeSet(cluster));
+    BOOST_CHECK_EQUAL(kb2,kb0);
+    BOOST_CHECK_EQUAL(kb2,kb1);
+
+    cluster.killWithSilencer(1,c1.connection,9);
+    kb0 = knownBrokerPorts(c0.connection, 2);
+    kb2 = knownBrokerPorts(c2.connection, 2);
+    BOOST_CHECK_EQUAL(kb0.size(), 2u);
+    BOOST_CHECK_EQUAL(kb0, kb2);
+}
+
+QPID_AUTO_TEST_CASE(testUpdateConsumers) {
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+
+    Client c0(cluster[0], "c0");
+    c0.session.queueDeclare("p", arg::durable=durableFlag);
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
+    c0.subs.subscribe(c0.lq, "q", FlowControl::zero());
+    LocalQueue lp;
+    c0.subs.subscribe(lp, "p", FlowControl::messageCredit(1));
+    c0.session.sync();
+
+    // Start new members
+    cluster.add();              // Local
+    Client c1(cluster[1], "c1");
+    cluster.add();
+    Client c2(cluster[2], "c2");
+
+    // Transfer messages
+    c0.session.messageTransfer(arg::content=makeMessage("aaa", "q", durableFlag));
+
+    c0.session.messageTransfer(arg::content=makeMessage("bbb", "p", durableFlag));
+    c0.session.messageTransfer(arg::content=makeMessage("ccc", "p", durableFlag));
+
+    // Activate the subscription, ensure message removed on all queues.
+    c0.subs.setFlowControl("q", FlowControl::unlimited());
+    Message m;
+    BOOST_CHECK(c0.lq.get(m, TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "aaa");
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+    BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
+
+    // Check second subscription's flow control: gets first message, not second.
+    BOOST_CHECK(lp.get(m, TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "bbb");
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("p").getMessageCount(), 1u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 1u);
+    BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 1u);
+
+    BOOST_CHECK(c0.subs.get(m, "p", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "ccc");
+
+    // Kill the subscribing member, ensure further messages are not removed.
+    cluster.killWithSilencer(0,c0.connection,9);
+    BOOST_REQUIRE_EQUAL(knownBrokerPorts(c1.connection, 2).size(), 2u);
+    for (int i = 0; i < 10; ++i) {
+        c1.session.messageTransfer(arg::content=makeMessage("xxx", "q", durableFlag));
+        BOOST_REQUIRE(c1.subs.get(m, "q", TIMEOUT));
+        BOOST_REQUIRE_EQUAL(m.getData(), "xxx");
+    }
+}
+
+// Test that message data and delivery properties are updated properly.
+QPID_AUTO_TEST_CASE(testUpdateMessages) {
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c0(cluster[0], "c0");
+
+    // Create messages with different delivery properties
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
+    c0.session.exchangeBind(arg::exchange="amq.fanout", arg::queue="q");
+    c0.session.messageTransfer(arg::content=makeMessage("foo","q", durableFlag));
+    c0.session.messageTransfer(arg::content=makeMessage("bar","q", durableFlag),
+                               arg::destination="amq.fanout");
+
+    while (c0.session.queueQuery("q").getMessageCount() != 2)
+        sys::usleep(1000);    // Wait for message to show up on broker 0.
+
+    // Add a new broker, it will catch up.
+    cluster.add();
+
+    // Do some work post-add
+    c0.session.queueDeclare("p", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=makeMessage("pfoo","p", durableFlag));
+
+    // Do some work post-join
+    BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 2).size(), 2u);
+    c0.session.messageTransfer(arg::content=makeMessage("pbar","p", durableFlag));
+
+    // Verify new brokers have state.
+    Message m;
+
+    Client c1(cluster[1], "c1");
+
+    BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "foo");
+    BOOST_CHECK(m.getDeliveryProperties().hasExchange());
+    BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "");
+    BOOST_CHECK(c1.subs.get(m, "q", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "bar");
+    BOOST_CHECK(m.getDeliveryProperties().hasExchange());
+    BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "amq.fanout");
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+
+    // Add another broker, don't wait for join - should be stalled till ready.
+    cluster.add();
+    Client c2(cluster[2], "c2");
+    BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "pfoo");
+    BOOST_CHECK(c2.subs.get(m, "p", TIMEOUT));
+    BOOST_CHECK_EQUAL(m.getData(), "pbar");
+    BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u);
+}
+
+QPID_AUTO_TEST_CASE(testWiringReplication) {
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(3, args, -1);
+    Client c0(cluster[0]);
+    BOOST_CHECK(c0.session.queueQuery("q").getQueue().empty());
+    BOOST_CHECK(c0.session.exchangeQuery("ex").getType().empty());
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
+    c0.session.exchangeDeclare("ex", arg::type="direct");
+    c0.session.close();
+    c0.connection.close();
+    // Verify all brokers get wiring update.
+    for (size_t i = 0; i < cluster.size(); ++i) {
+        BOOST_MESSAGE("i == "<< i);
+        Client c(cluster[i]);
+        BOOST_CHECK_EQUAL("q", c.session.queueQuery("q").getQueue());
+        BOOST_CHECK_EQUAL("direct", c.session.exchangeQuery("ex").getType());
+    }
+}
+
+QPID_AUTO_TEST_CASE(testMessageEnqueue) {
+    // Enqueue on one broker, dequeue on another.
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(2, args, -1);
+    Client c0(cluster[0]);
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag));
+    c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag));
+    c0.session.close();
+    Client c1(cluster[1]);
+    Message msg;
+    BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT));
+    BOOST_CHECK_EQUAL(string("foo"), msg.getData());
+    BOOST_CHECK(c1.subs.get(msg, "q", TIMEOUT));
+    BOOST_CHECK_EQUAL(string("bar"), msg.getData());
+}
+
+QPID_AUTO_TEST_CASE(testMessageDequeue) {
+    // Enqueue on one broker, dequeue on two others.
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(3, args, -1);
+    Client c0(cluster[0], "c0");
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
+    c0.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag));
+    c0.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag));
+
+    Message msg;
+
+    // Dequeue on 2 others, ensure correct order.
+    Client c1(cluster[1], "c1");
+    BOOST_CHECK(c1.subs.get(msg, "q"));
+    BOOST_CHECK_EQUAL("foo", msg.getData());
+
+    Client c2(cluster[2], "c2");
+    BOOST_CHECK(c1.subs.get(msg, "q"));
+    BOOST_CHECK_EQUAL("bar", msg.getData());
+
+    // Queue should be empty on all cluster members.
+    BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount());
+    BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
+    BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
+}
+
+QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(3, args, -1);
+    Client c0(cluster[0]);
+    BOOST_REQUIRE_EQUAL(knownBrokerPorts(c0.connection, 3).size(), 3u); // Wait for brokers.
+
+    // First start a subscription.
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
+    c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
+
+    // Now send messages
+    Client c1(cluster[1]);
+    c1.session.messageTransfer(arg::content=makeMessage("foo", "q", durableFlag));
+    c1.session.messageTransfer(arg::content=makeMessage("bar", "q", durableFlag));
+
+    // Check they arrived
+    Message m;
+    BOOST_CHECK(c0.lq.get(m, TIMEOUT));
+    BOOST_CHECK_EQUAL("foo", m.getData());
+    BOOST_CHECK(c0.lq.get(m, TIMEOUT));
+    BOOST_CHECK_EQUAL("bar", m.getData());
+
+    // Queue should be empty on all cluster members.
+    Client c2(cluster[2]);
+    BOOST_CHECK_EQUAL(0u, c0.session.queueQuery("q").getMessageCount());
+    BOOST_CHECK_EQUAL(0u, c1.session.queueQuery("q").getMessageCount());
+    BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
+}
+
+QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie)
+{
+    /*
+      Start with a single broker.
+      Set up two queues: one durable, and one not.
+      Add a new broker to the cluster.
+      Make sure it has one durable and one non-durable queue.
+    */
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c0(cluster[0]);
+    c0.session.queueDeclare("durable_queue",     arg::durable=true);
+    c0.session.queueDeclare("non_durable_queue", arg::durable=false);
+    cluster.add();
+    Client c1(cluster[1]);
+    QueueQueryResult durable_query     = c1.session.queueQuery ( "durable_queue" );
+    QueueQueryResult non_durable_query = c1.session.queueQuery ( "non_durable_queue" );
+    BOOST_CHECK_EQUAL(durable_query.getQueue(), std::string("durable_queue"));
+    BOOST_CHECK_EQUAL(non_durable_query.getQueue(), std::string("non_durable_queue"));
+
+    BOOST_CHECK_EQUAL ( durable_query.getDurable(),     true  );
+    BOOST_CHECK_EQUAL ( non_durable_query.getDurable(), false );
+}
+
+
+QPID_AUTO_TEST_CASE(testHeartbeatCancelledOnFailover)
+{
+
+    struct Sender : FailoverManager::Command
+    {
+        std::string queue;
+        std::string content;
+
+        Sender(const std::string& q, const std::string& c) : queue(q), content(c) {}
+
+        void execute(AsyncSession& session, bool)
+        {
+            session.messageTransfer(arg::content=makeMessage(content, queue, durableFlag));
+        }
+    };
+
+    struct Receiver : FailoverManager::Command, MessageListener, qpid::sys::Runnable
+    {
+        FailoverManager& mgr;
+        std::string queue;
+        std::string expectedContent;
+        qpid::client::Subscription subscription;
+        qpid::sys::Monitor lock;
+        bool ready, failed;
+
+        Receiver(FailoverManager& m, const std::string& q, const std::string& c) : mgr(m), queue(q), expectedContent(c), ready(false), failed(false) {}
+
+        void received(Message& message)
+        {
+            BOOST_CHECK_EQUAL(expectedContent, message.getData());
+            subscription.cancel();
+        }
+
+        void execute(AsyncSession& session, bool)
+        {
+            session.queueDeclare(arg::queue=queue, arg::durable=durableFlag);
+            SubscriptionManager subs(session);
+            subscription = subs.subscribe(*this, queue);
+            session.sync();
+            setReady();
+            subs.run();
+            //cleanup:
+            session.queueDelete(arg::queue=queue);
+        }
+
+        void run()
+        {
+            try {
+            mgr.execute(*this);
+        }
+            catch (const std::exception& e) {
+                BOOST_MESSAGE("Exception in mgr.execute: " << e.what());
+                failed = true;
+            }
+        }
+
+        void waitForReady()
+        {
+            qpid::sys::Monitor::ScopedLock l(lock);
+            while (!ready) {
+                lock.wait();
+            }
+        }
+
+        void setReady()
+        {
+            qpid::sys::Monitor::ScopedLock l(lock);
+            ready = true;
+            lock.notify();
+        }
+    };
+
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(2, args, -1);
+    ConnectionSettings settings;
+    settings.port = cluster[1];
+    settings.heartbeat = 1;
+    FailoverManager fmgr(settings);
+    Sender sender("my-queue", "my-data");
+    Receiver receiver(fmgr, "my-queue", "my-data");
+    qpid::sys::Thread runner(receiver);
+    receiver.waitForReady();
+    {
+        ScopedSuppressLogging allQuiet; // suppress connection closed messages
+        cluster.kill(1);
+        //sleep for 2 secs to allow the heartbeat task to fire on the now dead connection:
+        ::usleep(2*1000*1000);
+    }
+    fmgr.execute(sender);
+    runner.join();
+    BOOST_CHECK(!receiver.failed);
+    fmgr.close();
+}
+
+QPID_AUTO_TEST_CASE(testPolicyUpdate) {
+    //tests that the policys internal state is accurate on newly
+    //joined nodes
+    ClusterFixture::Args args;
+    args += "--log-enable", "critical";
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c1(cluster[0], "c1");
+    {
+        ScopedSuppressLogging allQuiet;
+        QueueOptions options;
+        options.setSizePolicy(REJECT, 0, 2);
+        c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+        c1.session.messageTransfer(arg::content=makeMessage("one", "q", durableFlag));
+        cluster.add();
+        Client c2(cluster[1], "c2");
+        c2.session.messageTransfer(arg::content=makeMessage("two", "q", durableFlag));
+
+        BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=makeMessage("three", "q", durableFlag)), framing::ResourceLimitExceededException);
+
+        Message received;
+        BOOST_CHECK(c1.subs.get(received, "q"));
+        BOOST_CHECK_EQUAL(received.getData(), std::string("one"));
+        BOOST_CHECK(c1.subs.get(received, "q"));
+        BOOST_CHECK_EQUAL(received.getData(), std::string("two"));
+        BOOST_CHECK(!c1.subs.get(received, "q"));
+    }
+}
+
+QPID_AUTO_TEST_CASE(testExclusiveQueueUpdate) {
+    //tests that exclusive queues are accurately replicated on newly
+    //joined nodes
+    ClusterFixture::Args args;
+    args += "--log-enable", "critical";
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c1(cluster[0], "c1");
+    {
+        ScopedSuppressLogging allQuiet;
+        c1.session.queueDeclare("q", arg::exclusive=true, arg::autoDelete=true, arg::alternateExchange="amq.fanout");
+        cluster.add();
+        Client c2(cluster[1], "c2");
+        QueueQueryResult result = c2.session.queueQuery("q");
+        BOOST_CHECK_EQUAL(result.getQueue(), std::string("q"));
+        BOOST_CHECK(result.getExclusive());
+        BOOST_CHECK(result.getAutoDelete());
+        BOOST_CHECK(!result.getDurable());
+        BOOST_CHECK_EQUAL(result.getAlternateExchange(), std::string("amq.fanout"));
+        BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::exclusive=true, arg::passive=true), framing::ResourceLockedException);
+        c1.session.close();
+        c1.connection.close();
+        c2.session = c2.connection.newSession();
+        BOOST_CHECK_THROW(c2.session.queueDeclare(arg::queue="q", arg::passive=true), framing::NotFoundException);
+    }
+}
+
+/**
+ * Subscribes to specified queue and acquires up to the specified
+ * number of message but does not accept or release them. These
+ * message are therefore 'locked' by the clients session.
+ */
+Subscription lockMessages(Client& client, const std::string& queue, int count)
+{
+    LocalQueue q;
+    SubscriptionSettings settings(FlowControl::messageCredit(count));
+    settings.autoAck = 0;
+    Subscription sub = client.subs.subscribe(q, queue, settings);
+    client.session.messageFlush(sub.getName());
+    return sub;
+}
+
+/**
+ * check that the specified queue contains the expected set of
+ * messages (matched on content) for all nodes in the cluster
+ */
+void checkQueue(ClusterFixture& cluster, const std::string& queue, const std::vector<std::string>& messages)
+{
+    for (size_t i = 0; i < cluster.size(); i++) {
+        Client client(cluster[i], (boost::format("%1%_%2%") % "c" % (i+1)).str());
+        BOOST_CHECK_EQUAL(browse(client, queue, messages.size()), messages);
+        client.close();
+    }
+}
+
+void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m",
+          const std::string& lvqKey="")
+{
+    for (int i = 0; i < count; i++) {
+        Message message = makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag);
+        if (!lvqKey.empty()) message.getHeaders().setString(QueueOptions::strLVQMatchProperty, lvqKey);
+        client.session.messageTransfer(arg::content=message);
+    }
+}
+
+QPID_AUTO_TEST_CASE(testRingQueueUpdate) {
+    //tests that ring queues are accurately replicated on newly
+    //joined nodes
+    ClusterFixture::Args args;
+    args += "--log-enable", "critical";
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c1(cluster[0], "c1");
+    {
+        ScopedSuppressLogging allQuiet;
+        QueueOptions options;
+        options.setSizePolicy(RING, 0, 5);
+        c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+        send(c1, "q", 5);
+        lockMessages(c1, "q", 1);
+        //add new node
+        cluster.add();
+        BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+        //send one more message
+        send(c1, "q", 1, 6);
+        //release locked message
+        c1.close();
+        //check state of queue on both nodes
+        checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+    }
+}
+
+QPID_AUTO_TEST_CASE(testRingQueueUpdate2) {
+    //tests that ring queues are accurately replicated on newly joined
+    //nodes; just like testRingQueueUpdate, but new node joins after
+    //the sixth message has been sent.
+    ClusterFixture::Args args;
+    args += "--log-enable", "critical";
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c1(cluster[0], "c1");
+    {
+        ScopedSuppressLogging allQuiet;
+        QueueOptions options;
+        options.setSizePolicy(RING, 0, 5);
+        c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+        send(c1, "q", 5);
+        lockMessages(c1, "q", 1);
+        //send sixth message
+        send(c1, "q", 1, 6);
+        //add new node
+        cluster.add();
+        BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+        //release locked message
+        c1.close();
+        //check state of queue on both nodes
+        checkQueue(cluster, "q", list_of<string>("m_2")("m_3")("m_4")("m_5")("m_6"));
+    }
+}
+
+QPID_AUTO_TEST_CASE(testLvqUpdate) {
+    //tests that lvqs are accurately replicated on newly joined nodes
+    ClusterFixture::Args args;
+    args += "--log-enable", "critical";
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c1(cluster[0], "c1");
+    {
+        ScopedSuppressLogging allQuiet;
+        QueueOptions options;
+        options.setOrdering(LVQ);
+        c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+
+        send(c1, "q", 5, 1, "a", "a");
+        send(c1, "q", 2, 1, "b", "b");
+        send(c1, "q", 1, 1, "c", "c");
+        send(c1, "q", 1, 3, "b", "b");
+
+        //add new node
+        cluster.add();
+        BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+
+        //check state of queue on both nodes
+        checkQueue(cluster, "q", list_of<string>("a_5")("b_3")("c_1"));
+    }
+}
+
+
+QPID_AUTO_TEST_CASE(testBrowsedLvqUpdate) {
+    //tests that lvqs are accurately replicated on newly joined nodes
+    //if the lvq state has been affected by browsers
+    ClusterFixture::Args args;
+    args += "--log-enable", "critical";
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c1(cluster[0], "c1");
+    {
+        ScopedSuppressLogging allQuiet;
+        QueueOptions options;
+        options.setOrdering(LVQ);
+        c1.session.queueDeclare("q", arg::arguments=options, arg::durable=durableFlag);
+
+        send(c1, "q", 1, 1, "a", "a");
+        send(c1, "q", 2, 1, "b", "b");
+        send(c1, "q", 1, 1, "c", "c");
+        checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1"));
+        send(c1, "q", 4, 2, "a", "a");
+        send(c1, "q", 1, 3, "b", "b");
+
+        //add new node
+        cluster.add();
+        BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection, 2).size());//wait till joined
+
+        //check state of queue on both nodes
+        checkQueue(cluster, "q", list_of<string>("a_1")("b_2")("c_1")("a_5")("b_3"));
+    }
+}
+
+QPID_AUTO_TEST_CASE(testRelease) {
+    //tests that releasing a messages that was unacked when one node
+    //joined works correctly
+    ClusterFixture::Args args;
+    args += "--log-enable", "critical";
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c1(cluster[0], "c1");
+    {
+        ScopedSuppressLogging allQuiet;
+        c1.session.queueDeclare("q", arg::durable=durableFlag);
+        for (int i = 0; i < 5; i++) {
+            c1.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % "m" % (i+1)).str(), "q", durableFlag));
+        }
+        //receive but don't ack a message
+        LocalQueue lq;
+        SubscriptionSettings lqSettings(FlowControl::messageCredit(1));
+        lqSettings.autoAck = 0;
+        Subscription lqSub = c1.subs.subscribe(lq, "q", lqSettings);
+        c1.session.messageFlush("q");
+        Message received;
+        BOOST_CHECK(lq.get(received));
+        BOOST_CHECK_EQUAL(received.getData(), std::string("m_1"));
+
+        //add new node
+        cluster.add();
+
+        lqSub.release(lqSub.getUnaccepted());
+
+        //check state of queue on both nodes
+        vector<string> expected = list_of<string>("m_1")("m_2")("m_3")("m_4")("m_5");
+        Client c3(cluster[0], "c3");
+        BOOST_CHECK_EQUAL(browse(c3, "q", 5), expected);
+        Client c2(cluster[1], "c2");
+        BOOST_CHECK_EQUAL(browse(c2, "q", 5), expected);
+    }
+}
+
+
+// Browse for 1 message with byte credit, return true if a message was
+// received false if not.
+bool browseByteCredit(Client& c, const string& q, int n, Message& m) {
+    SubscriptionSettings browseSettings(
+        FlowControl(1, n, false), // 1 message, n bytes credit, no window
+        ACCEPT_MODE_NONE,
+        ACQUIRE_MODE_NOT_ACQUIRED,
+        0                       // No auto-ack.
+    );
+    LocalQueue lq;
+    Subscription s = c.subs.subscribe(lq, q, browseSettings);
+    c.session.messageFlush(arg::destination=q, arg::sync=true);
+    c.session.sync();
+    c.subs.getSubscription(q).cancel();
+    return lq.get(m, 0);        // No timeout, flush should push message thru.
+}
+
+// Ensure cluster update preserves exact message size, use byte credt as test.
+QPID_AUTO_TEST_CASE(testExactByteCredit) {
+    ClusterFixture cluster(1, prepareArgs(), -1);
+    Client c0(cluster[0], "c0");
+    c0.session.queueDeclare("q");
+    c0.session.messageTransfer(arg::content=Message("MyMessage", "q"));
+    cluster.add();
+
+    int size=36;                // Size of message on broker: headers+body
+    Client c1(cluster[1], "c1");
+    Message m;
+
+    // Ensure we get the message with exact credit.
+    BOOST_CHECK(browseByteCredit(c0, "q", size, m));
+    BOOST_CHECK(browseByteCredit(c1, "q", size, m));
+    // and not with one byte less.
+    BOOST_CHECK(!browseByteCredit(c0, "q", size-1, m));
+    BOOST_CHECK(!browseByteCredit(c1, "q", size-1, m));
+}
+
+// Test that consumer positions are updated correctly.
+// Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=541927
+//
+QPID_AUTO_TEST_CASE(testUpdateConsumerPosition) {
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c0(cluster[0], "c0");
+
+    c0.session.queueDeclare("q", arg::durable=durableFlag);
+    SubscriptionSettings settings;
+    settings.autoAck = 0;
+    // Set the acquire mode to 'not-acquired' the consumer moves along the queue
+    // but does not acquire (remove) messages.
+    settings.acquireMode = ACQUIRE_MODE_NOT_ACQUIRED;
+    Subscription s = c0.subs.subscribe(c0.lq, "q", settings);
+    c0.session.messageTransfer(arg::content=makeMessage("1", "q", durableFlag));
+    BOOST_CHECK_EQUAL("1", c0.lq.get(TIMEOUT).getData());
+
+    // Add another member, send/receive another message and acquire
+    // the messages.  With the bug, this creates an inconsistency
+    // because the browse position was not updated to the new member.
+    cluster.add();
+    c0.session.messageTransfer(arg::content=makeMessage("2", "q", durableFlag));
+    BOOST_CHECK_EQUAL("2", c0.lq.get(TIMEOUT).getData());
+    s.acquire(s.getUnacquired());
+    s.accept(s.getUnaccepted());
+
+    // In the bug we now have 0 messages on cluster[0] and 1 message on cluster[1]
+    // Subscribing on cluster[1] provokes an error that shuts down cluster[0]
+    Client c1(cluster[1], "c1");
+    Subscription s1 = c1.subs.subscribe(c1.lq, "q"); // Default auto-ack=1
+    Message m;
+    BOOST_CHECK(!c1.lq.get(m, TIMEOUT/10));
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
+}
+
+QPID_AUTO_TEST_CASE(testFairsharePriorityDelivery) {
+    ClusterFixture::Args args;
+    prepareArgs(args, durableFlag);
+    ClusterFixture cluster(1, args, -1);
+    Client c0(cluster[0], "c0");
+
+    FieldTable arguments;
+    arguments.setInt("x-qpid-priorities", 10);
+    arguments.setInt("x-qpid-fairshare", 5);
+    c0.session.queueDeclare("q", arg::durable=durableFlag, arg::arguments=arguments);
+
+    //send messages of different priorities
+    for (int i = 0; i < 20; i++) {
+        Message msg = makeMessage((boost::format("msg-%1%") % i).str(), "q", durableFlag);
+        msg.getDeliveryProperties().setPriority(i % 2 ? 9 : 5);
+        c0.session.messageTransfer(arg::content=msg);
+    }
+
+    //pull off a couple of the messages (first four should be the top priority messages
+    for (int i = 0; i < 4; i++) {
+        BOOST_CHECK_EQUAL((boost::format("msg-%1%") % ((i*2)+1)).str(), c0.subs.get("q", TIMEOUT).getData());
+    }
+
+    // Add another member
+    cluster.add();
+    Client c1(cluster[1], "c1");
+
+    //pull off some more messages
+    BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 9).str(), c0.subs.get("q", TIMEOUT).getData());
+    BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 0).str(), c1.subs.get("q", TIMEOUT).getData());
+    BOOST_CHECK_EQUAL((boost::format("msg-%1%") % 2).str(), c0.subs.get("q", TIMEOUT).getData());
+
+    //check queue has same content on both nodes
+    BOOST_CHECK_EQUAL(browse(c0, "q", 12), browse(c1, "q", 12));
+}
+
+QPID_AUTO_TEST_SUITE_END()
+}} // namespace qpid::tests

Propchange: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1509423&r1=1509422&r2=1509423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Thu Aug  1 20:27:26 2013
@@ -107,7 +107,7 @@ class HaBroker(Broker):
         ha_port = ha_port or HaPort(test)
         args = copy(args)
         args += ["--load-module", BrokerTest.ha_lib,
-                 "--log-enable=debug+:ha::",
+                 "--log-enable=trace+:ha::", # FIXME aconway 2013-07-29: debug
                  # Non-standard settings for faster tests.
                  "--link-maintenance-interval=0.1",
                  # Heartbeat and negotiate time are needed so that a broker wont
@@ -221,6 +221,12 @@ acl allow all all
 
     def wait_backup(self, address): self.wait_address(address)
 
+    def browse(self, queue, timeout=0, transform=lambda m: m.content):
+        c = self.connect_admin()
+        try:
+            return browse(c.session(), queue, timeout, transform)
+        finally: c.close()
+
     def assert_browse(self, queue, expected, **kwargs):
         """Verify queue contents by browsing."""
         bs = self.connect().session()

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1509423&r1=1509422&r2=1509423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Thu Aug  1 20:27:26 2013
@@ -1287,50 +1287,101 @@ class StoreTests(BrokerTest):
         cluster[0].assert_browse("q2", ["hello", "end"])
         cluster[1].assert_browse_backup("q2", ["hello", "end"])
 
+def open_read(name):
+    with open(name) as f: return f.read()
+
 class TransactionTests(BrokerTest):
 
+    load_store=["--load-module", BrokerTest.test_store_lib]
+
     def tx_simple_setup(self, broker):
-        """Start a transaction: receive 'foo' from 'a' and send 'bar' to 'b'"""
+        """Start a transaction, remove messages from queue a, add messages to queue b"""
         c = broker.connect()
-        c.session().sender("a;{create:always}").send("foo")
+        # Send messages to a, no transaction.
+        sa = c.session().sender("a;{create:always,node:{durable:true}}")
+        tx_msgs =  ["x","y","z"]
+        for m in tx_msgs: sa.send(Message(content=m, durable=True))
+
+        # Receive messages from a, in transaction.
         tx = c.session(transactional=True)
-        self.assertEqual("foo", tx.receiver("a").fetch(1).content)
-        tx.acknowledge();
-        tx.sender("b;{create:always}").send("bar")
+        txr = tx.receiver("a")
+        tx_msgs2 = [txr.fetch(1).content for i in xrange(3)]
+        self.assertEqual(tx_msgs, tx_msgs2)
+
+        # Send messages to b, transactional, mixed with non-transactional.
+        sb = c.session().sender("b;{create:always,node:{durable:true}}")
+        txs = tx.sender("b")
+        msgs = [str(i) for i in xrange(3)]
+        for tx_m,m in zip(tx_msgs2, msgs):
+            txs.send(tx_m);
+            sb.send(m)
         return tx
 
     def test_tx_simple_commit(self):
-        cluster = HaCluster(self, 2, args=["--log-enable=trace+:ha::"])
+        cluster = HaCluster(self, 2, test_store=True)
         tx = self.tx_simple_setup(cluster[0])
+        tx.sync()
+
+        # NOTE: backup does not process transactional dequeues until prepare
+        cluster[1].assert_browse_backup("a", ["x","y","z"])
+        cluster[1].assert_browse_backup("b", ['0', '1', '2'])
+
+        tx.acknowledge()
         tx.commit()
+        tx.sync()
+
         for b in cluster:
             b.assert_browse_backup("a", [], msg=b)
-            b.assert_browse_backup("b", ["bar"], msg=b)
+            b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
+
+        # Check for expected actions on the store
+        expect = """<enqueue a x>
+<enqueue a y>
+<enqueue a z>
+<begin tx 1>
+<dequeue a x tx=1>
+<dequeue a y tx=1>
+<dequeue a z tx=1>
+<commit tx=1>
+"""
+        self.assertEqual(expect, open_read(cluster[0].store_log))
+        self.assertEqual(expect, open_read(cluster[1].store_log))
 
     def test_tx_simple_rollback(self):
-        cluster = HaCluster(self, 2)
+        cluster = HaCluster(self, 2, test_store=True)
         tx = self.tx_simple_setup(cluster[0])
+        tx.acknowledge()
         tx.rollback()
         for b in cluster:
-            b.assert_browse_backup("a", ["foo"], msg=b)
-            b.assert_browse_backup("b", [], msg=b)
+            b.assert_browse_backup("a", ["x","y","z"], msg=b)
+            b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
+        # Check for expected actions on the store
+        expect = """<enqueue a x>
+<enqueue a y>
+<enqueue a z>
+"""
+        self.assertEqual(open_read(cluster[0].store_log), expect)
+        self.assertEqual(open_read(cluster[1].store_log), expect)
 
     def test_tx_simple_failover(self):
-        cluster = HaCluster(self, 2)
+        cluster = HaCluster(self, 2, test_store=True)
         tx = self.tx_simple_setup(cluster[0])
+        tx.acknowledge()
         cluster.bounce(0)       # Should cause roll-back
+        cluster[0].wait_status("ready")
         for b in cluster:
-            b.assert_browse_backup("a", ["foo"], msg=b)
-            b.assert_browse_backup("b", [], msg=b)
+            b.assert_browse_backup("a", ["x","y","z"], msg=b)
+            b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
 
-    def test_tx_simple_join(self):
-        cluster = HaCluster(self, 2)
-        tx = self.tx_simple_setup(cluster[0])
-        cluster.bounce(1)       # Should catch up with tx
-        tx.commit()
-        for b in cluster:
-            b.assert_browse_backup("a", [], msg=b)
-            b.assert_browse_backup("b", ["bar"], msg=b)
+        # Check for expected actions on the store
+        expect = """<enqueue a x>
+<enqueue a y>
+<enqueue a z>
+"""
+        self.assertEqual(open_read(cluster[0].store_log), expect)
+        self.assertEqual(open_read(cluster[1].store_log), expect)
+
+# FIXME aconway 2013-07-23: test with partial acknowledgement.
 
 if __name__ == "__main__":
     outdir = "ha_tests.tmp"

Modified: qpid/trunk/qpid/cpp/src/tests/test_env.sh.in
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/test_env.sh.in?rev=1509423&r1=1509422&r2=1509423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/test_env.sh.in (original)
+++ qpid/trunk/qpid/cpp/src/tests/test_env.sh.in Thu Aug  1 20:27:26 2013
@@ -81,5 +81,5 @@ if [ ! -e "$HOME" ]; then
 fi
 
 # Options for boost test framework
-export BOOST_TEST_SHOW_PROGRESS=yes
-export BOOST_TEST_CATCH_SYSTEM_ERRORS=no
+test -z "$BOOST_TEST_SHOW_PROGRESS" && export BOOST_TEST_SHOW_PROGRESS=yes
+test -z "$BOOST_TEST_CATCH_SYSTEM_ERRORS" && export BOOST_TEST_CATCH_SYSTEM_ERRORS=no

Modified: qpid/trunk/qpid/cpp/src/tests/test_store.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/test_store.cpp?rev=1509423&r1=1509422&r2=1509423&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/test_store.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/test_store.cpp Thu Aug  1 20:27:26 2013
@@ -40,14 +40,19 @@
 #include "qpid/sys/Thread.h"
 #include "qpid/Plugin.h"
 #include "qpid/Options.h"
+#include "qpid/RefCounted.h"
+#include "qpid/Msg.h"
 #include <boost/cast.hpp>
 #include <boost/lexical_cast.hpp>
 #include <memory>
+#include <ostream>
 #include <fstream>
+#include <sstream>
 
-using namespace qpid;
-using namespace broker;
 using namespace std;
+using namespace boost;
+using namespace qpid;
+using namespace qpid::broker;
 using namespace qpid::sys;
 
 namespace qpid {
@@ -57,11 +62,13 @@ struct TestStoreOptions : public Options
 
     string name;
     string dump;
+    string events;
 
     TestStoreOptions() : Options("Test Store Options") {
         addOptions()
             ("test-store-name", optValue(name, "NAME"), "Name of test store instance.")
             ("test-store-dump", optValue(dump, "FILE"), "File to dump enqueued messages.")
+            ("test-store-events", optValue(events, "FILE"), "File to log events, 1 line per event.")
             ;
     }
 };
@@ -82,24 +89,74 @@ class TestStore : public NullMessageStor
     TestStore(const TestStoreOptions& opts, Broker& broker_)
         : options(opts), name(opts.name), broker(broker_)
     {
-        QPID_LOG(info, "TestStore name=" << name << " dump=" << options.dump);
-        if (!options.dump.empty()) 
+        QPID_LOG(info, "TestStore name=" << name
+                 << " dump=" << options.dump
+                 << " events=" << options.events);
+
+        if (!options.dump.empty())
             dump.reset(new ofstream(options.dump.c_str()));
+        if (!options.events.empty())
+            events.reset(new ofstream(options.events.c_str()));
     }
 
     ~TestStore() {
         for_each(threads.begin(), threads.end(), boost::bind(&Thread::join, _1));
     }
 
-    virtual bool isNull() const { return false; }
-    
-    void enqueue(TransactionContext* ,
+    // Dummy transaction context.
+    struct TxContext : public TPCTransactionContext {
+        static int nextId;
+        string id;
+        TxContext() : id(lexical_cast<string>(nextId++)) {}
+        TxContext(string xid) : id(xid) {}
+    };
+
+    static string getId(const TransactionContext& tx) {
+        const TxContext* tc = dynamic_cast<const TxContext*>(&tx);
+        assert(tc);
+        return tc->id;
+    }
+
+
+    bool isNull() const { return false; }
+
+    void log(const string& msg) {
+        QPID_LOG(info, "test_store: " << msg);
+        if (events.get()) *events << msg << endl << std::flush;
+    }
+
+    auto_ptr<TransactionContext> begin() {
+        auto_ptr<TxContext> tx(new TxContext());
+        log(Msg() << "<begin tx " << tx->id << ">");
+        return auto_ptr<TransactionContext>(tx);
+    }
+
+    auto_ptr<TPCTransactionContext> begin(const std::string& xid)  {
+        auto_ptr<TxContext> tx(new TxContext(xid));
+        log(Msg() << "<begin tx " << tx->id << ">");
+        return auto_ptr<TPCTransactionContext>(tx);
+    }
+
+    string getContent(const intrusive_ptr<PersistableMessage>& msg) {
+        intrusive_ptr<broker::Message::Encoding> enc(
+            dynamic_pointer_cast<broker::Message::Encoding>(msg));
+        return enc->getContent();
+    }
+
+    void enqueue(TransactionContext* tx,
                  const boost::intrusive_ptr<PersistableMessage>& pmsg,
-                 const PersistableQueue& )
+                 const PersistableQueue& queue)
     {
+        QPID_LOG(debug, "TestStore enqueue " << queue.getName());
         qpid::broker::amqp_0_10::MessageTransfer* msg = dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
         assert(msg);
 
+        ostringstream o;
+        o << "<enqueue " << queue.getName() << " " << getContent(msg);
+        if (tx) o << " tx=" << getId(*tx);
+        o << ">";
+        log(o.str());
+
         // Dump the message if there is a dump file.
         if (dump.get()) {
             msg->getFrames().getMethod()->print(*dump);
@@ -144,6 +201,31 @@ class TestStore : public NullMessageStor
             msg->enqueueComplete();
     }
 
+    void dequeue(TransactionContext* tx,
+                 const boost::intrusive_ptr<PersistableMessage>& msg,
+                 const PersistableQueue& queue)
+    {
+        QPID_LOG(debug, "TestStore dequeue " << queue.getName());
+        ostringstream o;
+        o<< "<dequeue " << queue.getName() << " " << getContent(msg);
+        if (tx) o << " tx=" << getId(*tx);
+        o << ">";
+        log(o.str());
+    }
+
+    void prepare(TPCTransactionContext& txn) {
+        log(Msg() << "<prepare tx=" << getId(txn) << ">");
+    }
+
+    void commit(TransactionContext& txn) {
+        log(Msg() << "<commit tx=" << getId(txn) << ">");
+    }
+
+    void abort(TransactionContext& txn) {
+        log(Msg() << "<abort tx=" << getId(txn) << ">");
+    }
+
+
   private:
     static const string TEST_STORE_DO, EXCEPTION, EXIT_PROCESS, ASYNC;
     TestStoreOptions options;
@@ -151,8 +233,11 @@ class TestStore : public NullMessageStor
     Broker& broker;
     vector<Thread> threads;
     std::auto_ptr<ofstream> dump;
+    std::auto_ptr<ofstream> events;
 };
 
+int TestStore::TxContext::nextId(1);
+
 const string TestStore::TEST_STORE_DO = "TEST_STORE_DO: ";
 const string TestStore::EXCEPTION = "exception";
 const string TestStore::EXIT_PROCESS = "exit_process";



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message