qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1161742 - in /qpid/trunk/qpid/cpp: include/qpid/ rubygen/ src/ src/qpid/ src/qpid/broker/ src/qpid/cluster/ src/qpid/log/ src/tests/ xml/
Date Thu, 25 Aug 2011 20:41:30 GMT
Author: aconway
Date: Thu Aug 25 20:41:28 2011
New Revision: 1161742

URL: http://svn.apache.org/viewvc?rev=1161742&view=rev
Log:
QPID-3384: Enable DTX transactions in a cluster.

- Replicate DTX state to new members joining.
- Use cluster timer for DTX timeouts.
- Incidental: quote nulls in qpid::Msg messages (XIDs often have null characters)

Added:
    qpid/trunk/qpid/cpp/src/qpid/Msg.cpp   (with props)
Modified:
    qpid/trunk/qpid/cpp/include/qpid/Msg.h
    qpid/trunk/qpid/cpp/rubygen/amqpgen.rb
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
    qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
    qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
    qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h
    qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
    qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/cluster_python_tests_failing.txt
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/include/qpid/Msg.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/include/qpid/Msg.h?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/include/qpid/Msg.h (original)
+++ qpid/trunk/qpid/cpp/include/qpid/Msg.h Thu Aug 25 20:41:28 2011
@@ -41,7 +41,7 @@ struct Msg {
     std::ostringstream os;
     Msg() {}
     Msg(const Msg& m) : os(m.str()) {}
-    std::string str() const { return os.str(); }
+    std::string str() const;
     operator std::string() const { return str(); }
 
     Msg& operator<<(long n) { os << n; return *this; }

Modified: qpid/trunk/qpid/cpp/rubygen/amqpgen.rb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/rubygen/amqpgen.rb?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/rubygen/amqpgen.rb (original)
+++ qpid/trunk/qpid/cpp/rubygen/amqpgen.rb Thu Aug 25 20:41:28 2011
@@ -191,7 +191,8 @@ class AmqpElement
     "command-fragments" => "session.command-fragment",
     "in-doubt" => "dtx.xid",
     "tx-publish" => "str-8",
-    "queues" => "str-8"
+    "queues" => "str-8",
+    "prepared" => "str-8"
   }
 
   def array_type(name)

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Thu Aug 25 20:41:28 2011
@@ -89,7 +89,7 @@ rgen_cmd=ruby -I $(rgen_dir) $(rgen_dir)
 
 $(rgen_srcs) $(srcdir)/rubygen.mk: rgen.timestamp
 rgen.timestamp: $(rgen_generator) $(specs)
-	$(rgen_cmd) $(srcdir)/rubygen.mk; touch $@
+	$(rgen_cmd) $(srcdir)/rubygen.mk && touch $@
 $(rgen_generator):
 
 # The CMake version is needed for dist
@@ -435,6 +435,7 @@ libqpidcommon_la_SOURCES +=			\
   qpid/log/OstreamOutput.h			\
   qpid/log/Selector.cpp				\
   qpid/log/Statement.cpp			\
+  qpid/Msg.cpp					\
   qpid/management/Buffer.cpp			\
   qpid/management/ConnectionSettings.cpp	\
   qpid/management/Manageable.cpp		\

Added: qpid/trunk/qpid/cpp/src/qpid/Msg.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/Msg.cpp?rev=1161742&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/Msg.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/Msg.cpp Thu Aug 25 20:41:28 2011
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Msg.h"
+#include <string>
+
+namespace qpid {
+using namespace std;
+
+struct NonPrint { bool operator()(unsigned char c) { return !isprint(c) && !isspace(c); } };
+
+const char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
+
+std::string quote(const std::string& str) {
+    NonPrint nonPrint;
+    size_t n = std::count_if(str.begin(), str.end(), nonPrint);
+    if (n==0) return str;
+    std::string ret;
+    ret.reserve(str.size()+2*n); // Avoid extra allocations.
+    for (string::const_iterator i = str.begin(); i != str.end(); ++i) {
+        if (nonPrint(*i)) {
+            ret.push_back('\\');
+            ret.push_back('x');
+            ret.push_back(hex[((*i) >> 4)&0xf]);
+            ret.push_back(hex[(*i) & 0xf]);
+        }
+        else ret.push_back(*i);
+    }
+    return ret;
+}
+
+// Quote the string so messages with null characters are preserved, e.g. messages with XIDs */
+std::string Msg::str() const {
+    return quote(os.str());
+}
+
+} // namespace qpid

Propchange: qpid/trunk/qpid/cpp/src/qpid/Msg.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/Msg.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Aug 25 20:41:28 2011
@@ -752,6 +752,7 @@ bool Broker::deferDeliveryImpl(const std
 void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
     clusterTimer = t;
     queueCleaner.setTimer(clusterTimer.get());
+    dtxManager.setTimer(*clusterTimer.get());
 }
 
 const std::string Broker::TCP_TRANSPORT("tcp");

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp Thu Aug 25 20:41:28 2011
@@ -32,6 +32,10 @@ DtxAck::DtxAck(const qpid::framing::Sequ
                    not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)));
 }
 
+DtxAck::DtxAck(DeliveryRecords& unacked) {
+    pending = unacked;
+}
+
 bool DtxAck::prepare(TransactionContext* ctxt) throw()
 {
     try{

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h Thu Aug 25 20:41:28 2011
@@ -1,3 +1,6 @@
+#ifndef QPID_BROKER_DTXACK_H
+#define QPID_BROKER_DTXACK_H
+
 /*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -18,9 +21,6 @@
  * under the License.
  *
  */
-#ifndef _DtxAck_
-#define _DtxAck_
-
 #include <algorithm>
 #include <functional>
 #include <list>
@@ -29,20 +29,21 @@
 #include "qpid/broker/TxOp.h"
 
 namespace qpid {
-    namespace broker {
-        class DtxAck : public TxOp{
-            DeliveryRecords pending;
+namespace broker {
+class DtxAck : public TxOp{
+    DeliveryRecords pending;
 
-        public:
-            DtxAck(const framing::SequenceSet& acked, DeliveryRecords& unacked);
-            virtual bool prepare(TransactionContext* ctxt) throw();
-            virtual void commit() throw();
-            virtual void rollback() throw();
-            virtual ~DtxAck(){}
-            virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
-        };
-    }
-}
+  public:
+    DtxAck(const framing::SequenceSet& acked, DeliveryRecords& unacked);
+    DtxAck(DeliveryRecords& unacked);
+    virtual bool prepare(TransactionContext* ctxt) throw();
+    virtual void commit() throw();
+    virtual void rollback() throw();
+    virtual ~DtxAck(){}
+    virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+    const DeliveryRecords& getPending() const { return pending; }
+};
 
+}} // qpid::broker
 
-#endif
+#endif  /*!QPID_BROKER_DTXACK_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp Thu Aug 25 20:41:28 2011
@@ -23,8 +23,11 @@
 using namespace qpid::broker;
 using qpid::sys::Mutex;
 
-DtxBuffer::DtxBuffer(const std::string& _xid) 
-    : xid(_xid), ended(false), suspended(false), failed(false), expired(false) {}
+DtxBuffer::DtxBuffer(
+    const std::string& _xid,
+    bool ended_, bool suspended_, bool failed_, bool expired_)
+    : xid(_xid), ended(ended_), suspended(suspended_), failed(failed_), expired(expired_)
+{}
 
 DtxBuffer::~DtxBuffer() {}
 
@@ -34,7 +37,7 @@ void DtxBuffer::markEnded() 
     ended = true; 
 }
 
