qpid-commits mailing list archives

From c...@apache.org
Subject svn commit: r1430018 [1/2] - in /qpid/trunk/qpid/cpp/src/tests: ./ legacystore/
Date Mon, 07 Jan 2013 21:24:49 GMT
Author: chug
Date: Mon Jan  7 21:24:48 2013
New Revision: 1430018

URL: http://svn.apache.org/viewvc?rev=1430018&view=rev
Log:
QPID-1726 ASF licensed Qpid Store
Add unit tests.

There are several issues with these tests:

1. Originally the four .cpp unit test sources were compiled into a single
unit_test executable. In the current framework those four sources create
conflicting brokers that overwrite each other's store and fail to open port
5672. In this checkin there are four unit test executables. Running each
serially gets them all to pass. A new strategy is needed to start brokers
that don't conflict.

2. The legacystore.so is not integrated with the rest of the tests. Some
tests may run with the externally compiled msgstore.so and some use the
built-in test_store. Plugging legacystore.so into these other tests is TBD.

3. cpp/src/tests/legacystore defines more tests beyond simple unit tests.
None of the issues related to wider system tests are addressed yet.
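
As an illustration of issue 1 only, here is a minimal sketch of one way to stop
test executables from sharing a store directory. It is not part of this
checkin. The helper name uniqueTestDir is hypothetical, and treating
TMP_DATA_DIR as a base directory (rather than as the full store path, which is
how the tests in this commit use it) is an assumption. It does not address the
port 5672 conflict.

```cpp
#include <cstdlib>
#include <sstream>
#include <string>
#include <unistd.h>   // getpid (POSIX)

// Hypothetical helper, not in this commit: build a per-process store
// directory name so that test executables running at the same time do not
// share one store. Assumes TMP_DATA_DIR, when set, names a base directory.
std::string uniqueTestDir(const std::string& testName)
{
    const char* tdp = std::getenv("TMP_DATA_DIR");
    const std::string base(tdp && *tdp ? tdp : "/tmp");
    std::ostringstream dir;
    dir << base << "/" << testName << "." << getpid();
    return dir.str();
}
```

A test source could then, under the same assumptions, initialise its test_dir
from uniqueTestDir("SimpleTest") instead of a fixed /tmp path.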


Added:
    qpid/trunk/qpid/cpp/src/tests/legacystore/
    qpid/trunk/qpid/cpp/src/tests/legacystore/.valgrind.supp
    qpid/trunk/qpid/cpp/src/tests/legacystore/.valgrindrc
    qpid/trunk/qpid/cpp/src/tests/legacystore/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/tests/legacystore/MessageUtils.h
    qpid/trunk/qpid/cpp/src/tests/legacystore/OrderingTest.cpp
    qpid/trunk/qpid/cpp/src/tests/legacystore/SimpleTest.cpp
    qpid/trunk/qpid/cpp/src/tests/legacystore/TestFramework.cpp
    qpid/trunk/qpid/cpp/src/tests/legacystore/TestFramework.h
    qpid/trunk/qpid/cpp/src/tests/legacystore/TransactionalTest.cpp
    qpid/trunk/qpid/cpp/src/tests/legacystore/TwoPhaseCommitTest.cpp
    qpid/trunk/qpid/cpp/src/tests/legacystore/clean.sh
    qpid/trunk/qpid/cpp/src/tests/legacystore/persistence.py
    qpid/trunk/qpid/cpp/src/tests/legacystore/run_long_python_tests
    qpid/trunk/qpid/cpp/src/tests/legacystore/run_python_tests
    qpid/trunk/qpid/cpp/src/tests/legacystore/run_short_python_tests
    qpid/trunk/qpid/cpp/src/tests/legacystore/run_test
    qpid/trunk/qpid/cpp/src/tests/legacystore/start_broker
    qpid/trunk/qpid/cpp/src/tests/legacystore/stop_broker
    qpid/trunk/qpid/cpp/src/tests/legacystore/system_test.sh
    qpid/trunk/qpid/cpp/src/tests/legacystore/tests_env.sh
    qpid/trunk/qpid/cpp/src/tests/legacystore/unit_test.cpp
    qpid/trunk/qpid/cpp/src/tests/legacystore/unit_test.h
Modified:
    qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt

Modified: qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt?rev=1430018&r1=1430017&r2=1430018&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/tests/CMakeLists.txt Mon Jan  7 21:24:48 2013
@@ -344,3 +344,8 @@ add_library (dlclose_noop MODULE dlclose
 #EXTRA_DIST+=$(LONG_TESTS) run_perftest
 #check-long:
 #	$(MAKE) check TESTS="start_broker $(LONG_TESTS) stop_broker" VALGRIND=
+
+#
+# legacystore
+#
+add_subdirectory(legacystore)

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/.valgrind.supp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/.valgrind.supp?rev=1430018&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/.valgrind.supp (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/.valgrind.supp Mon Jan  7 21:24:48 2013
@@ -0,0 +1,35 @@
+{
+   <insert_a_suppression_name_here>
+   Memcheck:Leak
+   fun:_Znwm
+   fun:_ZNSs4_Rep9_S_createEmmRKSaIcE
+   fun:_ZNSs12_S_constructIPKcEEPcT_S3_RKSaIcESt20forward_iterator_tag
+   fun:_ZNSsC1EPKcRKSaIcE
+}
+
+{
+   <insert_a_suppression_name_here>
+   Memcheck:Leak
+   fun:_Znwm
+   fun:_ZNSs4_Rep9_S_createEmmRKSaIcE
+   fun:_ZNSs4_Rep8_M_cloneERKSaIcEm
+   fun:_ZNSs7reserveEm
+}
+
+{
+   <insert_a_suppression_name_here>
+   Memcheck:Leak
+   fun:_Znwm
+   fun:_ZNSs4_Rep9_S_createEmmRKSaIcE
+   fun:_ZNSs9_M_mutateEmmm
+   fun:_ZNSs15_M_replace_safeEmmPKcm
+}
+
+{
+   <insert_a_suppression_name_here>
+   Memcheck:Leak
+   fun:_Znwm
+   fun:_ZNSs4_Rep9_S_createEmmRKSaIcE
+   fun:_ZNSsC1IPcEET_S1_RKSaIcE
+}
+

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/.valgrindrc
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/.valgrindrc?rev=1430018&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/.valgrindrc (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/.valgrindrc Mon Jan  7 21:24:48 2013
@@ -0,0 +1,7 @@
+--gen-suppressions=all
+--leak-check=full
+--demangle=yes
+--suppressions=.valgrind.supp
+--num-callers=25
+--trace-children=yes
+

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/CMakeLists.txt?rev=1430018&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/CMakeLists.txt (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/CMakeLists.txt Mon Jan  7 21:24:48 2013
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Enable dashboard reporting.
+include (CTest)
+
+# Make sure that everything gets built before the tests
+# Need to create a var with all the necessary top level targets
+
+# If we're linking Boost for DLLs, turn that on for the unit test too.
+if (QPID_LINK_BOOST_DYNAMIC)
+    add_definitions(-DBOOST_TEST_DYN_LINK)
+endif (QPID_LINK_BOOST_DYNAMIC)
+
+include_directories( ${CMAKE_CURRENT_SOURCE_DIR} )
+
+include (FindPythonInterp)
+
+# # Inherit environment from parent script
+# set (abs_srcdir ${CMAKE_CURRENT_SOURCE_DIR})
+# set (abs_builddir ${CMAKE_CURRENT_BINARY_DIR})
+# set (abs_top_srcdir ${CMAKE_SOURCE_DIR})
+# set (abs_top_builddir ${CMAKE_BINARY_DIR})
+# set (builddir_lib_suffix "")
+
+# If valgrind is selected in the configuration step, set up the path to it
+# for CTest.
+if (ENABLE_VALGRIND)
+  set (MEMORYCHECK_COMMAND ${VALGRIND})
+  set (MEMORYCHECK_COMMAND_OPTIONS "--gen-suppressions=all
+--leak-check=full
+--demangle=yes
+--suppressions=${CMAKE_CURRENT_SOURCE_DIR}/.valgrind.supp
+--num-callers=25
+--log-file=ctest_valgrind.vglog")
+endif (ENABLE_VALGRIND)
+
+# Set this way so that it works with cmake 2.4 on Unix
+set (qpid_test_boost_libs
+     ${Boost_UNIT_TEST_FRAMEWORK_LIBRARY} ${Boost_SYSTEM_LIBRARY})
+
+#
+# Unit test program
+#
+# Unit tests were originally built as a single program to reduce valgrind
+# overhead when running the tests. If you want to build a subset of the
+# tests, run ccmake and set unit_tests_to_build to the set you want to build.
+# HACK ALERT - Unit tests are now built individually to resolve a conflict
+# between multiple brokers that all connect to 0.0.0.0:5672 and that
+# overwrite each other's store directory.
+
+#
+# define_selftest
+# macro to accept the name of a single source file and to create a
+#  unit test executable that runs the source.
+#
+MACRO (define_selftest theSourceFile)
+add_executable (legacystore_${theSourceFile}
+            unit_test
+            ${theSourceFile}
+            ${platform_test_additions})
+target_link_libraries (legacystore_${theSourceFile}
+                       ${qpid_test_boost_libs}
+                       qpidmessaging qpidbroker qmfconsole legacystore)
+get_property(ls_include TARGET legacystore_${theSourceFile} PROPERTY INCLUDE_DIRECTORIES)
+list(APPEND  ls_include ${abs_top_srcdir}/src/qpid/legacystore)
+list(APPEND  ls_include ${abs_top_srcdir}/src/tests)
+set_target_properties (legacystore_${theSourceFile} PROPERTIES
+            INCLUDE_DIRECTORIES "${ls_include}"
+            COMPILE_DEFINITIONS _IN_QPID_BROKER)
+remember_location(legacystore_${theSourceFile})
+set(test_wrap ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_test${test_script_suffix})
+
+add_test (legacystore_${theSourceFile} ${test_wrap} ${legacystore_${theSourceFile}_LOCATION})
+ENDMACRO (define_selftest)
+
+# add_definitions(-H)
+
+define_selftest (SimpleTest)
+define_selftest (OrderingTest)
+define_selftest (TransactionalTest)
+define_selftest (TwoPhaseCommitTest)
+
+#
+# Other test programs
+#
+
+# This should ideally be done as part of the test run, but I don't know a way
+# to get these arguments and the working directory set like Makefile.am does,
+# and have that run during the test pass.
+if (PYTHON_EXECUTABLE)
+  set (python_bld ${CMAKE_CURRENT_BINARY_DIR}/python)
+  execute_process(COMMAND ${PYTHON_EXECUTABLE} setup.py install --prefix=${python_bld} --install-lib=${python_bld} --install-scripts=${python_bld}/commands
+                  WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/../python)
+endif (PYTHON_EXECUTABLE)

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/MessageUtils.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/MessageUtils.h?rev=1430018&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/MessageUtils.h (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/MessageUtils.h Mon Jan  7 21:24:48 2013
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/broker/Message.h>
+#include <qpid/broker/Queue.h>
+#include <qpid/broker/amqp_0_10/MessageTransfer.h>
+#include <qpid/framing/AMQFrame.h>
+#include <qpid/framing/all_method_bodies.h>
+#include <qpid/framing/Uuid.h>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+struct MessageUtils
+{
+    static Message createMessage(const std::string& exchange, const std::string& routingKey,
+                                 const Uuid& messageId=Uuid(), const bool durable = false,
+                                 const uint64_t contentSize = 0, const std::string& correlationId = std::string())
+    {
+        boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> msg(new qpid::broker::amqp_0_10::MessageTransfer());
+
+        AMQFrame method(( MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
+        AMQFrame header((AMQHeaderBody()));
+
+        msg->getFrames().append(method);
+        msg->getFrames().append(header);
+        MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+        props->setContentLength(contentSize);
+        props->setMessageId(messageId);
+        props->setCorrelationId(correlationId);
+        msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+        if (durable)
+            msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(PERSISTENT);
+        return Message(msg, msg);
+    }
+
+    static void addContent(Message msg, const std::string& data)
+    {
+        AMQFrame content((AMQContentBody(data)));
+        qpid::broker::amqp_0_10::MessageTransfer::get(msg).getFrames().append(content);
+    }
+
+    struct MessageRetriever :  public Consumer
+    {
+        MessageRetriever(Queue& q) : Consumer("test", CONSUMER), queue(q) {};
+
+        bool deliver(const QueueCursor& c, const Message& m)
+        {
+            message = m;
+            cursor = c;
+            return true;
+        };
+        void notify() {}
+        void cancel() {}
+        void acknowledged(const DeliveryRecord&) {}
+        OwnershipToken* getSession() { return 0; }
+
+        const Queue& queue;
+        Message message;
+        QueueCursor cursor;
+    };
+
+    static Message get(Queue& queue, QueueCursor* cursor = 0)
+    {
+        boost::shared_ptr<MessageRetriever> consumer(new MessageRetriever(queue));
+        if (!queue.dispatch(consumer)) throw qpid::Exception("No message found!");
+        if (cursor) *cursor = consumer->cursor;
+        return consumer->message;
+    }
+
+    static Uuid getMessageId(const Message& message)
+    {
+        return qpid::broker::amqp_0_10::MessageTransfer::get(message).getProperties<MessageProperties>()->getMessageId();
+    }
+
+    static std::string getCorrelationId(const Message& message)
+    {
+        return qpid::broker::amqp_0_10::MessageTransfer::get(message).getProperties<MessageProperties>()->getCorrelationId();
+    }
+
+    static void deliver(Message& msg, FrameHandler& h, uint16_t framesize)
+    {
+        qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendHeader(h, framesize, false, 0, 0, qpid::types::Variant::Map());
+        qpid::broker::amqp_0_10::MessageTransfer::get(msg).sendContent(h, framesize);
+    }
+
+};

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/OrderingTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/OrderingTest.cpp?rev=1430018&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/OrderingTest.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/OrderingTest.cpp Mon Jan  7 21:24:48 2013
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "unit_test.h"
+
+#include "qpid/legacystore/MessageStoreImpl.h"
+#include <iostream>
+#include "MessageUtils.h"
+#include <qpid/broker/Queue.h>
+#include <qpid/broker/RecoveryManagerImpl.h>
+#include <qpid/framing/AMQHeaderBody.h>
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+using namespace qpid;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace mrg::msgstore;
+
+qpid::broker::Broker::Options opts;
+qpid::broker::Broker br(opts);
+
+QPID_AUTO_TEST_SUITE(OrderingTest)
+
+#define SET_LOG_LEVEL(level) \
+    qpid::log::Options opts(""); \
+    opts.selectors.clear(); \
+    opts.selectors.push_back(level); \
+    qpid::log::Logger::instance().configure(opts);
+
+const std::string test_filename("OrderingTest");
+const char* tdp = getenv("TMP_DATA_DIR");
+const std::string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/OrderingTest");
+
+// === Helper fns ===
+
+const std::string name("OrderingQueue");
+std::auto_ptr<MessageStoreImpl> store;
+QueueRegistry queues;
+Queue::shared_ptr queue;
+std::queue<Uuid> ids;
+
+class TestConsumer :  public Consumer
+{
+    public:
+
+    TestConsumer(Queue::shared_ptr q, std::queue<Uuid>& i) : Consumer("test", CONSUMER), queue(q), ids(i) {};
+
+    bool deliver(const QueueCursor& cursor, const Message& message)
+    {
+        queue->dequeue(0, cursor);
+        BOOST_CHECK_EQUAL(ids.front(), MessageUtils::getMessageId(message));
+        ids.pop();
+        return true;
+    };
+    void notify() {}
+    void cancel() {}
+    void acknowledged(const DeliveryRecord&) {}
+    OwnershipToken* getSession() { return 0; }
+  private:
+    Queue::shared_ptr queue;
+    std::queue<Uuid>& ids;
+};
+boost::shared_ptr<TestConsumer> consumer;
+
+void setup()
+{
+    store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(&br));
+    store->init(test_dir, 4, 1, true); // truncate store
+
+    queue = Queue::shared_ptr(new Queue(name, 0, store.get(), 0));
+    queue->create();
+    consumer = boost::shared_ptr<TestConsumer>(new TestConsumer(queue, ids));
+}
+
+void push()
+{
+    Uuid messageId(true);
+    ids.push(messageId);
+
+    Message msg = MessageUtils::createMessage("exchange", "routing_key", messageId, true, 0);
+
+    queue->deliver(msg);
+}
+
+bool pop()
+{
+    return queue->dispatch(consumer);
+}
+
+void restart()
+{
+    queue.reset();
+    store.reset();
+
+    store = std::auto_ptr<MessageStoreImpl>(new MessageStoreImpl(&br));
+    store->init(test_dir, 4, 1);
+    ExchangeRegistry exchanges;
+    LinkRegistry links;
+    sys::Timer t;
+    DtxManager mgr(t);
+    mgr.setStore (store.get());
+    RecoveryManagerImpl recoveryMgr(queues, exchanges, links, mgr, br.getProtocolRegistry());
+    store->recover(recoveryMgr);
+
+    queue = queues.find(name);
+    consumer = boost::shared_ptr<TestConsumer>(new TestConsumer(queue, ids));
+}
+
+void check()
+{
+    BOOST_REQUIRE(queue);
+    BOOST_CHECK_EQUAL((u_int32_t) ids.size(), queue->getMessageCount());
+    while (pop()) ; // keep popping until all messages are dequeued
+    BOOST_CHECK_EQUAL((u_int32_t) 0, queue->getMessageCount());
+    BOOST_CHECK_EQUAL((size_t) 0, ids.size());
+}
+
+
+// === Test suite ===
+
+QPID_AUTO_TEST_CASE(Basic)
+{
+    SET_LOG_LEVEL("error+"); // This only needs to be set once.
+
+    std::cout << test_filename << ".Basic: " << std::flush;
+    setup();
+    //push on 10 messages
+    for (int i = 0; i < 10; i++) push();
+    restart();
+    check();
+    std::cout << "ok" << std::endl;
+}
+
+QPID_AUTO_TEST_CASE(Cycle)
+{
+    std::cout << test_filename << ".Cycle: " << std::flush;
+    setup();
+    //push on 10 messages:
+    for (int i = 0; i < 10; i++) push();
+    //pop 5:
+    for (int i = 0; i < 5; i++) pop();
+    //push on another 5:
+    for (int i = 0; i < 5; i++) push();
+    restart();
+    check();
+    std::cout << "ok" << std::endl;
+}
+
+QPID_AUTO_TEST_SUITE_END()

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/SimpleTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/SimpleTest.cpp?rev=1430018&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/SimpleTest.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/SimpleTest.cpp Mon Jan  7 21:24:48 2013
@@ -0,0 +1,497 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "unit_test.h"
+
+#include "qpid/legacystore/MessageStoreImpl.h"
+#include <iostream>
+#include "tests/legacystore/MessageUtils.h"
+#include "qpid/legacystore/StoreException.h"
+#include "qpid/broker/DirectExchange.h"
+#include <qpid/broker/Queue.h>
+#include <qpid/broker/QueueSettings.h>
+#include <qpid/broker/RecoveryManagerImpl.h>
+#include <qpid/framing/AMQHeaderBody.h>
+#include <qpid/framing/FieldTable.h>
+#include <qpid/framing/FieldValue.h>
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+qpid::broker::Broker::Options opts;
+qpid::broker::Broker br(opts);
+
+#define SET_LOG_LEVEL(level) \
+    qpid::log::Options opts(""); \
+    opts.selectors.clear(); \
+    opts.selectors.push_back(level); \
+    qpid::log::Logger::instance().configure(opts);
+
+
+using boost::intrusive_ptr;
+using boost::static_pointer_cast;
+using namespace qpid;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace mrg::msgstore;
+using namespace std;
+
+QPID_AUTO_TEST_SUITE(SimpleTest)
+
+const string test_filename("SimpleTest");
+const char* tdp = getenv("TMP_DATA_DIR");
+const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/SimpleTest");
+
+// === Helper fns ===
+
+struct DummyHandler : OutputHandler
+{
+    std::vector<AMQFrame> frames;
+
+    virtual void send(AMQFrame& frame){
+        frames.push_back(frame);
+    }
+};
+
+void recover(MessageStoreImpl& store, QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links)
+{
+    sys::Timer t;
+    DtxManager mgr(t);
+    mgr.setStore (&store);
+    RecoveryManagerImpl recovery(queues, exchanges, links, mgr, br.getProtocolRegistry());
+    store.recover(recovery);
+}
+
+void recover(MessageStoreImpl& store, ExchangeRegistry& exchanges)
+{
+    QueueRegistry queues;
+    LinkRegistry links;
+    recover(store, queues, exchanges, links);
+}
+
+void recover(MessageStoreImpl& store, QueueRegistry& queues)
+{
+    ExchangeRegistry exchanges;
+    LinkRegistry links;
+    recover(store, queues, exchanges, links);
+}
+
+void bindAndUnbind(const string& exchangeName, const string& queueName,
+                   const string& key, const FieldTable& args)
+{
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1, true); // truncate store
+        Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
+        Queue::shared_ptr queue(new Queue(queueName, 0, &store, 0));
+        store.create(*exchange, qpid::framing::FieldTable());
+        store.create(*queue, qpid::framing::FieldTable());
+        BOOST_REQUIRE(exchange->bind(queue, key, &args));
+        store.bind(*exchange, *queue, key, args);
+    }//db will be closed
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1);
+        ExchangeRegistry exchanges;
+        QueueRegistry queues;
+        LinkRegistry links;
+
+        recover(store, queues, exchanges, links);
+
+        Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+        Queue::shared_ptr queue = queues.find(queueName);
+        // check exchange args are still set
+        for (FieldTable::ValueMap::const_iterator i = args.begin(); i!=args.end(); i++) {
+            BOOST_CHECK(exchange->getArgs().get((*i).first)->getData() == (*i).second->getData());
+        }
+        //check it is bound by unbinding
+        BOOST_REQUIRE(exchange->unbind(queue, key, &args));
+        store.unbind(*exchange, *queue, key, args);
+    }
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1);
+        ExchangeRegistry exchanges;
+        QueueRegistry queues;
+        LinkRegistry links;
+
+        recover(store, queues, exchanges, links);
+
+        Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+        Queue::shared_ptr queue = queues.find(queueName);
+        // check exchange args are still set
+        for (FieldTable::ValueMap::const_iterator i = args.begin(); i!=args.end(); i++) {
+            BOOST_CHECK(exchange->getArgs().get((*i).first)->getData() == (*i).second->getData());
+        }
+        //make sure it is no longer bound
+        BOOST_REQUIRE(!exchange->unbind(queue, key, &args));
+    }
+}
+
+
+// === Test suite ===
+
+QPID_AUTO_TEST_CASE(CreateDelete)
+{
+    SET_LOG_LEVEL("error+"); // This only needs to be set once.
+
+    cout << test_filename << ".CreateDelete: " << flush;
+    MessageStoreImpl store(&br);
+    store.init(test_dir, 4, 1, true); // truncate store
+    string name("CreateDeleteQueue");
+    Queue queue(name, 0, &store, 0);
+    store.create(queue, qpid::framing::FieldTable());
+// TODO - check dir exists
+    BOOST_REQUIRE(queue.getPersistenceId());
+    store.destroy(queue);
+// TODO - check dir is deleted
+
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(EmptyRecover)
+{
+    cout << test_filename << ".EmptyRecover: " << flush;
+    MessageStoreImpl store(&br);
+    store.init(test_dir, 4, 1, true); // truncate store
+    QueueRegistry registry;
+    registry.setStore (&store);
+    recover(store, registry);
+    //nothing to assert, just testing it doesn't blow up
+
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(QueueCreate)
+{
+    cout << test_filename << ".QueueCreate: " << flush;
+
+    uint64_t id(0);
+    string name("MyDurableQueue");
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1, true); // truncate store
+        Queue queue(name, 0, &store, 0);
+        store.create(queue, qpid::framing::FieldTable());
+        BOOST_REQUIRE(queue.getPersistenceId());
+        id = queue.getPersistenceId();
+    }//db will be closed
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1);
+        QueueRegistry registry;
+        registry.setStore (&store);
+        recover(store, registry);
+        Queue::shared_ptr queue = registry.find(name);
+        BOOST_REQUIRE(queue.get());
+        BOOST_CHECK_EQUAL(id, queue->getPersistenceId());
+    }
+
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(QueueCreateWithSettings)
+{
+    cout << test_filename << ".QueueCreateWithSettings: " << flush;
+
+    FieldTable arguments;
+    arguments.setInt("qpid.max_count", 202);
+    arguments.setInt("qpid.max_size", 1003);
+    QueueSettings settings;
+    settings.populate(arguments, settings.storeSettings);
+    string name("MyDurableQueue");
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1, true); // truncate store
+        Queue queue(name, settings, &store, 0);
+        queue.create();
+        BOOST_REQUIRE(queue.getPersistenceId());
+    }//db will be closed
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1);
+        QueueRegistry registry;
+        registry.setStore (&store);
+        recover(store, registry);
+        Queue::shared_ptr queue = registry.find(name);
+        BOOST_REQUIRE(queue);
+        BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), 202);
+        BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), 1003);
+        BOOST_CHECK_EQUAL(settings.maxDepth.getCount(), queue->getSettings().maxDepth.getCount());
+        BOOST_CHECK_EQUAL(settings.maxDepth.getSize(), queue->getSettings().maxDepth.getSize());
+    }
+
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(QueueDestroy)
+{
+    cout << test_filename << ".QueueDestroy: " << flush;
+
+    string name("MyDurableQueue");
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1, true); // truncate store
+        Queue queue(name, 0, &store, 0);
+        store.create(queue, qpid::framing::FieldTable());
+        store.destroy(queue);
+    }//db will be closed
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1);
+        QueueRegistry registry;
+        registry.setStore (&store);
+        recover(store, registry);
+        BOOST_REQUIRE(!registry.find(name));
+    }
+
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(Enqueue)
+{
+    cout << test_filename << ".Enqueue: " << flush;
+
+    //TODO: this is largely copied and pasted from MessageTest in the
+    //qpid tree. Ideally we need some helper routines to reduce
+    //this to a simpler, less duplicated form
+
+    string name("MyDurableQueue");
+    string exchange("MyExchange");
+    string routingKey("MyRoutingKey");
+    Uuid messageId(true);
+    string data1("abcdefg");
+    string data2("hijklmn");
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1, true); // truncate store
+        Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
+        queue->create();
+
+        Message msg = MessageUtils::createMessage(exchange, routingKey, messageId, true, 14);
+        MessageUtils::addContent(msg, data1);
+        MessageUtils::addContent(msg, data2);
+
+        msg.addAnnotation("abc", "xyz");
+
+        queue->deliver(msg);
+    }//db will be closed
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1);
+        QueueRegistry registry;
+        registry.setStore (&store);
+        recover(store, registry);
+        Queue::shared_ptr queue = registry.find(name);
+        BOOST_REQUIRE(queue);
+        BOOST_CHECK_EQUAL((u_int32_t) 1, queue->getMessageCount());
+        Message msg = MessageUtils::get(*queue);
+
+        BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey());
+        BOOST_CHECK_EQUAL(messageId, MessageUtils::getMessageId(msg));
+        BOOST_CHECK_EQUAL(std::string("xyz"), msg.getAnnotation("abc"));
+        BOOST_CHECK_EQUAL((u_int64_t) 14, msg.getContentSize());
+
+        DummyHandler handler;
+        MessageUtils::deliver(msg, handler, 100);
+        BOOST_CHECK_EQUAL((size_t) 2, handler.frames.size());
+        AMQContentBody* contentBody(dynamic_cast<AMQContentBody*>(handler.frames[1].getBody()));
+        BOOST_REQUIRE(contentBody);
+        BOOST_CHECK_EQUAL(data1.size() + data2.size(), contentBody->getData().size());
+        BOOST_CHECK_EQUAL(data1 + data2, contentBody->getData());
+    }
+
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(Dequeue)
+{
+    cout << test_filename << ".Dequeue: " << flush;
+
+    //TODO: reduce the duplication in these tests
+    string name("MyDurableQueue");
+    {
+        string exchange("MyExchange");
+        string routingKey("MyRoutingKey");
+        Uuid messageId(true);
+        string data("abcdefg");
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1, true); // truncate store
+        Queue::shared_ptr queue(new Queue(name, 0, &store, 0));
+        queue->create();
+
+        Message msg = MessageUtils::createMessage(exchange, routingKey, messageId, true, 7);
+        MessageUtils::addContent(msg, data);
+
+        queue->deliver(msg);
+
+        QueueCursor cursor;
+        MessageUtils::get(*queue, &cursor);
+        queue->dequeue(0, cursor);
+    }//db will be closed
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1);
+        QueueRegistry registry;
+        registry.setStore (&store);
+        recover(store, registry);
+        Queue::shared_ptr queue = registry.find(name);
+        BOOST_REQUIRE(queue);
+        BOOST_CHECK_EQUAL((u_int32_t) 0, queue->getMessageCount());
+    }
+
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeCreateAndDestroy)
+{
+    cout << test_filename << ".ExchangeCreateAndDestroy: " << flush;
+
+    uint64_t id(0);
+    string name("MyDurableExchange");
+    string type("direct");
+    FieldTable args;
+    args.setString("a", "A");
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1, true); // truncate store
+        ExchangeRegistry registry;
+        Exchange::shared_ptr exchange = registry.declare(name, type, true, args).first;
+        store.create(*exchange, qpid::framing::FieldTable());
+        id = exchange->getPersistenceId();
+        BOOST_REQUIRE(id);
+    }//db will be closed
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1);
+        ExchangeRegistry registry;
+
+        recover(store, registry);
+
+        Exchange::shared_ptr exchange = registry.get(name);
+        BOOST_CHECK_EQUAL(id, exchange->getPersistenceId());
+        BOOST_CHECK_EQUAL(type, exchange->getType());
+        BOOST_REQUIRE(exchange->isDurable());
+        BOOST_CHECK_EQUAL(*args.get("a"), *exchange->getArgs().get("a"));
+        store.destroy(*exchange);
+    }
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1);
+        ExchangeRegistry registry;
+
+        recover(store, registry);
+
+        try {
+            Exchange::shared_ptr exchange = registry.get(name);
+            BOOST_FAIL("Expected exchange not to be found");
+        } catch (const SessionException& e) {
+            BOOST_CHECK_EQUAL((framing::ReplyCode) 404, e.code);
+        }
+    }
+
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeBindAndUnbind)
+{
+    cout << test_filename << ".ExchangeBindAndUnbind: " << flush;
+
+    bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", FieldTable());
+
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeBindAndUnbindWithArgs)
+{
+    cout << test_filename << ".ExchangeBindAndUnbindWithArgs: " << flush;
+
+    FieldTable args;
+    args.setString("a", "A");
+    args.setString("b", "B");
+    bindAndUnbind("MyDurableExchange", "MyDurableQueue", "my-routing-key", args);
+
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(ExchangeImplicitUnbind)
+{
+    cout << test_filename << ".ExchangeImplicitUnbind: " << flush;
+
+    string exchangeName("MyDurableExchange");
+    string queueName1("MyDurableQueue1");
+    string queueName2("MyDurableQueue2");
+    string key("my-routing-key");
+    FieldTable args;
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1, true); // truncate store
+        Exchange::shared_ptr exchange(new DirectExchange(exchangeName, true, args));
+        Queue::shared_ptr queue1(new Queue(queueName1, 0, &store, 0));
+        Queue::shared_ptr queue2(new Queue(queueName2, 0, &store, 0));
+        store.create(*exchange, qpid::framing::FieldTable());
+        store.create(*queue1, qpid::framing::FieldTable());
+        store.create(*queue2, qpid::framing::FieldTable());
+        store.bind(*exchange, *queue1, key, args);
+        store.bind(*exchange, *queue2, key, args);
+        //delete queue1:
+        store.destroy(*queue1);
+    }//db will be closed
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1);
+        ExchangeRegistry exchanges;
+        QueueRegistry queues;
+        LinkRegistry links;
+
+        //ensure recovery works ok:
+        recover(store, queues, exchanges, links);
+
+        Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+        BOOST_REQUIRE(!queues.find(queueName1).get());
+        BOOST_REQUIRE(queues.find(queueName2).get());
+
+        //delete exchange:
+        store.destroy(*exchange);
+    }
+    {
+        MessageStoreImpl store(&br);
+        store.init(test_dir, 4, 1);
+        ExchangeRegistry exchanges;
+        QueueRegistry queues;
+        LinkRegistry links;
+
+        //ensure recovery works ok:
+        recover(store, queues, exchanges, links);
+
+        try {
+            Exchange::shared_ptr exchange = exchanges.get(exchangeName);
+            BOOST_FAIL("Expected exchange not to be found");
+        } catch (const SessionException& e) {
+            BOOST_CHECK_EQUAL((framing::ReplyCode) 404, e.code);
+        }
+        Queue::shared_ptr queue = queues.find(queueName2);
+        store.destroy(*queue);
+    }
+
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_SUITE_END()
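
Editorial sketch (not part of the commit): the SimpleTest cases above share one pattern. They write through a store inside a scope, let the scope end so the store is closed, then open a fresh store on the same directory and recover from it. The self-contained example below illustrates only that pattern. ToyDisk, ToyStore and survivesReopen are hypothetical names invented for illustration; they are not part of the legacystore or broker API, and a std::map merely stands in for the files under test_dir.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for the on-disk store directory (assumption, not qpid code).
typedef std::map<std::string, std::string> ToyDisk;

// Hypothetical store: every create/destroy is applied straight to the "disk".
class ToyStore {
    ToyDisk& disk;
  public:
    explicit ToyStore(ToyDisk& d, bool truncate = false) : disk(d) {
        if (truncate) disk.clear();          // plays the role of init(..., true)
    }
    void create(const std::string& name, const std::string& type) { disk[name] = type; }
    void destroy(const std::string& name) { disk.erase(name); }
    // Rebuild an in-memory registry from whatever is on the "disk".
    void recover(ToyDisk& registry) const { registry = disk; }
};

// Returns true if an exchange created through one store instance is still
// visible after that instance is closed and a new one recovers from the disk.
bool survivesReopen(ToyDisk& disk, const std::string& name, bool destroyFirst) {
    {
        ToyStore store(disk, true);          // truncate store
        store.create(name, "direct");
        if (destroyFirst) store.destroy(name);
    }                                        // scope ends: store is closed
    ToyStore store(disk);                    // reopen without truncating
    ToyDisk registry;
    store.recover(registry);
    return registry.count(name) == 1;
}
```

Calling survivesReopen(disk, "MyDurableExchange", false) models the first half of ExchangeCreateAndDestroy, and passing true for destroyFirst models the final block, where the exchange is expected to be gone after recovery.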

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/TestFramework.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/TestFramework.cpp?rev=1430018&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/TestFramework.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/TestFramework.cpp Mon Jan  7 21:24:48 2013
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// Defines broker to be used by tests
+
+#include "unit_test.h"
+#include "TestFramework.h"
+#include "qpid/broker/Broker.h"
+
+#include <iostream>
+
+//BOOST_GLOBAL_FIXTURE( testBroker )

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/TestFramework.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/TestFramework.h?rev=1430018&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/TestFramework.h (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/TestFramework.h Mon Jan  7 21:24:48 2013
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+// Defines broker to be used by tests
+
+#include "unit_test.h"
+
+#include <qpid/broker/Broker.h>
+
+namespace {
+    // test broker
+    qpid::broker::Broker::Options opts;
+    qpid::broker::Broker br(opts);
+/*
+    struct testBroker {
+        testBroker() {}
+        ~testBroker() {}
+    };*/
+}

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/TransactionalTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/TransactionalTest.cpp?rev=1430018&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/TransactionalTest.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/TransactionalTest.cpp Mon Jan  7 21:24:48 2013
@@ -0,0 +1,351 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "unit_test.h"
+
+#include "qpid/legacystore/MessageStoreImpl.h"
+#include <iostream>
+#include "MessageUtils.h"
+#include "qpid/legacystore/StoreException.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/log/Statement.h"
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+using namespace mrg::msgstore;
+using namespace qpid;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace std;
+
+namespace {
+qpid::broker::Broker::Options opts;
+qpid::broker::Broker br(opts);
+}
+
+QPID_AUTO_TEST_SUITE(TransactionalTest)
+
+#define SET_LOG_LEVEL(level) \
+    qpid::log::Options opts(""); \
+    opts.selectors.clear(); \
+    opts.selectors.push_back(level); \
+    qpid::log::Logger::instance().configure(opts);
+
+const string test_filename("TransactionalTest");
+const char* tdp = getenv("TMP_DATA_DIR");
+const string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/TransactionalTest");
+
+// Test txn context with a special setCompleteFailure() method which prevents the entire "txn complete" process from happening
+class TestTxnCtxt : public TxnCtxt
+{
+  public:
+    TestTxnCtxt(IdSequence* _loggedtx) : TxnCtxt(_loggedtx) {}
+    void setCompleteFailure(const unsigned num_queues_rem) {
+        // Erase members of impactedQueues, starting at begin(), until only num_queues_rem remain,
+        // to simulate a multi-queue txn complete failure.
+        while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
+    }
+    void resetPreparedXidStorePtr() { preparedXidStorePtr = 0; }
+};
+
+// Test store which has a special begin() which returns a TestTxnCtxt.
+// begin(), commit(), and abort() all hide functions in MessageStoreImpl. To avoid the compiler
+// warnings/errors these are renamed with a 'TMS' prefix.
+class TestMessageStore: public MessageStoreImpl
+{
+  public:
+    TestMessageStore(qpid::broker::Broker* br, const char* envpath = 0) : MessageStoreImpl(br, envpath) {}
+    std::auto_ptr<qpid::broker::TransactionContext> TMSbegin() {
+        checkInit();
+        // pass sequence number for c/a
+        return auto_ptr<TransactionContext>(new TestTxnCtxt(&messageIdSequence));
+    }
+    void TMScommit(TransactionContext& ctxt, const bool complete_prepared_list) {
+        checkInit();
+        TxnCtxt* txn(check(&ctxt));
+        if (!txn->isTPC()) {
+            localPrepare(dynamic_cast<TxnCtxt*>(txn));
+            if (!complete_prepared_list) dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr();
+        }
+        completed(*dynamic_cast<TxnCtxt*>(txn), true);
+    }
+    void TMSabort(TransactionContext& ctxt, const bool complete_prepared_list)
+    {
+        checkInit();
+        TxnCtxt* txn(check(&ctxt));
+        if (!txn->isTPC()) {
+            localPrepare(dynamic_cast<TxnCtxt*>(txn));
+            if (!complete_prepared_list) dynamic_cast<TestTxnCtxt*>(txn)->resetPreparedXidStorePtr();
+        }
+        completed(*dynamic_cast<TxnCtxt*>(txn), false);
+    }
+};
+
+// === Helper fns ===
+
+const string nameA("queueA");
+const string nameB("queueB");
+//const Uuid messageId(true);
+std::auto_ptr<MessageStoreImpl> store;
+std::auto_ptr<QueueRegistry> queues;
+Queue::shared_ptr queueA;
+Queue::shared_ptr queueB;
+
+template <class T>
+void setup()
+{
+    store = std::auto_ptr<T>(new T(&br));
+    store->init(test_dir, 4, 1, true); // truncate store
+
+    //create two queues:
+    queueA = Queue::shared_ptr(new Queue(nameA, 0, store.get(), 0));
+    queueA->create();
+    queueB = Queue::shared_ptr(new Queue(nameB, 0, store.get(), 0));
+    queueB->create();
+}
+
+template <class T>
+void restart()
+{
+    queueA.reset();
+    queueB.reset();
+    queues.reset();
+    store.reset();
+
+    store = std::auto_ptr<T>(new T(&br));
+    store->init(test_dir, 4, 1);
+    queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
+    ExchangeRegistry exchanges;
+    LinkRegistry links;
+    sys::Timer t;
+    DtxManager mgr(t);
+    mgr.setStore (store.get());
+    RecoveryManagerImpl recovery(*queues, exchanges, links, mgr, br.getProtocolRegistry());
+    store->recover(recovery);
+
+    queueA = queues->find(nameA);
+    queueB = queues->find(nameB);
+}
+
+Message createMessage(const string& id, const string& exchange="exchange", const string& key="routing_key")
+{
+    return MessageUtils::createMessage(exchange, key, Uuid(), true, 0, id);
+}
+
+void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid = "<none>")
+{
+    BOOST_REQUIRE(queue);
+    BOOST_CHECK_EQUAL(size, queue->getMessageCount());
+    if (size > 0) {
+        Message msg = MessageUtils::get(*queue);
+        BOOST_REQUIRE(msg);
+        BOOST_CHECK_EQUAL(msgid, MessageUtils::getCorrelationId(msg));
+    }
+}
+
+void swap(bool commit)
+{
+    setup<MessageStoreImpl>();
+
+    //create message and enqueue it onto first queue:
+    Message msgA = createMessage("Message", "exchange", "routing_key");
+    queueA->deliver(msgA);
+
+    QueueCursor cursorB;
+    Message msgB = MessageUtils::get(*queueA, &cursorB);
+    BOOST_REQUIRE(msgB);
+    //move the message from one queue to the other as a transaction
+    std::auto_ptr<TransactionContext> txn = store->begin();
+    TxBuffer tx;
+    queueB->deliver(msgB, &tx);//note: need to enqueue it first to avoid message being deleted
+
+    queueA->dequeue(txn.get(), cursorB);
+    tx.prepare(txn.get());
+    if (commit) {
+        store->commit(*txn);
+    } else {
+        store->abort(*txn);
+    }
+
+    restart<MessageStoreImpl>();
+
+    // Check outcome
+    BOOST_REQUIRE(queueA);
+    BOOST_REQUIRE(queueB);
+
+    Queue::shared_ptr x;//the queue from which the message was swapped
+    Queue::shared_ptr y;//the queue on which the message is expected to be
+
+    if (commit) {
+        x = queueA;
+        y = queueB;
+    } else {
+        x = queueB;
+        y = queueA;
+    }
+
+    checkMsg(x, 0);
+    checkMsg(y, 1, "Message");
+    checkMsg(y, 0);
+}
+
+void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list, const bool commit)
+{
+    setup<TestMessageStore>();
+    TestMessageStore* tmsp = static_cast<TestMessageStore*>(store.get());
+    std::auto_ptr<TransactionContext> txn(tmsp->TMSbegin());
+    TxBuffer tx;
+
+    //create two messages and enqueue them onto both queues:
+    Message msgA = createMessage("MessageA", "exchange", "routing_key");
+    queueA->deliver(msgA, &tx);
+    queueB->deliver(msgA, &tx);
+    Message msgB = createMessage("MessageB", "exchange", "routing_key");
+    queueA->deliver(msgB, &tx);
+    queueB->deliver(msgB, &tx);
+
+    tx.prepare(txn.get());
+    static_cast<TestTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem);
+    if (commit)
+        tmsp->TMScommit(*txn, complete_prepared_list);
+    else
+        tmsp->TMSabort(*txn, complete_prepared_list);
+    restart<TestMessageStore>();
+
+    // Check outcome
+    if (commit)
+    {
+        checkMsg(queueA, 2, "MessageA");
+        checkMsg(queueB, 2, "MessageA");
+        checkMsg(queueA, 1, "MessageB");
+        checkMsg(queueB, 1, "MessageB");
+    }
+    checkMsg(queueA, 0);
+    checkMsg(queueB, 0);
+}
+
+// === Test suite ===
+
+QPID_AUTO_TEST_CASE(Commit)
+{
+    SET_LOG_LEVEL("error+"); // This only needs to be set once.
+
+    cout << test_filename << ".Commit: " << flush;
+    swap(true);
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(Abort)
+{
+    cout << test_filename << ".Abort: " << flush;
+    swap(false);
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueCommit)
+{
+    cout << test_filename << ".MultiQueueCommit: " << flush;
+    testMultiQueueTxn(2, true, true);
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAbort)
+{
+    cout << test_filename << ".MultiQueueAbort: " << flush;
+    testMultiQueueTxn(2, true, false);
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
+{
+    cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
+    testMultiQueueTxn(0, false, true);
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
+{
+    cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
+    testMultiQueueTxn(0, false, false);
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
+{
+    cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
+    testMultiQueueTxn(1, false, true);
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
+{
+    cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
+    testMultiQueueTxn(1, false, false);
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
+{
+    cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
+    testMultiQueueTxn(2, false, true);
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
+{
+    cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
+    testMultiQueueTxn(2, false, false);
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(LockedRecordTest)
+{
+    cout << test_filename << ".LockedRecordTest: " << flush;
+
+    setup<MessageStoreImpl>();
+    queueA->deliver(createMessage("Message", "exchange", "routingKey"));
+    std::auto_ptr<TransactionContext> txn = store->begin();
+
+    QueueCursor cursor;
+    Message msg = MessageUtils::get(*queueA, &cursor);
+    queueA->dequeue(txn.get(), cursor);
+
+    try {
+        store->dequeue(0, msg.getPersistentContext(), *queueA);
+        BOOST_ERROR("Did not throw JERR_MAP_LOCKED exception as expected.");
+    }
+    catch (const mrg::msgstore::StoreException& e) {
+        if (std::strstr(e.what(), "JERR_MAP_LOCKED") == 0)
+            BOOST_ERROR("Unexpected StoreException: " << e.what());
+    }
+    catch (const std::exception& e) {
+        BOOST_ERROR("Unexpected exception: " << e.what());
+    }
+    store->commit(*txn);
+    checkMsg(queueA, 0);
+
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_SUITE_END()
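
Editorial sketch (not part of the commit): the Commit and Abort cases above check that moving a message between two queues inside one transaction is all-or-nothing. The toy model below illustrates the expected outcome only. ToyQueue, ToyTxn and swapMessage are hypothetical names invented for illustration. The model simply defers operations until commit, which is an assumption made for brevity and not a description of how MessageStoreImpl journals transactions.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for a durable queue (not qpid::broker::Queue).
struct ToyQueue {
    std::deque<std::string> messages;
};

// Hypothetical transaction: records operations and applies them only on commit.
class ToyTxn {
    struct Op { ToyQueue* queue; std::string msg; bool isEnqueue; };
    std::vector<Op> ops;
  public:
    void enqueue(ToyQueue& q, const std::string& msg) {
        Op op = { &q, msg, true };
        ops.push_back(op);
    }
    void dequeue(ToyQueue& q, const std::string& msg) {
        Op op = { &q, msg, false };
        ops.push_back(op);
    }
    void commit() {
        for (std::size_t i = 0; i < ops.size(); ++i) {
            std::deque<std::string>& m = ops[i].queue->messages;
            if (ops[i].isEnqueue) {
                m.push_back(ops[i].msg);
            } else {
                std::deque<std::string>::iterator it = std::find(m.begin(), m.end(), ops[i].msg);
                if (it != m.end()) m.erase(it);
            }
        }
        ops.clear();
    }
    void abort() { ops.clear(); }            // discard everything that was recorded
};

// Move msg from 'from' to 'to' under one transaction, then commit or abort.
void swapMessage(ToyQueue& from, ToyQueue& to, const std::string& msg, bool commit) {
    ToyTxn txn;
    txn.enqueue(to, msg);                    // enqueue first, as the test does
    txn.dequeue(from, msg);
    if (commit) txn.commit(); else txn.abort();
}
```

With commit set to true the message ends up only on the destination queue; with false it stays only on the source queue. That is the same x/y expectation that swap(bool commit) verifies after restart.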

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/TwoPhaseCommitTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/TwoPhaseCommitTest.cpp?rev=1430018&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/TwoPhaseCommitTest.cpp (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/TwoPhaseCommitTest.cpp Mon Jan  7 21:24:48 2013
@@ -0,0 +1,675 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "unit_test.h"
+
+#include "qpid/legacystore/MessageStoreImpl.h"
+#include <iostream>
+#include "MessageUtils.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/RecoveryManagerImpl.h"
+#include "qpid/framing/AMQHeaderBody.h"
+#include "qpid/log/Statement.h"
+#include "qpid/legacystore/TxnCtxt.h"
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Timer.h"
+
+using namespace mrg::msgstore;
+using namespace qpid;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace std;
+
+
+qpid::broker::Broker::Options opts;
+qpid::broker::Broker br(opts);
+
+
+QPID_AUTO_TEST_SUITE(TwoPhaseCommitTest)
+
+#define SET_LOG_LEVEL(level) \
+    qpid::log::Options opts(""); \
+    opts.selectors.clear(); \
+    opts.selectors.push_back(level); \
+    qpid::log::Logger::instance().configure(opts);
+
+
+const string test_filename("TwoPhaseCommitTest");
+const char* tdp = getenv("TMP_DATA_DIR");
+string test_dir(tdp && strlen(tdp) > 0 ? tdp : "/tmp/TwoPhaseCommitTest");
+
+// === Helper fns ===
+
+class TwoPhaseCommitTest
+{
+
+    class Strategy
+    {
+    public:
+        virtual void init() = 0;
+        virtual void run(TPCTransactionContext* txn) = 0;
+        virtual void check(bool committed) = 0;
+        virtual ~Strategy(){}
+    };
+
+    class Swap : public Strategy
+    {
+        TwoPhaseCommitTest* const test;
+        const string messageId;
+        Message msg;
+    public:
+        Swap(TwoPhaseCommitTest* const test_, const string& messageId_): test(test_), messageId(messageId_) {}
+        void init(){ msg = test->deliver(messageId, test->queueA); }
+        void run(TPCTransactionContext* txn) { test->swap(txn, test->queueA, test->queueB); }
+        void check(bool committed) { test->swapCheck(committed, messageId,  test->queueA, test->queueB); }
+    };
+
+    class Enqueue : public Strategy
+    {
+        TwoPhaseCommitTest* const test;
+        Message msg1;
+        Message msg2;
+        Message msg3;
+    public:
+        Enqueue(TwoPhaseCommitTest* const test_): test(test_) {}
+        void init() {}
+        void run(TPCTransactionContext* txn) {
+            msg1 = test->enqueue(txn, "Enqueue1", test->queueA);
+            msg2 = test->enqueue(txn, "Enqueue2", test->queueA);
+            msg3 = test->enqueue(txn, "Enqueue3", test->queueA);
+        }
+        void check(bool committed) {
+            if (committed) {
+                test->checkMsg(test->queueA, 3, "Enqueue1");
+                test->checkMsg(test->queueA, 2, "Enqueue2");
+                test->checkMsg(test->queueA, 1, "Enqueue3");
+            }
+            test->checkMsg(test->queueA, 0);
+        }
+    };
+
+    class Dequeue : public Strategy
+    {
+        TwoPhaseCommitTest* const test;
+        Message msg1;
+        Message msg2;
+        Message msg3;
+    public:
+        Dequeue(TwoPhaseCommitTest* const test_): test(test_) {}
+        void init() {
+            msg1 = test->deliver("Dequeue1", test->queueA);
+            msg2 = test->deliver("Dequeue2", test->queueA);
+            msg3 = test->deliver("Dequeue3", test->queueA);
+        }
+        void run(TPCTransactionContext* txn) {
+            test->dequeue(txn, test->queueA);
+            test->dequeue(txn, test->queueA);
+            test->dequeue(txn, test->queueA);
+        }
+        void check(bool committed) {
+            if (!committed) {
+                test->checkMsg(test->queueA, 3, "Dequeue1");
+                test->checkMsg(test->queueA, 2, "Dequeue2");
+                test->checkMsg(test->queueA, 1, "Dequeue3");
+            }
+            test->checkMsg(test->queueA, 0);
+        }
+    };
+
+    class MultiQueueTxn : public Strategy
+    {
+        TwoPhaseCommitTest* const test;
+        Message msg1;
+        Message msg2;
+        std::set<Queue::shared_ptr> queueset;
+    public:
+        MultiQueueTxn(TwoPhaseCommitTest* const test_): test(test_) {}
+        virtual void init() {}
+        virtual void run(TPCTransactionContext* txn) {
+            queueset.insert(test->queueA);
+            queueset.insert(test->queueB);
+            msg1 = test->enqueue(txn, "Message1", queueset);
+            msg2 = test->enqueue(txn, "Message2", queueset);
+            queueset.clear();
+        }
+        virtual void check(bool committed) {
+            TestMessageStore* sptr = static_cast<TestMessageStore*>(test->store.get());
+            if (committed)
+            {
+                test->checkMsg(test->queueA, 2, "Message1");
+                test->checkMsg(test->queueB, 2, "Message1");
+                test->checkMsg(test->queueA, 1, "Message2");
+                test->checkMsg(test->queueB, 1, "Message2");
+            }
+            test->checkMsg(test->queueA, 0);
+            test->checkMsg(test->queueB, 0);
+            // Check there are no remaining open txns in store
+            BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingTxns(*(test->queueA)));
+            BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingTxns(*(test->queueB)));
+            BOOST_CHECK_EQUAL(u_int32_t(0), sptr->getRemainingPreparedListTxns());
+        }
+    };
+
+    // Test txn context with a special setCompleteFailure() method which prevents the entire "txn complete" process from happening
+    class TestTPCTxnCtxt : public TPCTxnCtxt
+    {
+      public:
+        TestTPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TPCTxnCtxt(_xid, _loggedtx) {}
+        void setCompleteFailure(const unsigned num_queues_rem, const bool complete_prepared_list) {
+            // Erase members of impactedQueues, starting at begin(), until only num_queues_rem remain,
+            // to simulate a multi-queue txn complete failure.
+            while (impactedQueues.size() > num_queues_rem) impactedQueues.erase(impactedQueues.begin());
+            // If prepared list is not to be committed, set pointer to 0
+            if (!complete_prepared_list) preparedXidStorePtr = 0;
+        }
+    };
+
+    // Test store which has a special begin() which returns a TestTPCTxnCtxt, and methods to check for
+    // remaining open transactions
+    class TestMessageStore: public MessageStoreImpl
+    {
+      public:
+        TestMessageStore(qpid::broker::Broker* br, const char* envpath = 0) : MessageStoreImpl(br, envpath) {}
+        std::auto_ptr<qpid::broker::TPCTransactionContext> TMSbegin(const std::string& xid) {
+            checkInit();
+            IdSequence* jtx = &messageIdSequence;
+            // pass sequence number for c/a
+            return auto_ptr<TPCTransactionContext>(new TestTPCTxnCtxt(xid, jtx));
+        }
+        u_int32_t getRemainingTxns(const PersistableQueue& queue) {
+            return static_cast<JournalImpl*>(queue.getExternalQueueStore())->get_open_txn_cnt();
+        }
+        u_int32_t getRemainingPreparedListTxns() {
+            return tplStorePtr->get_open_txn_cnt();
+        }
+    };
+
+    const string nameA;
+    const string nameB;
+    std::auto_ptr<MessageStoreImpl> store;
+    std::auto_ptr<DtxManager> dtxmgr;
+    std::auto_ptr<QueueRegistry> queues;
+    std::auto_ptr<LinkRegistry> links;
+    Queue::shared_ptr queueA;
+    Queue::shared_ptr queueB;
+    Message msg1;
+    Message msg2;
+    Message msg4;
+    std::auto_ptr<TxBuffer> tx;
+
+    void recoverPrepared(bool commit)
+    {
+        setup<MessageStoreImpl>();
+
+        Swap swap(this, "RecoverPrepared");
+        swap.init();
+        std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
+        swap.run(txn.get());
+        if (tx.get()) {
+            tx->prepare(txn.get());
+            tx.reset();
+        }
+
+        store->prepare(*txn);
+        restart<MessageStoreImpl>();
+
+        //check that the message is not available from either queue
+        BOOST_CHECK_EQUAL((u_int32_t) 0, queueA->getMessageCount());
+        BOOST_CHECK_EQUAL((u_int32_t) 0, queueB->getMessageCount());
+
+        //commit/abort the txn - through the dtx manager, not directly on the store
+        if (commit) {
+            dtxmgr->commit("my-xid", false);
+        } else {
+            dtxmgr->rollback("my-xid");
+        }
+
+        swap.check(commit);
+        restart<MessageStoreImpl>();
+        swap.check(commit);
+    }
+
+    void testMultiQueueTxn(const unsigned num_queues_rem, const bool complete_prepared_list, const bool commit)
+    {
+        setup<TestMessageStore>();
+        MultiQueueTxn mqtTest(this);
+        mqtTest.init();
+        std::auto_ptr<TPCTransactionContext> txn(static_cast<TestMessageStore*>(store.get())->TMSbegin("my-xid"));
+        mqtTest.run(txn.get());
+        if (tx.get()) {
+            tx->prepare(txn.get());
+            tx.reset();
+        }
+        store->prepare(*txn);
+
+        // As the commits and aborts should happen through DtxManager, and it is too complex to
+        // pass all these test params through, we bypass DtxManager and use the store directly.
+        // This will prevent the queues from seeing committed txns, however. To test the success
+        // or failure of
+        static_cast<TestTPCTxnCtxt*>(txn.get())->setCompleteFailure(num_queues_rem, complete_prepared_list);
+        if (commit)
+            store->commit(*txn);
+        else
+            store->abort(*txn);
+        restart<TestMessageStore>();
+        mqtTest.check(commit);
+    }
+
+    void commit(Strategy& strategy)
+    {
+        setup<MessageStoreImpl>();
+        strategy.init();
+
+        std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
+        strategy.run(txn.get());
+        if (tx.get()) {
+            tx->prepare(txn.get());
+            tx.reset();
+        }
+        store->prepare(*txn);
+        store->commit(*txn);
+        restart<MessageStoreImpl>();
+        strategy.check(true);
+    }
+
+    void abort(Strategy& strategy, bool prepare)
+    {
+        setup<MessageStoreImpl>();
+        strategy.init();
+
+        std::auto_ptr<TPCTransactionContext> txn(store->begin("my-xid"));
+        strategy.run(txn.get());
+        if (tx.get()) {
+            tx->prepare(txn.get());
+            tx.reset();
+        }
+        if (prepare) store->prepare(*txn);
+        store->abort(*txn);
+        restart<MessageStoreImpl>();
+        strategy.check(false);
+    }
+
+    void swap(TPCTransactionContext* txn, Queue::shared_ptr& from, Queue::shared_ptr& to)
+    {
+        QueueCursor c;
+        Message msg1 = MessageUtils::get(*from, &c);//just dequeues in memory
+        //move the message from one queue to the other as part of a
+        //distributed transaction
+        if (!tx.get()) tx = std::auto_ptr<TxBuffer>(new TxBuffer);
+        to->deliver(msg1, tx.get());//note: need to enqueue it first to avoid message being deleted
+        from->dequeue(txn, c);
+    }
+
+    void dequeue(TPCTransactionContext* txn, Queue::shared_ptr& queue)
+    {
+        QueueCursor c;
+        Message msg2 = MessageUtils::get(*queue, &c);//just dequeues in memory
+        queue->dequeue(txn, c);
+    }
+
+    Message enqueue(TPCTransactionContext* /*txn*/, const string& msgid, Queue::shared_ptr& queue)
+    {
+        Message msg = createMessage(msgid);
+        if (!tx.get()) tx = std::auto_ptr<TxBuffer>(new TxBuffer);
+        queue->deliver(msg, tx.get());
+        return msg;
+    }
+
+    Message enqueue(TPCTransactionContext* /*txn*/, const string& msgid, std::set<Queue::shared_ptr>& queueset)
+    {
+        if (!tx.get()) tx = std::auto_ptr<TxBuffer>(new TxBuffer);
+        Message msg = createMessage(msgid);
+        for (std::set<Queue::shared_ptr>::iterator i = queueset.begin(); i != queueset.end(); i++) {
+            (*i)->deliver(msg, tx.get());
+        }
+        return msg;
+    }
+
+    Message deliver(const string& msgid, Queue::shared_ptr& queue)
+    {
+        Message m = createMessage(msgid);
+        queue->deliver(m);
+        return m;
+    }
+
+    template <class T>
+    void setup()
+    {
+        store = std::auto_ptr<T>(new T(&br));
+        store->init(test_dir, 4, 1, true); // truncate store
+
+        //create two queues:
+        queueA = Queue::shared_ptr(new Queue(nameA, 0, store.get(), 0));
+        queueA->create();
+        queueB = Queue::shared_ptr(new Queue(nameB, 0, store.get(), 0));
+        queueB->create();
+    }
+
+    Message createMessage(const string& id, const string& exchange="exchange", const string& key="routing_key")
+    {
+        Message msg = MessageUtils::createMessage(exchange, key, Uuid(), true, 0, id);
+        return msg;
+    }
+
+    template <class T>
+    void restart()
+    {
+        queueA.reset();
+        queueB.reset();
+        store.reset();
+        queues.reset();
+        links.reset();
+
+        store = std::auto_ptr<T>(new T(&br));
+        store->init(test_dir, 4, 1);
+        sys::Timer t;
+        ExchangeRegistry exchanges;
+        queues = std::auto_ptr<QueueRegistry>(new QueueRegistry);
+        links = std::auto_ptr<LinkRegistry>(new LinkRegistry);
+        dtxmgr = std::auto_ptr<DtxManager>(new DtxManager(t));
+        dtxmgr->setStore(store.get());
+        RecoveryManagerImpl recovery(*queues, exchanges, *links, *dtxmgr, br.getProtocolRegistry());
+        store->recover(recovery);
+
+        queueA = queues->find(nameA);
+        queueB = queues->find(nameB);
+    }
+
+    void checkMsg(Queue::shared_ptr& queue, u_int32_t size, const string& msgid = "<none>")
+    {
+        BOOST_REQUIRE(queue);
+        BOOST_CHECK_EQUAL(size, queue->getMessageCount());
+        if (size > 0) {
+            Message msg = MessageUtils::get(*queue);
+            BOOST_REQUIRE(msg);
+            BOOST_CHECK_EQUAL(msgid, MessageUtils::getCorrelationId(msg));
+        }
+    }
+
+    void swapCheck(bool swapped, const string& msgid, Queue::shared_ptr& from, Queue::shared_ptr& to)
+    {
+        BOOST_REQUIRE(from);
+        BOOST_REQUIRE(to);
+
+        Queue::shared_ptr x; //the queue expected to be empty after restart
+        Queue::shared_ptr y; //the queue expected to hold the message after restart
+
+        if (swapped) {
+            x = from;
+            y = to;
+        } else {
+            x = to;
+            y = from;
+        }
+
+        checkMsg(x, 0);
+        checkMsg(y, 1, msgid);
+        checkMsg(y, 0);
+    }
+
+public:
+    TwoPhaseCommitTest() : nameA("queueA"), nameB("queueB") {}
+
+    void testCommitEnqueue()
+    {
+        Enqueue enqueue(this);
+        commit(enqueue);
+    }
+
+    void testCommitDequeue()
+    {
+        Dequeue dequeue(this);
+        commit(dequeue);
+    }
+
+    void testCommitSwap()
+    {
+        Swap swap(this, "SwapMessageId");
+        commit(swap);
+    }
+
+    void testPrepareAndAbortEnqueue()
+    {
+        Enqueue enqueue(this);
+        abort(enqueue, true);
+    }
+
+    void testPrepareAndAbortDequeue()
+    {
+        Dequeue dequeue(this);
+        abort(dequeue, true);
+    }
+
+    void testPrepareAndAbortSwap()
+    {
+        Swap swap(this, "SwapMessageId");
+        abort(swap, true);
+    }
+
+    void testAbortNoPrepareEnqueue()
+    {
+        Enqueue enqueue(this);
+        abort(enqueue, false);
+    }
+
+    void testAbortNoPrepareDequeue()
+    {
+        Dequeue dequeue(this);
+        abort(dequeue, false);
+    }
+
+    void testAbortNoPrepareSwap()
+    {
+        Swap swap(this, "SwapMessageId");
+        abort(swap, false);
+    }
+
+    void testRecoverPreparedThenCommitted()
+    {
+        recoverPrepared(true);
+    }
+
+    void testRecoverPreparedThenAborted()
+    {
+        recoverPrepared(false);
+    }
+
+    void testMultiQueueCommit()
+    {
+        testMultiQueueTxn(2, true, true);
+    }
+
+    void testMultiQueueAbort()
+    {
+        testMultiQueueTxn(2, true, false);
+    }
+
+    void testMultiQueueNoQueueCommitRecover()
+    {
+        testMultiQueueTxn(0, false, true);
+    }
+
+    void testMultiQueueNoQueueAbortRecover()
+    {
+        testMultiQueueTxn(0, false, false);
+    }
+
+    void testMultiQueueSomeQueueCommitRecover()
+    {
+        testMultiQueueTxn(1, false, true);
+    }
+
+    void testMultiQueueSomeQueueAbortRecover()
+    {
+        testMultiQueueTxn(1, false, false);
+    }
+
+    void testMultiQueueAllQueueCommitRecover()
+    {
+        testMultiQueueTxn(2, false, true);
+    }
+
+    void testMultiQueueAllQueueAbortRecover()
+    {
+        testMultiQueueTxn(2, false, false);
+    }
+};
+
+TwoPhaseCommitTest tpct;
+
+// === Test suite ===
+
+QPID_AUTO_TEST_CASE(CommitEnqueue)
+{
+    SET_LOG_LEVEL("error+"); // This only needs to be set once.
+
+    cout << test_filename << ".CommitEnqueue: " << flush;
+    tpct.testCommitEnqueue();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(CommitDequeue)
+{
+    cout << test_filename << ".CommitDequeue: " << flush;
+    tpct.testCommitDequeue();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(CommitSwap)
+{
+    cout << test_filename << ".CommitSwap: " << flush;
+    tpct.testCommitSwap();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(PrepareAndAbortEnqueue)
+{
+    cout << test_filename << ".PrepareAndAbortEnqueue: " << flush;
+    tpct.testPrepareAndAbortEnqueue();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(PrepareAndAbortDequeue)
+{
+    cout << test_filename << ".PrepareAndAbortDequeue: " << flush;
+    tpct.testPrepareAndAbortDequeue();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(PrepareAndAbortSwap)
+{
+    cout << test_filename << ".PrepareAndAbortSwap: " << flush;
+    tpct.testPrepareAndAbortSwap();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(AbortNoPrepareEnqueue)
+{
+    cout << test_filename << ".AbortNoPrepareEnqueue: " << flush;
+    tpct.testAbortNoPrepareEnqueue();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(AbortNoPrepareDequeue)
+{
+    cout << test_filename << ".AbortNoPrepareDequeue: " << flush;
+    tpct.testAbortNoPrepareDequeue();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(AbortNoPrepareSwap)
+{
+    cout << test_filename << ".AbortNoPrepareSwap: " << flush;
+    tpct.testAbortNoPrepareSwap();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(RecoverPreparedThenCommitted)
+{
+    cout << test_filename << ".RecoverPreparedThenCommitted: " << flush;
+    tpct.testRecoverPreparedThenCommitted();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(RecoverPreparedThenAborted)
+{
+    cout << test_filename << ".RecoverPreparedThenAborted: " << flush;
+    tpct.testRecoverPreparedThenAborted();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueCommit)
+{
+    cout << test_filename << ".MultiQueueCommit: " << flush;
+    tpct.testMultiQueueCommit();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAbort)
+{
+    cout << test_filename << ".MultiQueueAbort: " << flush;
+    tpct.testMultiQueueAbort();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueCommitRecover)
+{
+    cout << test_filename << ".MultiQueueNoQueueCommitRecover: " << flush;
+    tpct.testMultiQueueNoQueueCommitRecover();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueNoQueueAbortRecover)
+{
+    cout << test_filename << ".MultiQueueNoQueueAbortRecover: " << flush;
+    tpct.testMultiQueueNoQueueAbortRecover();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueCommitRecover)
+{
+    cout << test_filename << ".MultiQueueSomeQueueCommitRecover: " << flush;
+    tpct.testMultiQueueSomeQueueCommitRecover();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueSomeQueueAbortRecover)
+{
+    cout << test_filename << ".MultiQueueSomeQueueAbortRecover: " << flush;
+    tpct.testMultiQueueSomeQueueAbortRecover();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueCommitRecover)
+{
+    cout << test_filename << ".MultiQueueAllQueueCommitRecover: " << flush;
+    tpct.testMultiQueueAllQueueCommitRecover();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_CASE(MultiQueueAllQueueAbortRecover)
+{
+    cout << test_filename << ".MultiQueueAllQueueAbortRecover: " << flush;
+    tpct.testMultiQueueAllQueueAbortRecover();
+    cout << "ok" << endl;
+}
+
+QPID_AUTO_TEST_SUITE_END()
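
The commit() and abort() helpers in the test class above drive a Strategy through run and check around a store restart. The block below is a minimal sketch of that pattern only. ToyStore, EnqueueStrategy, commitScenario and abortScenario are hypothetical names invented for illustration: ToyStore is an in-memory stand-in and does not reproduce the MessageStoreImpl API or its recovery behaviour.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Hypothetical in-memory stand-in for a durable store (NOT the Qpid API).
class ToyStore {
    std::set<std::string> durable;                           // survives restart()
    std::map<std::string, std::set<std::string> > pending;   // xid -> enqueued ids
    std::set<std::string> prepared;                          // xids past phase one
public:
    void begin(const std::string& xid) { pending[xid]; }
    void enqueue(const std::string& xid, const std::string& msg) {
        assert(pending.count(xid) == 1);  // the transaction must have been begun
        pending[xid].insert(msg);
    }
    void prepare(const std::string& xid) { prepared.insert(xid); }
    void commit(const std::string& xid) {
        // Phase two: only a prepared transaction may become durable.
        if (prepared.erase(xid) == 0) return;
        durable.insert(pending[xid].begin(), pending[xid].end());
        pending.erase(xid);
    }
    void abort(const std::string& xid) {
        prepared.erase(xid);
        pending.erase(xid);
    }
    void restart() {
        // Simulated restart: work that never reached prepare is dropped.
        for (auto it = pending.begin(); it != pending.end(); ) {
            if (prepared.count(it->first) == 0) it = pending.erase(it);
            else ++it;
        }
    }
    bool contains(const std::string& msg) const { return durable.count(msg) != 0; }
};

// One operation under test: run it inside a transaction, then verify the outcome.
struct Strategy {
    virtual ~Strategy() {}
    virtual void run(ToyStore& store, const std::string& xid) = 0;
    virtual bool check(const ToyStore& store, bool committed) const = 0;
};

struct EnqueueStrategy : Strategy {
    void run(ToyStore& store, const std::string& xid) { store.enqueue(xid, "msg-1"); }
    bool check(const ToyStore& store, bool committed) const {
        return store.contains("msg-1") == committed;
    }
};

// Mirrors the shape of commit(): run, prepare, commit, restart, check(true).
bool commitScenario(Strategy& s) {
    ToyStore store;
    store.begin("my-xid");
    s.run(store, "my-xid");
    store.prepare("my-xid");
    store.commit("my-xid");
    store.restart();
    return s.check(store, true);
}

// Mirrors the shape of abort(): optional prepare, abort, restart, check(false).
bool abortScenario(Strategy& s, bool prepareFirst) {
    ToyStore store;
    store.begin("my-xid");
    s.run(store, "my-xid");
    if (prepareFirst) store.prepare("my-xid");
    store.abort("my-xid");
    store.restart();
    return s.check(store, false);
}
```

In this sketch each scenario returns true when the strategy's post-restart check passes, so the same strategy object can be reused across the commit, prepare-then-abort and abort-without-prepare cases, much as the test methods above reuse Enqueue, Dequeue and Swap.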

Added: qpid/trunk/qpid/cpp/src/tests/legacystore/clean.sh
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/legacystore/clean.sh?rev=1430018&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/legacystore/clean.sh (added)
+++ qpid/trunk/qpid/cpp/src/tests/legacystore/clean.sh Mon Jan  7 21:24:48 2013
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This script cleans up any previous database and journal files, and should
+# be run prior to the store system tests, as these are prone to crashing or
+# hanging under some circumstances if the database is old or inconsistent.
+
+if [ -d "${TMP_DATA_DIR}" ]; then
+    rm -rf "${TMP_DATA_DIR}"
+fi
+if [ -d "${TMP_PYTHON_TEST_DIR}" ]; then
+    rm -rf "${TMP_PYTHON_TEST_DIR}"
+fi
+rm -f "${abs_srcdir}"/*.vglog*