-bool DtxBuffer::isEnded() 
+bool DtxBuffer::isEnded() const
 { 
     Mutex::ScopedLock locker(lock); 
     return ended; 
@@ -45,7 +48,7 @@ void DtxBuffer::setSuspended(bool isSusp
     suspended = isSuspended; 
 }
 
-bool DtxBuffer::isSuspended() 
+bool DtxBuffer::isSuspended() const
 { 
     return suspended; 
 }
@@ -58,13 +61,13 @@ void DtxBuffer::fail()
     ended = true;
 }
 
-bool DtxBuffer::isRollbackOnly()
+bool DtxBuffer::isRollbackOnly() const
 {
     Mutex::ScopedLock locker(lock); 
     return failed;
 }
 
-const std::string& DtxBuffer::getXid()
+std::string DtxBuffer::getXid() const
 {
     return xid;
 }
@@ -76,8 +79,13 @@ void DtxBuffer::timedout()
     fail();
 }
 
-bool DtxBuffer::isExpired()
+bool DtxBuffer::isExpired() const
 {
     Mutex::ScopedLock locker(lock); 
     return expired;
 }
+
+bool DtxBuffer::isFailed() const
+{
+    return failed;
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h Thu Aug 25 20:41:28 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,31 +26,34 @@
 #include "qpid/sys/Mutex.h"
 
 namespace qpid {
-    namespace broker {
-        class DtxBuffer : public TxBuffer{
-            sys::Mutex lock;
-            const std::string xid;
-            bool ended;
-            bool suspended;           
-            bool failed;
-            bool expired;
-
-        public:
-            typedef boost::shared_ptr<DtxBuffer> shared_ptr;
-
-            QPID_BROKER_EXTERN DtxBuffer(const std::string& xid = "");
-            QPID_BROKER_EXTERN ~DtxBuffer();
-            QPID_BROKER_EXTERN void markEnded();
-            bool isEnded();
-            void setSuspended(bool suspended);
-            bool isSuspended();
-            void fail();
-            bool isRollbackOnly();
-            void timedout();
-            bool isExpired();
-            const std::string& getXid();
-        };
-    }
+namespace broker {
+class DtxBuffer : public TxBuffer{
+    mutable sys::Mutex lock;
+    const std::string xid;
+    bool ended;
+    bool suspended;
+    bool failed;
+    bool expired;
+
+  public:
+    typedef boost::shared_ptr<DtxBuffer> shared_ptr;
+
+    QPID_BROKER_EXTERN DtxBuffer(
+        const std::string& xid = "",
+        bool ended=false, bool suspended=false, bool failed=false, bool expired=false);
+    QPID_BROKER_EXTERN ~DtxBuffer();
+    QPID_BROKER_EXTERN void markEnded();
+    bool isEnded() const;
+    void setSuspended(bool suspended);
+    bool isSuspended() const;
+    void fail();
+    bool isRollbackOnly() const;
+    void timedout();
+    bool isExpired() const;
+    bool isFailed() const;
+    std::string getXid() const;
+};
+}
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp Thu Aug 25 20:41:28 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,7 +34,7 @@ using qpid::ptr_map_ptr;
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(t) {}
+DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {}
 
 DtxManager::~DtxManager() {}
 
@@ -53,8 +53,8 @@ void DtxManager::recover(const std::stri
     createWork(xid)->recover(txn, ops);
 }
 
-bool DtxManager::prepare(const std::string& xid) 
-{ 
+bool DtxManager::prepare(const std::string& xid)
+{
     QPID_LOG(debug, "preparing: " << xid);
     try {
         return getWork(xid)->prepare();
@@ -64,8 +64,8 @@ bool DtxManager::prepare(const std::stri
     }
 }
 
-bool DtxManager::commit(const std::string& xid, bool onePhase) 
-{ 
+bool DtxManager::commit(const std::string& xid, bool onePhase)
+{
     QPID_LOG(debug, "committing: " << xid);
     try {
         bool result = getWork(xid)->commit(onePhase);
@@ -77,8 +77,8 @@ bool DtxManager::commit(const std::strin
     }
 }
 
-void DtxManager::rollback(const std::string& xid) 
-{ 
+void DtxManager::rollback(const std::string& xid)
+{
     QPID_LOG(debug, "rolling back: " << xid);
     try {
         getWork(xid)->rollback();
@@ -91,7 +91,7 @@ void DtxManager::rollback(const std::str
 
 DtxWorkRecord* DtxManager::getWork(const std::string& xid)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     WorkMap::iterator i = work.find(xid);
     if (i == work.end()) {
         throw NotFoundException(QPID_MSG("Unrecognised xid " << xid));
@@ -99,9 +99,14 @@ DtxWorkRecord* DtxManager::getWork(const
     return ptr_map_ptr(i);
 }
 
+bool DtxManager::exists(const std::string& xid) {
+    Mutex::ScopedLock locker(lock);
+    return  work.find(xid) != work.end();
+}
+
 void DtxManager::remove(const std::string& xid)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     WorkMap::iterator i = work.find(xid);
     if (i == work.end()) {
         throw NotFoundException(QPID_MSG("Unrecognised xid " << xid));
@@ -110,14 +115,15 @@ void DtxManager::remove(const std::strin
     }
 }
 
-DtxWorkRecord* DtxManager::createWork(std::string xid)
+DtxWorkRecord* DtxManager::createWork(const std::string& xid)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     WorkMap::iterator i = work.find(xid);
     if (i != work.end()) {
         throw NotAllowedException(QPID_MSG("Xid " << xid << " is already known (use 'join' to add work to an existing xid)"));
     } else {
-      return ptr_map_ptr(work.insert(xid, new DtxWorkRecord(xid, store)).first);
+        std::string ncxid = xid; // Work around const correctness problems in ptr_map.
+        return ptr_map_ptr(work.insert(ncxid, new DtxWorkRecord(ncxid, store)).first);
     }
 }
 
@@ -131,7 +137,7 @@ void DtxManager::setTimeout(const std::s
     }
     timeout = intrusive_ptr<DtxTimeout>(new DtxTimeout(secs, *this, xid));
     record->setTimeout(timeout);
-    timer.add(timeout);
+    timer->add(timeout);
 }
 
 uint32_t DtxManager::getTimeout(const std::string& xid)
@@ -142,7 +148,7 @@ uint32_t DtxManager::getTimeout(const st
 
 void DtxManager::timedout(const std::string& xid)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     WorkMap::iterator i = work.find(xid);
     if (i == work.end()) {
         QPID_LOG(warning, "Transaction timeout failed: no record for xid");
@@ -153,7 +159,7 @@ void DtxManager::timedout(const std::str
     }
 }
 
-DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) 
+DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid)
     : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxCleanup"), mgr(_mgr), xid(_xid) {}
 
 void DtxManager::DtxCleanup::fire()

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h Thu Aug 25 20:41:28 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,7 +26,6 @@
 #include "qpid/broker/DtxWorkRecord.h"
 #include "qpid/broker/TransactionalStore.h"
 #include "qpid/framing/amqp_types.h"
-#include "qpid/sys/Timer.h"
 #include "qpid/sys/Mutex.h"
 
 namespace qpid {
@@ -39,22 +38,21 @@ class DtxManager{
     {
         DtxManager& mgr;
         const std::string& xid;
-        
-        DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid);    
+
+        DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid);
         void fire();
     };
 
     WorkMap work;
     TransactionalStore* store;
     qpid::sys::Mutex lock;
-    qpid::sys::Timer& timer;
+    qpid::sys::Timer* timer;
 
     void remove(const std::string& xid);
-    DtxWorkRecord* getWork(const std::string& xid);
-    DtxWorkRecord* createWork(std::string xid);
+    DtxWorkRecord* createWork(const std::string& xid);
 
 public:
-    DtxManager(qpid::sys::Timer&);
+    DtxManager(sys::Timer&);
     ~DtxManager();
     void start(const std::string& xid, DtxBuffer::shared_ptr work);
     void join(const std::string& xid, DtxBuffer::shared_ptr work);
@@ -66,6 +64,15 @@ public:
     uint32_t getTimeout(const std::string& xid);
     void timedout(const std::string& xid);
     void setStore(TransactionalStore* store);
+    void setTimer(sys::Timer& t) { timer = &t; }
+
+    // Used by cluster for replication.
+    template<class F> void each(F f) const {
+        for (WorkMap::const_iterator i = work.begin(); i != work.end(); ++i)
+            f(*i);
+    }
+    DtxWorkRecord* getWork(const std::string& xid);
+    bool exists(const std::string& xid);
 };
 
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp Thu Aug 25 20:41:28 2011
@@ -25,7 +25,7 @@
 using namespace qpid::broker;
 
 DtxTimeout::DtxTimeout(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) 
-    : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxTimeout"), timeout(_timeout), mgr(_mgr), xid(_xid)
+    : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxTimeout-"+_xid), timeout(_timeout), mgr(_mgr), xid(_xid)
 {
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h Thu Aug 25 20:41:28 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,7 +29,9 @@ namespace broker {
 
 class DtxManager;
 
-struct DtxTimeoutException : public Exception {};
+struct DtxTimeoutException : public Exception {
+    DtxTimeoutException(const std::string& msg=std::string()) : Exception(msg) {}
+};
 
 struct DtxTimeout : public sys::TimerTask
 {
@@ -37,7 +39,7 @@ struct DtxTimeout : public sys::TimerTas
     DtxManager& mgr;
     const std::string xid;
 
-    DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);    
+    DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);
     void fire();
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp Thu Aug 25 20:41:28 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,19 +28,19 @@ using qpid::sys::Mutex;
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : 
+DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) :
     xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {}
 
-DtxWorkRecord::~DtxWorkRecord() 
+DtxWorkRecord::~DtxWorkRecord()
 {
-    if (timeout.get()) {  
+    if (timeout.get()) {
         timeout->cancel();
     }
 }
 
 bool DtxWorkRecord::prepare()
 {
-    Mutex::ScopedLock locker(lock);     
+    Mutex::ScopedLock locker(lock);
     if (check()) {
         txn = store->begin(xid);
         if (prepare(txn.get())) {
@@ -68,7 +68,7 @@ bool DtxWorkRecord::prepare(TransactionC
 
 bool DtxWorkRecord::commit(bool onePhase)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     if (check()) {
         if (prepared) {
             //already prepared i.e. 2pc
@@ -78,13 +78,13 @@ bool DtxWorkRecord::commit(bool onePhase
 
             store->commit(*txn);
             txn.reset();
-            
+
             std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
             return true;
         } else {
             //1pc commit optimisation, don't need a 2pc transaction context:
             if (!onePhase) {
-                throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));        
+                throw IllegalStateException(QPID_MSG("Branch with xid " << xid << " has not been prepared, one-phase option required!"));
             }
             std::auto_ptr<TransactionContext> localtxn = store->begin();
             if (prepare(localtxn.get())) {
@@ -107,16 +107,16 @@ bool DtxWorkRecord::commit(bool onePhase
 
 void DtxWorkRecord::rollback()
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     check();
     abort();
 }
 
 void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     if (expired) {
-        throw DtxTimeoutException();
+        throw DtxTimeoutException(QPID_MSG("Branch with xid " << xid << " has timed out."));
     }
     if (completed) {
         throw CommandInvalidException(QPID_MSG("Branch with xid " << xid << " has been completed!"));
@@ -163,7 +163,7 @@ void DtxWorkRecord::recover(std::auto_pt
 
 void DtxWorkRecord::timedout()
 {
-    Mutex::ScopedLock locker(lock); 
+    Mutex::ScopedLock locker(lock);
     expired = true;
     rolledback = true;
     if (!completed) {
@@ -175,3 +175,17 @@ void DtxWorkRecord::timedout()
     }
     abort();
 }
+
+size_t DtxWorkRecord::indexOf(const DtxBuffer::shared_ptr& buf) {
+    Work::iterator i = std::find(work.begin(), work.end(), buf);
+    if (i == work.end()) throw NotFoundException(
+        QPID_MSG("Can't find DTX buffer for xid: " << buf->getXid()));
+    return i - work.begin();
+}
+
+DtxBuffer::shared_ptr DtxWorkRecord::operator[](size_t i) const {
+    if (i > work.size())
+        throw NotFoundException(
+            QPID_MSG("Can't find DTX buffer " << i << " for xid: " << xid));
+    return work[i];
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h Thu Aug 25 20:41:28 2011
@@ -73,9 +73,19 @@ public:
     void timedout();
     void setTimeout(boost::intrusive_ptr<DtxTimeout> t) { timeout = t; }
     boost::intrusive_ptr<DtxTimeout> getTimeout() { return timeout; }
+    std::string getXid() const { return xid; }
+    bool isCompleted() const { return completed; }
+    bool isRolledback() const { return rolledback; }
+    bool isPrepared() const { return prepared; }
+    bool isExpired() const { return expired; }
+
+    // Used by cluster update;
+    size_t size() const { return work.size(); }
+    DtxBuffer::shared_ptr operator[](size_t i) const;
+    uint32_t getTimeout() const { return timeout? timeout->timeout : 0; }
+    size_t indexOf(const DtxBuffer::shared_ptr&);
 };
 
-}
-}
+}} // qpid::broker
 
 #endif

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Aug 25 20:41:28 2011
@@ -170,8 +170,8 @@ void SemanticState::startDtx(const std::
     if (!dtxSelected) {
         throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx"));
     }
-    dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
-    txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer);
+    dtxBuffer.reset(new DtxBuffer(xid));
+    txBuffer = dtxBuffer;
     if (join) {
         mgr.join(xid, dtxBuffer);
     } else {
@@ -239,7 +239,7 @@ void SemanticState::resumeDtx(const std:
 
     checkDtxTimeout();
     dtxBuffer->setSuspended(false);
-    txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer);
+    txBuffer = dtxBuffer;
 }
 
 void SemanticState::checkDtxTimeout()

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Thu Aug 25 20:41:28 2011
@@ -65,7 +65,7 @@ class SessionContext;
  *
  * Message delivery is driven by ConsumerImpl::doOutput(), which is
  * called when a client's socket is ready to write data.
- * 
+ *
  */
 class SemanticState : private boost::noncopyable {
   public:
@@ -99,15 +99,15 @@ class SemanticState : private boost::non
       public:
         typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
 
-        ConsumerImpl(SemanticState* parent, 
+        ConsumerImpl(SemanticState* parent,
                      const std::string& name, boost::shared_ptr<Queue> queue,
                      bool ack, bool acquire, bool exclusive,
                      const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
         ~ConsumerImpl();
         OwnershipToken* getSession();
-        bool deliver(QueuedMessage& msg);            
-        bool filter(boost::intrusive_ptr<Message> msg);            
-        bool accept(boost::intrusive_ptr<Message> msg);            
+        bool deliver(QueuedMessage& msg);
+        bool filter(boost::intrusive_ptr<Message> msg);
+        bool accept(boost::intrusive_ptr<Message> msg);
 
         void disableNotify();
         void enableNotify();
@@ -122,7 +122,7 @@ class SemanticState : private boost::non
         void addMessageCredit(uint32_t value);
         void flush();
         void stop();
-        void complete(DeliveryRecord&);    
+        void complete(DeliveryRecord&);
         boost::shared_ptr<Queue> getQueue() const { return queue; }
         bool isBlocked() const { return blocked; }
         bool setBlocked(bool set) { std::swap(set, blocked); return set; }
@@ -188,7 +188,7 @@ class SemanticState : private boost::non
     const SessionContext& getSession() const { return session; }
 
     ConsumerImpl& find(const std::string& destination);
-    
+
     /**
      * Get named queue, never returns 0.
      * @return: named queue
@@ -196,11 +196,11 @@ class SemanticState : private boost::non
      * @exception: ConnectionException if name="" and session has no default.
      */
     boost::shared_ptr<Queue> getQueue(const std::string& name) const;
-    
+
     bool exists(const std::string& consumerTag);
 
-    void consume(const std::string& destination, 
-                 boost::shared_ptr<Queue> queue, 
+    void consume(const std::string& destination,
+                 boost::shared_ptr<Queue> queue,
                  bool ackRequired, bool acquire, bool exclusive,
                  const std::string& resumeId=std::string(), uint64_t resumeTtl=0,
                  const framing::FieldTable& = framing::FieldTable());
@@ -223,7 +223,7 @@ class SemanticState : private boost::non
     void suspendDtx(const std::string& xid);
     void resumeDtx(const std::string& xid);
     void recover(bool requeue);
-    void deliver(DeliveryRecord& message, bool sync);            
+    void deliver(DeliveryRecord& message, bool sync);
     void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
     void release(DeliveryId first, DeliveryId last, bool setRedelivered);
     void reject(DeliveryId first, DeliveryId last);
@@ -244,7 +244,9 @@ class SemanticState : private boost::non
     DeliveryRecords& getUnacked() { return unacked; }
     framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
     TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }
+    DtxBuffer::shared_ptr getDtxBuffer() const { return dtxBuffer; }
     void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; }
+    void setDtxBuffer(const DtxBuffer::shared_ptr& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; }
     void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
     void record(const DeliveryRecord& delivery);
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp Thu Aug 25 20:41:28 2011
@@ -76,5 +76,5 @@ bool TxBuffer::commitLocal(Transactional
 }
 
 void TxBuffer::accept(TxOpConstVisitor& v) const {
-    std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v))); 
+    std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v)));
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Thu Aug 25 20:41:28 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,57 +34,58 @@
 #include <boost/intrusive_ptr.hpp>
 
 namespace qpid {
-    namespace broker {
-        /**
-         * Defines the behaviour for publish operations on a
-         * transactional channel. Messages are routed through
-         * exchanges when received but are not at that stage delivered
-         * to the matching queues, rather the queues are held in an
-         * instance of this class. On prepare() the message is marked
-         * enqueued to the relevant queues in the MessagesStore. On
-         * commit() the messages will be passed to the queue for
-         * dispatch or to be added to the in-memory queue.
-         */
-        class QPID_BROKER_CLASS_EXTERN TxPublish : public TxOp, public Deliverable{
-
-            class Commit{
-                boost::intrusive_ptr<Message>& msg;
-            public:
-                Commit(boost::intrusive_ptr<Message>& msg);
-                void operator()(const boost::shared_ptr<Queue>& queue);            
-            };
-            class Rollback{
-                boost::intrusive_ptr<Message>& msg;
-            public:
-                Rollback(boost::intrusive_ptr<Message>& msg);
-                void operator()(const boost::shared_ptr<Queue>& queue);            
-            };
-
-            boost::intrusive_ptr<Message> msg;
-             std::list<boost::shared_ptr<Queue> > queues;
-            std::list<boost::shared_ptr<Queue> > prepared;
-
-            void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
-
-        public:
-            QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);
-            QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw();
-            QPID_BROKER_EXTERN virtual void commit() throw();
-            QPID_BROKER_EXTERN virtual void rollback() throw();
-
-	    virtual Message& getMessage() { return *msg; };
-            
-            QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
-
-            virtual ~TxPublish(){}
-            virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
-
-            QPID_BROKER_EXTERN uint64_t contentSize();
-
-            boost::intrusive_ptr<Message> getMessage() const { return msg; }
-            const std::list<boost::shared_ptr<Queue> > getQueues() const { return queues; }
-        };
-    }
+namespace broker {
+/**
+ * Defines the behaviour for publish operations on a
+ * transactional channel. Messages are routed through
+ * exchanges when received but are not at that stage delivered
+ * to the matching queues, rather the queues are held in an
+ * instance of this class. On prepare() the message is marked
+ * enqueued to the relevant queues in the MessagesStore. On
+ * commit() the messages will be passed to the queue for
+ * dispatch or to be added to the in-memory queue.
+ */
+class QPID_BROKER_CLASS_EXTERN TxPublish : public TxOp, public Deliverable{
+
+    class Commit{
+        boost::intrusive_ptr<Message>& msg;
+      public:
+        Commit(boost::intrusive_ptr<Message>& msg);
+        void operator()(const boost::shared_ptr<Queue>& queue);
+    };
+    class Rollback{
+        boost::intrusive_ptr<Message>& msg;
+      public:
+        Rollback(boost::intrusive_ptr<Message>& msg);
+        void operator()(const boost::shared_ptr<Queue>& queue);
+    };
+
+    boost::intrusive_ptr<Message> msg;
+    std::list<boost::shared_ptr<Queue> > queues;
+    std::list<boost::shared_ptr<Queue> > prepared;
+
+    void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
+
+  public:
+    QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);
+    QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw();
+    QPID_BROKER_EXTERN virtual void commit() throw();
+    QPID_BROKER_EXTERN virtual void rollback() throw();
+
+    virtual Message& getMessage() { return *msg; };
+
+    QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
+
+    virtual ~TxPublish(){}
+    virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+
+    QPID_BROKER_EXTERN uint64_t contentSize();
+
+    boost::intrusive_ptr<Message> getMessage() const { return msg; }
+    const std::list<boost::shared_ptr<Queue> >& getQueues() const { return queues; }
+    const std::list<boost::shared_ptr<Queue> >& getPrepared() const { return prepared; }
+};
+}
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Aug 25 20:41:28 2011
@@ -57,12 +57,12 @@
  * - management::ManagementBroker: uses MessageHandler supplied by  cluster
  *   to send messages to the broker via the cluster.
  *
- * - Dtx: not yet supported with cluster.
- *
- * cluster::ExpiryPolicy implements the strategy for message expiry.
+ * cluster::ExpiryPolicy uses cluster time.
  *
  * ClusterTimer implements periodic timed events in the cluster context.
- * Used for periodic management events.
+ * Used for:
+ * - periodic management events.
+ * - DTX transaction timeouts.
  *
  * <h1>CLUSTER PROTOCOL OVERVIEW</h1>
  *
@@ -199,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpi
  * Currently use SVN revision to avoid clashes with versions from
  * different branches.
  */
-const uint32_t Cluster::CLUSTER_VERSION = 1128070;
+const uint32_t Cluster::CLUSTER_VERSION = 1159329;
 
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Aug 25 20:41:28 2011
@@ -24,6 +24,8 @@
 #include "Cluster.h"
 #include "UpdateReceiver.h"
 #include "qpid/assert.h"
+#include "qpid/broker/DtxAck.h"
+#include "qpid/broker/DtxBuffer.h"
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/SemanticState.h"
 #include "qpid/broker/TxBuffer.h"
@@ -114,7 +116,7 @@ Connection::Connection(Cluster& c, sys::
         if (!updateIn.nextShadowMgmtId.empty())
             connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
         updateIn.nextShadowMgmtId.clear();
-     }
+    }
     init();
     QPID_LOG(debug, cluster << " local connection " << *this);
 }
@@ -167,7 +169,7 @@ void Connection::announce(
         AMQFrame frame;
         while (frame.decode(buf))
             connection->received(frame);
-         connection->setUserId(username);
+        connection->setUserId(username);
     }
     // Do managment actions now that the connection is replicated.
     connection->raiseConnectEvent();
@@ -214,16 +216,9 @@ void Connection::received(framing::AMQFr
     }
 }
 
-bool Connection::checkUnsupported(const AMQBody& body) {
-    std::string message;
-    if (body.getMethod()) {
-        switch (body.getMethod()->amqpClassId()) {
-          case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
-        }
-    }
-    if (!message.empty())
-        connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message);
-    return !message.empty();
+bool Connection::checkUnsupported(const AMQBody&) {
+    // Throw an exception for unsupported commands. Currently all are supported.
+    return false;
 }
 
 struct GiveReadCreditOnExit {
@@ -464,11 +459,21 @@ void Connection::shadowReady(
     output.setSendMax(sendMax);
 }
 
+void Connection::setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &v) {
+    broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+    broker::DtxWorkRecord* record = mgr.getWork(v.first.first); // XID
+    uint32_t index = v.first.second; // Index
+    v.second->setDtxBuffer((*record)[index]);
+}
+
+// Marks the end of the update.
 void Connection::membership(const FieldTable& joiners, const FieldTable& members,
                             const framing::SequenceNumber& frameSeq)
 {
     QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
     updateIn.consumerNumbering.clear();
+    for_each(updateIn.dtxBuffers.begin(), updateIn.dtxBuffers.end(),
+             boost::bind(&Connection::setDtxBuffer, this, _1));
     closeUpdated();
     cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
 }
@@ -536,8 +541,16 @@ void Connection::deliveryRecord(const st
         } else {                // Message at original position in original queue
             m = queue->find(position);
         }
-        if (!m.payload)
-            throw Exception(QPID_MSG("deliveryRecord no update message"));
+        // FIXME aconway 2011-08-19: removed:
+        // if (!m.payload)
+        //      throw Exception(QPID_MSG("deliveryRecord no update message"));
+        //
+        // It seems this could happen legitimately in the case one
+        // session browses message M, then another session acquires
+        // it. In that case the browsers delivery record is !acquired
+        // but the message is not on its original Queue. In that case
+        // we'll get a deliveryRecord with no payload for the browser.
+        //
     }
 
     broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
@@ -545,7 +558,11 @@ void Connection::deliveryRecord(const st
     if (cancelled) dr.cancel(dr.getTag());
     if (completed) dr.complete();
     if (ended) dr.setEnded();   // Exsitance of message
-    semanticState().record(dr); // Part of the session's unacked list.
+
+    if (dtxBuffer)              // Record for next dtx-ack
+        dtxAckRecords.push_back(dr);
+    else
+        semanticState().record(dr); // Record on session's unacked list.
 }
 
 void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
@@ -561,29 +578,29 @@ void Connection::queueFairshareState(con
 
 
 namespace {
-    // find a StatefulQueueObserver that matches a given identifier
-    class ObserverFinder {
-        const std::string id;
-        boost::shared_ptr<broker::QueueObserver> target;
-        ObserverFinder(const ObserverFinder&) {}
-    public:
-        ObserverFinder(const std::string& _id) : id(_id) {}
-        broker::StatefulQueueObserver *getObserver()
-        {
-            if (target)
-                return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
-            return 0;
-        }
-        void operator() (boost::shared_ptr<broker::QueueObserver> o)
-        {
-            if (!target) {
-                broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
-                if (p && p->getId() == id) {
-                    target = o;
-                }
+// find a StatefulQueueObserver that matches a given identifier
+class ObserverFinder {
+    const std::string id;
+    boost::shared_ptr<broker::QueueObserver> target;
+    ObserverFinder(const ObserverFinder&) {}
+  public:
+    ObserverFinder(const std::string& _id) : id(_id) {}
+    broker::StatefulQueueObserver *getObserver()
+    {
+        if (target)
+            return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
+        return 0;
+    }
+    void operator() (boost::shared_ptr<broker::QueueObserver> o)
+    {
+        if (!target) {
+            broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+            if (p && p->getId() == id) {
+                target = o;
             }
         }
-    };
+    }
+};
 }
 
 
@@ -615,6 +632,7 @@ std::ostream& operator<<(std::ostream& o
 void Connection::txStart() {
     txBuffer.reset(new broker::TxBuffer());
 }
+
 void Connection::txAccept(const framing::SequenceSet& acked) {
     txBuffer->enlist(boost::shared_ptr<broker::TxAccept>(
                          new broker::TxAccept(acked, semanticState().getUnacked())));
@@ -630,8 +648,10 @@ void Connection::txEnqueue(const std::st
                          new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload)));
 }
 
-void Connection::txPublish(const framing::Array& queues, bool delivered) {
-    boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload));
+void Connection::txPublish(const framing::Array& queues, bool delivered)
+{
+    boost::shared_ptr<broker::TxPublish> txPub(
+        new broker::TxPublish(getUpdateMessage().payload));
     for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
         txPub->deliverTo(findQueue((*i)->get<std::string>()));
     txPub->delivered = delivered;
@@ -646,6 +666,50 @@ void Connection::accumulatedAck(const qp
     semanticState().setAccumulatedAck(s);
 }
 
+void Connection::dtxStart(const std::string& xid,
+                          bool ended,
+                          bool suspended,
+                          bool failed,
+                          bool expired)
+{
+    dtxBuffer.reset(new broker::DtxBuffer(xid, ended, suspended, failed, expired));
+    txBuffer = dtxBuffer;
+}
+
+void Connection::dtxEnd() {
+    broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+    std::string xid = dtxBuffer->getXid();
+    if (mgr.exists(xid))
+        mgr.join(xid, dtxBuffer);
+    else
+        mgr.start(xid, dtxBuffer);
+    dtxBuffer.reset();
+    txBuffer.reset();
+}
+
+// Sent after all DeliveryRecords for a dtx-ack have been collected in dtxAckRecords
+void Connection::dtxAck() {
+    dtxBuffer->enlist(
+        boost::shared_ptr<broker::DtxAck>(new broker::DtxAck(dtxAckRecords)));
+    dtxAckRecords.clear();
+}
+
+void Connection::dtxBufferRef(const std::string& xid, uint32_t index) {
+    // Save the association between DtxBuffer and session so we can
+    // set the DtxBuffer on the session at the end of the update
+    // when the DtxManager has been replicated.
+    updateIn.dtxBuffers[std::make_pair(xid, index)] = &semanticState();
+}
+
+// Sent at end of work record.
+void Connection::dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout)
+{
+    broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+    if (timeout) mgr.setTimeout(xid, timeout);
+    if (prepared) mgr.prepare(xid);
+}
+
+
 void Connection::exchange(const std::string& encoded) {
     Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
     broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Aug 25 20:41:28 2011
@@ -29,6 +29,7 @@
 
 #include "qpid/RefCounted.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/broker/DeliveryRecord.h"
 #include "qpid/broker/SecureConnection.h"
 #include "qpid/broker/SemanticState.h"
 #include "qpid/amqp_0_10/Connection.h"
@@ -164,6 +165,17 @@ class Connection :
     void txEnd();
     void accumulatedAck(const framing::SequenceSet&);
 
+    // Dtx state
+    void dtxStart(const std::string& xid,
+                  bool ended,
+                  bool suspended,
+                  bool failed,
+                  bool expired);
+    void dtxEnd();
+    void dtxAck();
+    void dtxBufferRef(const std::string& xid, uint32_t index);
+    void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout);
+
     // Encoded exchange replication.
     void exchange(const std::string& encoded);
 
@@ -251,7 +263,7 @@ class Connection :
     broker::SemanticState& semanticState();
     broker::QueuedMessage getUpdateMessage();
     void closeUpdated();
-
+    void setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &);
     Cluster& cluster;
     ConnectionId self;
     bool catchUp;
@@ -263,6 +275,9 @@ class Connection :
     framing::SequenceNumber deliverSeq;
     framing::ChannelId currentChannel;
     boost::shared_ptr<broker::TxBuffer> txBuffer;
+    boost::shared_ptr<broker::DtxBuffer> dtxBuffer;
+    broker::DeliveryRecords dtxAckRecords;
+    broker::DtxWorkRecord* dtxCurrent;
     bool expectProtocolHeader;
     McastFrameHandler mcastFrameHandler;
     UpdateReceiver& updateIn;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Thu Aug 25 20:41:28 2011
@@ -45,6 +45,8 @@
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/TxOpVisitor.h"
 #include "qpid/broker/DtxAck.h"
+#include "qpid/broker/DtxBuffer.h"
+#include "qpid/broker/DtxWorkRecord.h"
 #include "qpid/broker/TxAccept.h"
 #include "qpid/broker/TxPublish.h"
 #include "qpid/broker/RecoveredDequeue.h"
@@ -65,6 +67,7 @@
 #include <boost/bind.hpp>
 #include <boost/cast.hpp>
 #include <algorithm>
+#include <iterator>
 #include <sstream>
 
 namespace qpid {
@@ -177,9 +180,9 @@ void UpdateClient::update() {
     // longer on their original queue.
     session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
     session.sync();
+
     std::for_each(connections.begin(), connections.end(),
                   boost::bind(&UpdateClient::updateConnection, this, _1));
-    session.queueDelete(arg::queue=UPDATE);
 
     // some Queue Observers need session state & msgs synced first, so sync observers now
     b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1));
@@ -189,6 +192,8 @@ void UpdateClient::update() {
 
     updateLinks();
     updateManagementAgent();
+    updateDtxManager();
+    session.queueDelete(arg::queue=UPDATE);
 
     session.close();
 
@@ -356,7 +361,8 @@ class MessageUpdater {
             for (uint64_t offset = 0; morecontent; offset += maxContentSize)
             {
                 AMQFrame frame((AMQContentBody()));
-                morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset);
+                morecontent = message.payload->getContentFrame(
+                    *(message.queue), frame, maxContentSize, offset);
                 sb.get()->sendRawFrame(frame);
             }
         }
@@ -479,9 +485,9 @@ void UpdateClient::updateSession(broker:
     QPID_LOG(debug, *this << " updating unacknowledged messages.");
     broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
     std::for_each(drs.begin(), drs.end(),
-                  boost::bind(&UpdateClient::updateUnacked, this, _1));
+                  boost::bind(&UpdateClient::updateUnacked, this, _1, shadowSession));
 
-    updateTxState(ss->getSemanticState());           // Tx transaction state.
+    updateTransactionState(ss->getSemanticState());
 
     // Adjust command counter for message in progress, will be sent after state update.
     boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
@@ -542,14 +548,18 @@ void UpdateClient::updateConsumer(
              << " on " << shadowSession.getId());
 }
 
-void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
-    if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
+void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr,
+                                 client::AsyncSession& updateSession)
+{
+    if (!dr.isEnded() && dr.isAcquired()) {
+        // FIXME aconway 2011-08-19: should this be assert or if?
+        assert(dr.getMessage().payload);
         // If the message is acquired then it is no longer on the
         // updatees queue, put it on the update queue for updatee to pick up.
         //
-        MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage());
+        MessageUpdater(UPDATE, updateSession, expiry).updateQueuedMessage(dr.getMessage());
     }
-    ClusterConnectionProxy(shadowSession).deliveryRecord(
+    ClusterConnectionProxy(updateSession).deliveryRecord(
         dr.getQueue()->getName(),
         dr.getMessage().position,
         dr.getTag(),
@@ -570,8 +580,10 @@ class TxOpUpdater : public broker::TxOpC
     TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry)
         : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {}
 
-    void operator()(const broker::DtxAck& ) {
-        throw InternalErrorException("DTX transactions not currently supported by cluster.");
+    void operator()(const broker::DtxAck& ack) {
+        std::for_each(ack.getPending().begin(), ack.getPending().end(),
+                      boost::bind(&UpdateClient::updateUnacked, &parent, _1, session));
+        proxy.dtxAck();
     }
 
     void operator()(const broker::RecoveredDequeue& rdeq) {
@@ -588,13 +600,18 @@ class TxOpUpdater : public broker::TxOpC
         proxy.txAccept(txAccept.getAcked());
     }
 
+    typedef std::list<Queue::shared_ptr> QueueList;
+
+    void copy(const QueueList& l, Array& a) {
+        for (QueueList::const_iterator i = l.begin(); i!=l.end(); ++i)
+            a.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
+    }
+
     void operator()(const broker::TxPublish& txPub) {
         updateMessage(txPub.getMessage());
-        typedef std::list<Queue::shared_ptr> QueueList;
-        const QueueList& qlist = txPub.getQueues();
+        assert(txPub.getQueues().empty() || txPub.getPrepared().empty());
         Array qarray(TYPE_CODE_STR8);
-        for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
-            qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
+        copy(txPub.getQueues().empty() ? txPub.getPrepared() : txPub.getQueues(), qarray);
         proxy.txPublish(qarray, txPub.delivered);
     }
 
@@ -604,19 +621,33 @@ class TxOpUpdater : public broker::TxOpC
     ClusterConnectionProxy proxy;
 };
 
-void UpdateClient::updateTxState(broker::SemanticState& s) {
-    QPID_LOG(debug, *this << " updating TX transaction state.");
+void UpdateClient::updateTransactionState(broker::SemanticState& s) {
+    broker::TxBuffer::shared_ptr tx = s.getTxBuffer();
+    broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer();
     ClusterConnectionProxy proxy(shadowSession);
     proxy.accumulatedAck(s.getAccumulatedAck());
-    broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
-    if (txBuffer) {
+    if (dtx) {
+        broker::DtxWorkRecord* record =
+            updaterBroker.getDtxManager().getWork(dtx->getXid()); // throws if not found
+        proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx));
+    } else if (tx) {
+        ClusterConnectionProxy proxy(shadowSession);
         proxy.txStart();
         TxOpUpdater updater(*this, shadowSession, expiry);
-        txBuffer->accept(updater);
+        tx->accept(updater);
         proxy.txEnd();
     }
 }
 
+void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) {
+    ClusterConnectionProxy proxy(session);
+    proxy.dtxStart(
+        dtx->getXid(), dtx->isEnded(), dtx->isSuspended(), dtx->isFailed(), dtx->isExpired());
+    TxOpUpdater updater(*this, session, expiry);
+    dtx->accept(updater);
+    proxy.dtxEnd();
+}
+
 void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) {
     queue->getListeners().eachListener(
         boost::bind(&UpdateClient::updateQueueListener, this, queue->getName(), _1));
@@ -667,5 +698,17 @@ void UpdateClient::updateObserver(const 
     }
 }
 
+void UpdateClient::updateDtxManager() {
+    broker::DtxManager& dtm = updaterBroker.getDtxManager();
+    dtm.each(boost::bind(&UpdateClient::updateDtxWorkRecord, this, _1));
+}
+
+void UpdateClient::updateDtxWorkRecord(const broker::DtxWorkRecord& r) {
+    QPID_LOG(debug, *this << " updating DTX transaction: " << r.getXid());
+    for (size_t i = 0; i < r.size(); ++i)
+        updateDtxBuffer(r[i]);
+    ClusterConnectionProxy(session).dtxWorkRecord(
+        r.getXid(), r.isPrepared(), r.getTimeout());
+}
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Thu Aug 25 20:41:28 2011
@@ -52,7 +52,7 @@ class Decoder;
 class Link;
 class Bridge;
 class QueueObserver;
-
+class DtxBuffer;
 } // namespace broker
 
 namespace cluster {
@@ -88,7 +88,7 @@ class UpdateClient : public sys::Runnabl
     void update();
     void run();                 // Will delete this when finished.
 
-    void updateUnacked(const broker::DeliveryRecord&);
+    void updateUnacked(const broker::DeliveryRecord&, client::AsyncSession&);
 
   private:
     void updateQueue(client::AsyncSession&, const boost::shared_ptr<broker::Queue>&);
@@ -100,7 +100,7 @@ class UpdateClient : public sys::Runnabl
     void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding);
     void updateConnection(const boost::intrusive_ptr<Connection>& connection);
     void updateSession(broker::SessionHandler& s);
-    void updateTxState(broker::SemanticState& s);
+    void updateTransactionState(broker::SemanticState& s);
     void updateOutputTask(const sys::OutputTask* task);
     void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&);
     void updateQueueListeners(const boost::shared_ptr<broker::Queue>&);
@@ -112,6 +112,9 @@ class UpdateClient : public sys::Runnabl
     void updateBridge(const boost::shared_ptr<broker::Bridge>&);
     void updateQueueObservers(const boost::shared_ptr<broker::Queue>&);
     void updateObserver(const boost::shared_ptr<broker::Queue>&, boost::shared_ptr<broker::QueueObserver>);
+    void updateDtxManager();
+    void updateDtxBuffer(const boost::shared_ptr<broker::DtxBuffer>& );
+    void updateDtxWorkRecord(const broker::DtxWorkRecord&);
 
 
     Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h Thu Aug 25 20:41:28 2011
@@ -39,6 +39,13 @@ class UpdateReceiver {
 
     /** Management-id for the next shadow connection */
     std::string nextShadowMgmtId;
+
+    /** Relationship between DtxBuffers, identified by xid, index in DtxManager,
+     * and sessions represented by their SemanticState.
+     */
+    typedef std::pair<std::string, uint32_t> DtxBufferRef;
+    typedef std::map<DtxBufferRef, broker::SemanticState* > DtxBuffers;
+    DtxBuffers dtxBuffers;
 };
 }} // namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/log/Statement.cpp Thu Aug 25 20:41:28 2011
@@ -24,35 +24,9 @@
 #include <ctype.h>
 
 namespace qpid {
+std::string quote(const std::string& str); // Defined in Msg.cpp
 namespace log {
 
-namespace {
-using namespace std;
-
-struct NonPrint { bool operator()(unsigned char c) { return !isprint(c) && !isspace(c); } };
-
-const char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
-
-std::string quote(const std::string& str) {
-    NonPrint nonPrint;
-    size_t n = std::count_if(str.begin(), str.end(), nonPrint);
-    if (n==0) return str;
-    std::string ret;
-    ret.reserve(str.size()+2*n); // Avoid extra allocations.
-    for (string::const_iterator i = str.begin(); i != str.end(); ++i) {
-        if (nonPrint(*i)) {
-            ret.push_back('\\');
-            ret.push_back('x');
-            ret.push_back(hex[((*i) >> 4)&0xf]);
-            ret.push_back(hex[(*i) & 0xf]);
-        }
-        else ret.push_back(*i);
-    }
-    return ret;
-}
-
-}
-
 void Statement::log(const std::string& message) {
     Logger::instance().log(*this, quote(message));
 }

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Thu Aug 25 20:41:28 2011
@@ -493,9 +493,7 @@ class BrokerTest(TestCase):
         return cluster
 
     def browse(self, session, queue, timeout=0):
-        """Assert that the contents of messages on queue (as retrieved
-        using session and timeout) exactly match the strings in
-        expect_contents"""
+        """Return a list with the contents of each message on queue."""
         r = session.receiver("%s;{mode:browse}"%(queue))
         try:
             contents = []

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_python_tests_failing.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_python_tests_failing.txt?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_python_tests_failing.txt (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_python_tests_failing.txt Thu Aug 25 20:41:28 2011
@@ -1,32 +1,4 @@
 qpid_tests.broker_0_10.management.ManagementTest.test_purge_queue
 qpid_tests.broker_0_10.management.ManagementTest.test_connection_close
-qpid_tests.broker_0_10.dtx.DtxTests.test_bad_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_commit_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_end
-qpid_tests.broker_0_10.dtx.DtxTests.test_end_suspend_and_fail
-qpid_tests.broker_0_10.dtx.DtxTests.test_end_unknown_xid
-qpid_tests.broker_0_10.dtx.DtxTests.test_forget_xid_on_completion
-qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout
-qpid_tests.broker_0_10.dtx.DtxTests.test_get_timeout_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_implicit_end
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_false
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_commit_one_phase_true
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_prepare_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_invalid_rollback_not_ended
-qpid_tests.broker_0_10.dtx.DtxTests.test_prepare_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_recover
-qpid_tests.broker_0_10.dtx.DtxTests.test_rollback_unknown
-qpid_tests.broker_0_10.dtx.DtxTests.test_select_required
-qpid_tests.broker_0_10.dtx.DtxTests.test_set_timeout
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_commit
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_commit
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_prepare_rollback
-qpid_tests.broker_0_10.dtx.DtxTests.test_simple_rollback
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_already_known
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_join
-qpid_tests.broker_0_10.dtx.DtxTests.test_start_join_and_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_resume
-qpid_tests.broker_0_10.dtx.DtxTests.test_suspend_start_end_resume
 qpid_tests.broker_0_10.message.MessageTests.test_ttl
 qpid_tests.broker_0_10.management.ManagementTest.test_broker_connectivity_oldAPI

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Thu Aug 25 20:41:28 2011
@@ -713,6 +713,204 @@ acl allow all all
         cluster.start()
         fetch(cluster[2])
 
+# Some utility code for transaction tests
+XA_RBROLLBACK = 1
+XA_RBTIMEOUT = 2
+XA_OK = 0
+dtx_branch_counter = 0
+
+class DtxTestFixture:
+    """Bundle together some common requirements for dtx tests."""
+    def __init__(self, test, broker, name, exclusive=False):
+        self.test = test
+        self.broker = broker
+        self.name = name
+        # Use old API. DTX is not supported in messaging API.
+        self.connection = broker.connect_old()
+        self.session = self.connection.session(name, 1) # 1 second timeout
+        self.queue = self.session.queue_declare(name, exclusive=exclusive)
+        self.xid = self.session.xid(format=0, global_id=name)
+        self.session.dtx_select()
+        self.consumer = None
+
+    def start(self):
+        self.test.assertEqual(XA_OK, self.session.dtx_start(xid=self.xid).status)
+
+    def end(self):
+        self.test.assertEqual(XA_OK, self.session.dtx_end(xid=self.xid).status)
+
+    def prepare(self):
+        self.test.assertEqual(XA_OK, self.session.dtx_prepare(xid=self.xid).status)
+
+    def commit(self, one_phase=True):
+        self.test.assertEqual(
+            XA_OK, self.session.dtx_commit(xid=self.xid, one_phase=one_phase).status)
+
+    def rollback(self):
+        self.test.assertEqual(XA_OK, self.session.dtx_rollback(xid=self.xid).status)
+
+    def send(self, messages):
+       for m in messages:
+           dp=self.session.delivery_properties(routing_key=self.name)
+           mp=self.session.message_properties()
+           self.session.message_transfer(message=qpid.datatypes.Message(dp, mp, m))
+
+    def accept(self):
+        """Accept 1 message from queue"""
+        consumer_tag="%s-consumer"%(self.name)
+        self.session.message_subscribe(queue=self.name, destination=consumer_tag)
+        self.session.message_flow(unit = self.session.credit_unit.message, value = 1, destination = consumer_tag)
+        self.session.message_flow(unit = self.session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag)
+        msg = self.session.incoming(consumer_tag).get(timeout=1)
+        self.session.message_cancel(destination=consumer_tag)
+        self.session.message_accept(qpid.datatypes.RangedSet(msg.id))
+        return msg
+
+
+    def verify(self, cluster, messages):
+        for b in cluster:
+            self.test.assert_browse(b.connect().session(), self.name, messages)
+
+
+class DtxTests(BrokerTest):
+
+    def test_dtx_update(self):
+        """Verify that DTX transaction state is updated to a new broker.
+        Start a collection of transactions, then add a new cluster member,
+        then verify they commit/rollback correctly on the new broker."""
+
+        # Note: multiple test have been bundled into one to avoid the need to start/stop
+        # multiple brokers per test.
+
+        cluster=self.cluster(1)
+
+        # Transaction that will be open when new member joins, then committed.
+        t1 = DtxTestFixture(self, cluster[0], "t1")
+        t1.start()
+        t1.send(["1", "2"])
+        t1.verify(cluster, [])          # Not visible outside of transaction
+
+        # Transaction that will be open when  new member joins, then rolled back.
+        t2 = DtxTestFixture(self, cluster[0], "t2")
+        t2.start()
+        t2.send(["1", "2"])
+
+        # Transaction that will be prepared when new member joins, then committed.
+        t3 = DtxTestFixture(self, cluster[0], "t3")
+        t3.start()
+        t3.send(["1", "2"])
+        t3.end()
+        t3.prepare()
+        t1.verify(cluster, [])          # Not visible outside of transaction
+
+        # Transaction that will be prepared when  new member joins, then rolled back.
+        t4 = DtxTestFixture(self, cluster[0], "t4")
+        t4.start()
+        t4.send(["1", "2"])
+        t4.end()
+        t4.prepare()
+
+        # Transaction using an exclusive queue
+        t5 = DtxTestFixture(self, cluster[0], "t5", exclusive=True)
+        t5.start()
+        t5.send(["1", "2"])
+
+        # Accept messages in a transaction before/after join then commit
+        t6 = DtxTestFixture(self, cluster[0], "t6")
+        t6.send(["a","b","c"])
+        t6.start()
+        t6.verify(cluster, ["a","b","c"])
+        self.assertEqual(t6.accept().body, "a");
+        t6.verify(cluster, ["b","c"])
+
+        # Accept messages in a transaction before/after join then roll back
+        t7 = DtxTestFixture(self, cluster[0], "t7")
+        t7.send(["a","b","c"])
+        t7.start()
+        t7.verify(cluster, ["a","b","c"])
+        self.assertEqual(t7.accept().body, "a");
+        t7.verify(cluster, ["b","c"])
+
+        # Start new member
+        cluster.start()
+
+        # Commit t1
+        t1.send(["3","4"])
+        t1.verify(cluster, [])
+        t1.end()
+        t1.commit(one_phase=True)
+        t1.verify(cluster, ["1","2","3","4"])
+
+        # Rollback t2
+        t2.send(["3","4"])
+        t2.verify(cluster, [])
+        t2.end()
+        t2.rollback()
+        t2.verify(cluster, [])
+
+        # Commit t3
+        t3.verify(cluster, [])
+        t3.commit(one_phase=False)
+        t3.verify(cluster, ["1","2"])
+
+        # Rollback t4
+        t4.verify(cluster, [])
+        t4.rollback()
+        t4.verify(cluster, [])
+
+        # Commit t5
+        t5.send(["3","4"])
+        t5.verify(cluster, [])
+        t5.end()
+        t5.commit(one_phase=True)
+        t5.verify(cluster, ["1","2","3","4"])
+
+        # Commit t7
+        t6.verify(cluster, ["b", "c"])
+        self.assertEqual(t6.accept().body, "b");
+        t6.verify(cluster, ["c"])
+        t6.end()
+        t6.commit(one_phase=True)
+        t6.verify(cluster, ["c"])
+        t6.session.close()              # Make sure they're not requeued by the session.
+        t6.verify(cluster, ["c"])
+
+        # Rollback t7
+        t7.verify(cluster, ["b", "c"])
+        self.assertEqual(t7.accept().body, "b");
+        t7.verify(cluster, ["c"])
+        t7.end()
+        t7.rollback()
+        t7.verify(cluster, ["a", "b", "c"])
+
+
+class TxTests(BrokerTest):
+
+    def test_tx_update(self):
+        """Verify that transaction state is updated to a new broker"""
+
+        def make_message(session, body=None, key=None, id=None):
+            dp=session.delivery_properties(routing_key=key)
+            mp=session.message_properties(correlation_id=id)
+            return qpid.datatypes.Message(dp, mp, body)
+
+        cluster=self.cluster(1)
+        # Use old API. TX is not supported in messaging API.
+        c = cluster[0].connect_old()
+        s = c.session("tx-session", 1)
+        s.queue_declare(queue="q")
+        # Start transaction
+        s.tx_select()
+        s.message_transfer(message=make_message(s, "1", "q"))
+        # Start new member mid-transaction
+        cluster.start()
+        # Do more work
+        s.message_transfer(message=make_message(s, "2", "q"))
+        # Commit the transaction and verify the results.
+        s.tx_commit()
+        for b in cluster: self.assert_browse(b.connect().session(), "q", ["1","2"])
+
+
 class LongTests(BrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
     def duration(self):
@@ -1001,6 +1199,8 @@ class LongTests(BrokerTest):
         logger = logging.getLogger()
         log_level = logger.getEffectiveLevel()
         logger.setLevel(logging.ERROR)
+        sender = None
+        receiver = None
         try:
             # Start sender and receiver threads
             receiver = Receiver(cluster[0], "q;{create:always}")
@@ -1031,8 +1231,8 @@ class LongTests(BrokerTest):
 
         finally:
             # Detach to avoid slow reconnect attempts during shut-down if test fails.
-            sender.connection.detach()
-            receiver.connection.detach()
+            if sender: sender.connection.detach()
+            if receiver: receiver.connection.detach()
             logger.setLevel(log_level)
 
 class StoreTests(BrokerTest):

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1161742&r1=1161741&r2=1161742&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Thu Aug 25 20:41:28 2011
@@ -212,6 +212,29 @@
       <field name="name" type="str8"/>
     </control>
 
+    <!-- Dtx transaction state. -->
+    <control name="dtx-start" code="0x1A">
+      <field name="xid" type="str16"/>
+      <field name="ended" type="bit"/>
+      <field name="suspended" type="bit"/>
+      <field name="failed" type="bit"/>
+      <field name="expired" type="bit"/>
+    </control>
+    <control name="dtx-end" code="0x1B"/>
+
+    <control name="dtx-ack" code="0x1C"/>
+
+    <control name="dtx-buffer-ref" code="0x1D">
+      <field name="xid" type="str16"/>
+      <field name="index" type="uint32"/>
+    </control>
+
+    <control name="dtx-work-record" code="0x1E">
+      <field name="xid" type="str16"/>
+      <field name="prepared" type="bit"/>
+      <field name="timeout" type="uint32"/>
+    </control>
+
     <!-- Complete a session state update. -->
     <control name="session-state" code="0x1F">
       <!-- Target session deduced from channel number.  -->



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message