qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r711587 - in /incubator/qpid/trunk/qpid/cpp: rubygen/framing.0-10/ src/ src/qpid/broker/ src/qpid/cluster/ src/qpid/framing/ src/tests/ xml/
Date Wed, 05 Nov 2008 15:22:49 GMT
Author: aconway
Date: Wed Nov  5 07:22:47 2008
New Revision: 711587

URL: http://svn.apache.org/viewvc?rev=711587&view=rev
Log:

Cluster: replicate transaction state to newcomers.

constants.rb: generate type code constants for AMQP types. Useful with Array.

framing/Array:
 - added some std::vector-like functions & typedefs.
 - use TypeCode enums, human readable  ostream << operator.

rubygen - fixed error in generation of exceptions for bad codes.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h
    incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb Wed Nov  5 07:22:47 2008
@@ -152,7 +152,7 @@
           assign = "holder = new #{c.name.caps}Exception(text); " unless c.name == "normal"
           genl "case #{c.value}: #{assign}break;" 
         }
-        genl "    holder = new #{invalid}(QPID_MSG(\"Bad exception code: \" << code
<< \": \" << text));"
+        genl "default: holder = new #{invalid}(QPID_MSG(\"Bad #{enum.parent.name}: \" <<
code << \": \" << text));"
       }
       genl "return holder;"
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Nov  5 07:22:47 2008
@@ -506,6 +506,7 @@
   qpid/broker/TxAccept.h \
   qpid/broker/TxBuffer.h \
   qpid/broker/TxOp.h \
+  qpid/broker/TxOpVisitor.h \
   qpid/broker/TxPublish.h \
   qpid/broker/Vhost.h \
   qpid/client/AckMode.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h Wed Nov  5 07:22:47 2008
@@ -39,6 +39,7 @@
             virtual void commit() throw();
             virtual void rollback() throw();
             virtual ~DtxAck(){}
+            virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h Wed Nov  5 07:22:47 2008
@@ -45,6 +45,10 @@
             virtual void commit() throw();
             virtual void rollback() throw();
             virtual ~RecoveredDequeue(){}
+            virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+
+            Queue::shared_ptr getQueue() const { return queue; }
+            boost::intrusive_ptr<Message> getMessage() const { return msg; }
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h Wed Nov  5 07:22:47 2008
@@ -45,6 +45,11 @@
             virtual void commit() throw();
             virtual void rollback() throw();
             virtual ~RecoveredEnqueue(){}
+            virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+
+            Queue::shared_ptr getQueue() const { return queue; }
+            boost::intrusive_ptr<Message> getMessage() const { return msg; }
+            
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Nov  5 07:22:47 2008
@@ -436,7 +436,7 @@
     if(requeue){
         //take copy and clear unacked as requeue may result in redelivery to this session
         //which will in turn result in additions to unacked
-        std::list<DeliveryRecord> copy = unacked;
+        DeliveryRecords copy = unacked;
         unacked.clear();
         for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
     }else{

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Wed Nov  5 07:22:47 2008
@@ -134,7 +134,7 @@
     DeliveryAdapter& deliveryAdapter;
     ConsumerImplMap consumers;
     NameGenerator tagGenerator;
-    std::list<DeliveryRecord> unacked;
+    DeliveryRecords unacked;
     TxBuffer::shared_ptr txBuffer;
     DtxBuffer::shared_ptr dtxBuffer;
     bool dtxSelected;
@@ -216,8 +216,11 @@
     static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return boost::polymorphic_downcast<ConsumerImpl*>(p);
}
 
     template <class F> void eachConsumer(F f) { outputTasks.eachOutput(boost::bind(f,
boost::bind(castToConsumerImpl, _1))); }
-    template <class F> void eachUnacked(F f) { std::for_each(unacked.begin(), unacked.end(),
f); }
-
+    DeliveryRecords& getUnacked() { return unacked; }
+    framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
+    TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }
+    void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; }
+    void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
     void record(const DeliveryRecord& delivery);
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp Wed Nov  5 07:22:47 2008
@@ -69,7 +69,7 @@
     }
 }
 
-TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked)
: 
+TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) : 
     acked(_acked), unacked(_unacked), ops(unacked) 
 {
     //populate the ops

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h Wed Nov  5 07:22:47 2008
@@ -56,8 +56,8 @@
                 void commit();    
             };
 
-            framing::SequenceSet& acked;
-            std::list<DeliveryRecord>& unacked;
+            framing::SequenceSet acked;
+            DeliveryRecords& unacked;
             RangeOps ops;
 
         public:
@@ -66,11 +66,15 @@
              * acks received
              * @param unacked the record of delivered messages
              */
-            TxAccept(framing::SequenceSet& acked, std::list<DeliveryRecord>&
unacked);
+            TxAccept(const framing::SequenceSet& acked, DeliveryRecords& unacked);
             virtual bool prepare(TransactionContext* ctxt) throw();
             virtual void commit() throw();
             virtual void rollback() throw();
             virtual ~TxAccept(){}
+            virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+
+            // Used by cluster replication.
+            const framing::SequenceSet& getAcked() const { return acked; }
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp Wed Nov  5 07:22:47 2008
@@ -22,6 +22,7 @@
 #include "qpid/log/Statement.h"
 
 #include <boost/mem_fn.hpp>
+#include <boost/bind.hpp>
 using boost::mem_fn;
 using namespace qpid::broker;
 
@@ -73,3 +74,7 @@
     }
     return false;
 }
+
+void TxBuffer::accept(TxOpConstVisitor& v) const {
+    std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v)));

+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h Wed Nov  5 07:22:47 2008
@@ -107,6 +107,9 @@
              * commit
              */
             bool commitLocal(TransactionalStore* const store);
+
+            // Used by cluster to replicate transaction status.
+            void accept(TxOpConstVisitor& v) const;
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h Wed Nov  5 07:22:47 2008
@@ -21,11 +21,15 @@
 #ifndef _TxOp_
 #define _TxOp_
 
+#include "TxOpVisitor.h"
 #include "TransactionalStore.h"
 #include <boost/shared_ptr.hpp>
 
 namespace qpid {
     namespace broker {
+
+class TxOpConstVisitor;
+    
         class TxOp{
         public:
             typedef boost::shared_ptr<TxOp> shared_ptr;
@@ -34,9 +38,11 @@
             virtual void commit()  throw() = 0;
             virtual void rollback()  throw() = 0;
             virtual ~TxOp(){}
+
+            virtual void accept(TxOpConstVisitor&) const = 0;
         };
-    }
-}
+
+}} // namespace qpid::broker
 
 
 #endif

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h?rev=711587&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h Wed Nov  5 07:22:47 2008
@@ -0,0 +1,100 @@
+#ifndef QPID_BROKER_TXOPVISITOR_H
+#define QPID_BROKER_TXOPVISITOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/shared_ptr.h"
+
+namespace qpid {
+namespace broker {
+
+class DtxAck;
+class RecoveredDequeue;
+class RecoveredEnqueue;
+class TxAccept;
+class TxPublish;
+
+/**
+ * Visitor for TxOp family of classes.
+ */
+struct TxOpConstVisitor
+{
+    virtual ~TxOpConstVisitor() {}
+    virtual void operator()(const DtxAck&) = 0;
+    virtual void operator()(const RecoveredDequeue&) = 0;
+    virtual void operator()(const RecoveredEnqueue&) = 0;
+    virtual void operator()(const TxAccept&) = 0;
+    virtual void operator()(const TxPublish&) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_TXOPVISITOR_H*/
+#ifndef QPID_BROKER_TXOPVISITOR_H
+#define QPID_BROKER_TXOPVISITOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/shared_ptr.h"
+
+namespace qpid {
+namespace broker {
+
+class DtxAck;
+class RecoveredDequeue;
+class RecoveredEnqueue;
+class TxAccept;
+class TxPublish;
+
+/**
+ * Visitor for TxOp family of classes.
+ */
+struct TxOpConstVisitor
+{
+    virtual ~TxOpConstVisitor() {}
+    virtual void operator()(const DtxAck&) = 0;
+    virtual void operator()(const RecoveredDequeue&) = 0;
+    virtual void operator()(const RecoveredEnqueue&) = 0;
+    virtual void operator()(const TxAccept&) = 0;
+    virtual void operator()(const TxPublish&) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_TXOPVISITOR_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Wed Nov  5 07:22:47 2008
@@ -75,8 +75,12 @@
             virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
 
             virtual ~TxPublish(){}
+            virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
 
             uint64_t contentSize();
+
+            boost::intrusive_ptr<Message> getMessage() const { return msg; }
+            const std::list<Queue::shared_ptr> getQueues() const { return queues; }
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Wed Nov  5 07:22:47 2008
@@ -90,7 +90,4 @@
 
 static ClusterPlugin instance; // Static initialization.
 
-// For test purposes.
-Cluster& getGlobalCluster() { assert(instance.cluster); return *instance.cluster; }
-    
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Nov  5 07:22:47 2008
@@ -24,7 +24,11 @@
 
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/SemanticState.h"
+#include "qpid/broker/TxBuffer.h"
 #include "qpid/broker/TxPublish.h"
+#include "qpid/broker/TxAccept.h"
+#include "qpid/broker/RecoveredEnqueue.h"
+#include "qpid/broker/RecoveredDequeue.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/AllInvoker.h"
@@ -36,7 +40,7 @@
 
 #include <boost/current_function.hpp>
 
-// FIXME aconway 2008-11-03:
+// TODO aconway 2008-11-03:
 // 
 // Disproportionate amount of code here is dedicated to receiving a
 // brain-dump when joining a cluster and building initial
@@ -113,7 +117,6 @@
     std::string message;
     if (body.getMethod()) {
         switch (body.getMethod()->amqpClassId()) {
-          case TX_CLASS_ID: message = "TX transactions are not currently supported by cluster.";
break;
           case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster.";
break;
         }
     }
@@ -122,13 +125,13 @@
         if (dp && dp->getTtl()) message = "Message TTL is not currently supported
by cluster.";
     }
     if (!message.empty())
-        connection.close(execution::ERROR_CODE_INTERNAL_ERROR, message, 0, 0);
+        connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message, 0, 0);
     return !message.empty();
 }
 
 // Delivered from cluster.
 void Connection::delivered(framing::AMQFrame& f) {
-    QPID_LOG(trace, cluster << "DLVR " << *this << ": " << f);
+    QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f);
     assert(!catchUp);
     currentChannel = f.getChannel(); 
     if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection contol.
@@ -247,11 +250,15 @@
     return self.first == cluster.getId() && self.second == 0;
 }
 
+
+shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) {
+    shared_ptr<broker::Queue> queue = cluster.getBroker().getQueues().find(qname);
+    if (!queue) throw Exception(QPID_MSG(cluster << " can't find queue " << qname));
+    return queue;
+}
+
 broker::QueuedMessage Connection::getDumpMessage() {
-    // Get a message from the DUMP queue.
-    broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
-    if (!dumpQueue) throw Exception(QPID_MSG(cluster << " missing dump queue"));
-    broker::QueuedMessage m = dumpQueue->get();
+    broker::QueuedMessage m = findQueue(DumpClient::DUMP)->get();
     if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue"));
     return m;
 }
@@ -267,14 +274,11 @@
                                 bool ended,
                                 bool windowing)
 {
-    broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname);
-    if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " <<
qname));
     broker::QueuedMessage m;
+    broker::Queue::shared_ptr queue = findQueue(qname);
     if (!ended) {               // Has a message
-        if (acquired) {          // Message at front of dump queue
-            broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
-            m = dumpQueue->get();
-        }
+        if (acquired)           // Message is on the dump queue
+            m = getDumpMessage();
         else                    // Message at original position in original queue
             m = queue->find(position);
         if (!m.payload)
@@ -286,8 +290,7 @@
     if (cancelled) dr.cancel(dr.getTag());
     if (completed) dr.complete();
     if (ended) dr.setEnded();   // Exsitance of message
-
-    semanticState().record(dr);
+    semanticState().record(dr); // Part of the session's unacked list.
 }
 
 void Connection::queuePosition(const string& qname, const SequenceNumber& position)
{
@@ -304,6 +307,36 @@
     return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup"
: "") << ")";
 }
 
+void Connection::txStart() {
+    txBuffer = make_shared_ptr(new broker::TxBuffer());
+}
+void Connection::txAccept(const framing::SequenceSet& acked) {
+    txBuffer->enlist(make_shared_ptr(new broker::TxAccept(acked, semanticState().getUnacked())));
+}
+
+void Connection::txDequeue(const std::string& queue) {
+    txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getDumpMessage().payload)));
+}
+
+void Connection::txEnqueue(const std::string& queue) {
+    txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getDumpMessage().payload)));
+}
+
+void Connection::txPublish(const framing::Array& queues, bool delivered) {
+    boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getDumpMessage().payload));
+    for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) 
+        txPub->deliverTo(findQueue((*i)->get<std::string>()));
+    txPub->delivered = delivered;
+    txBuffer->enlist(txPub);
+}
+
+void Connection::txEnd() {
+    semanticState().setTxBuffer(txBuffer);
+}
+
+void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) {
+    semanticState().setAccumulatedAck(s);
+}
 
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Nov  5 07:22:47 2008
@@ -125,12 +125,21 @@
 
     void queuePosition(const std::string&, const framing::SequenceNumber&);
 
+    void txStart();
+    void txAccept(const framing::SequenceSet&);
+    void txDequeue(const std::string&);
+    void txEnqueue(const std::string&);
+    void txPublish(const qpid::framing::Array&, bool);
+    void txEnd();
+    void accumulatedAck(const qpid::framing::SequenceSet&);
+
   private:
     bool checkUnsupported(const framing::AMQBody& body);
     void deliverClose();
     void deliverDoOutput(uint32_t requested);
     void sendDoOutput();
 
+    boost::shared_ptr<broker::Queue> findQueue(const std::string& qname);
     broker::SessionState& sessionState();
     broker::SemanticState& semanticState();
     broker::QueuedMessage getDumpMessage();
@@ -148,6 +157,7 @@
     framing::SequenceNumber mcastSeq;
     framing::SequenceNumber deliverSeq;
     framing::ChannelId currentChannel;
+    boost::shared_ptr<broker::TxBuffer> txBuffer;
     
   friend std::ostream& operator<<(std::ostream&, const Connection&);
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Wed Nov  5 07:22:47 2008
@@ -32,6 +32,12 @@
 #include "qpid/broker/ExchangeRegistry.h"
 #include "qpid/broker/SessionHandler.h"
 #include "qpid/broker/SessionState.h"
+#include "qpid/broker/TxOpVisitor.h"
+#include "qpid/broker/DtxAck.h"
+#include "qpid/broker/TxAccept.h"
+#include "qpid/broker/TxPublish.h"
+#include "qpid/broker/RecoveredDequeue.h"
+#include "qpid/broker/RecoveredEnqueue.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/ClusterConnectionMembershipBody.h"
 #include "qpid/framing/ClusterConnectionShadowReadyBody.h"
@@ -43,7 +49,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/Url.h"
 #include <boost/bind.hpp>
-
+#include <algorithm>
 
 namespace qpid {
 namespace cluster {
@@ -198,7 +204,7 @@
     shadowConnection = catchUpConnection();
 
     broker::Connection& bc = dumpConnection->getBrokerConnection();
-    // FIXME aconway 2008-10-20: What authentication info to reconnect?
+    // FIXME aconway 2008-10-20: What authentication info to use on reconnect?
     shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
     bc.eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
     ClusterConnectionProxy(shadowConnection).shadowReady(
@@ -227,7 +233,10 @@
     ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
 
     QPID_LOG(debug, dumperId << " dumping unacknowledged messages.");
-    ss->getSemanticState().eachUnacked(boost::bind(&DumpClient::dumpUnacked, this,
_1));
+    broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
+    std::for_each(drs.begin(), drs.end(),  boost::bind(&DumpClient::dumpUnacked, this,
_1));
+
+    dumpTxState(ss->getSemanticState());           // Tx transaction state.
 
     //  Adjust for command counter for message in progress, will be sent after state update.
     boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
@@ -283,22 +292,12 @@
 }
     
 void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) {
-    dumpDeliveryRecordMessage(dr);
-    dumpDeliveryRecord(dr);
-}
-
-void DumpClient::dumpDeliveryRecordMessage(const broker::DeliveryRecord& dr) {
-    // Dump the message associated with a dr if need be.
     if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
         // If the message is acquired then it is no longer on the
         // dumpees queue, put it on the dump queue for dumpee to pick up.
         //
         MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage());
     }
-}
-
-void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) {
-    // Assumes the associated message has already been dumped (if needed)
     ClusterConnectionProxy(shadowSession).deliveryRecord(
         dr.getQueue()->getName(),
         dr.getMessage().position,
@@ -312,4 +311,56 @@
         dr.isWindowing());
 }
 
+class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper { // Visitor: sends one cluster control per TxOp in a TxBuffer.
+  public:
+    TxOpDumper(DumpClient& dc, client::AsyncSession s)
+        : MessageDumper(DumpClient::DUMP, s), parent(dc), session(s), proxy(s) {}
+    // DtxAck: DTX state is not replicated; throws InternalErrorException.
+    void operator()(const broker::DtxAck& ) {
+        throw InternalErrorException("DTX transactions not currently supported by cluster.");
+    }
+    // RecoveredDequeue: dumpMessage() first (presumably onto the DUMP queue - confirm in MessageDumper), then the control naming the queue.
+    void operator()(const broker::RecoveredDequeue& rdeq) {
+        dumpMessage(rdeq.getMessage());
+        proxy.txDequeue(rdeq.getQueue()->getName()); // Must be txDequeue: Connection::txEnqueue would enlist a RecoveredEnqueue instead.
+    }
+    // RecoveredEnqueue: same sequence as above, paired with Connection::txEnqueue on the receiving side.
+    void operator()(const broker::RecoveredEnqueue& renq) {
+        dumpMessage(renq.getMessage());
+        proxy.txEnqueue(renq.getQueue()->getName());
+    }
+    // TxAccept: only the acked sequence set is sent.
+    void operator()(const broker::TxAccept& txAccept) {
+        proxy.txAccept(txAccept.getAcked());
+    }
+    // TxPublish: message first, then the destination queue names as a str8 Array plus the delivered flag.
+    void operator()(const broker::TxPublish& txPub) {
+        dumpMessage(txPub.getMessage());
+        typedef std::list<Queue::shared_ptr> QueueList;
+        const QueueList& qlist = txPub.getQueues();
+        Array qarray(TYPE_CODE_STR8);
+        for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) 
+            qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
+        proxy.txPublish(qarray, txPub.delivered);
+    }
+
+  private:
+    DumpClient& parent;
+    client::AsyncSession session;
+    ClusterConnectionProxy proxy;
+};
+    
+void DumpClient::dumpTxState(broker::SemanticState& s) {
+    QPID_LOG(debug, dumperId << " dumping TX transaction state.");
+    ClusterConnectionProxy proxy(shadowSession);
+    proxy.accumulatedAck(s.getAccumulatedAck());
+    broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
+    if (txBuffer) {
+        proxy.txStart();
+        TxOpDumper dumper(*this, shadowSession);
+        txBuffer->accept(dumper);
+        proxy.txEnd();
+    }
+}
+
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h Wed Nov  5 07:22:47 2008
@@ -71,6 +71,8 @@
     void dump();
     void run();                 // Will delete this when finished.
 
+    void dumpUnacked(const broker::DeliveryRecord&);
+
   private:
     void dumpQueue(const boost::shared_ptr<broker::Queue>&);
     void dumpExchange(const boost::shared_ptr<broker::Exchange>&);
@@ -79,10 +81,8 @@
     void dumpBinding(const std::string& queue, const broker::QueueBinding& binding);
     void dumpConnection(const boost::intrusive_ptr<Connection>& connection);
     void dumpSession(broker::SessionHandler& s);
+    void dumpTxState(broker::SemanticState& s);
     void dumpConsumer(const broker::SemanticState::ConsumerImpl*);
-    void dumpUnacked(const broker::DeliveryRecord&);
-    void dumpDeliveryRecord(const broker::DeliveryRecord&);
-    void dumpDeliveryRecordMessage(const broker::DeliveryRecord&);
 
     MemberId dumperId;
     MemberId dumpeeId;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h Wed Nov  5 07:22:47 2008
@@ -31,9 +31,9 @@
 class SequenceSet : public RangeSet<SequenceNumber> {
   public:
     SequenceSet() {}
-    explicit SequenceSet(const RangeSet<SequenceNumber>& r)
+    SequenceSet(const RangeSet<SequenceNumber>& r)
         : RangeSet<SequenceNumber>(r) {}
-    explicit SequenceSet(const SequenceNumber& s) { add(s); }
+    SequenceSet(const SequenceNumber& s) { add(s); }
     SequenceSet(const SequenceNumber& start, const SequenceNumber finish) { add(start,finish); }
     
     

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h Wed Nov  5 07:22:47 2008
@@ -114,6 +114,9 @@
     void check(){
         assertEqualVector(expected, actual);
     }
+
+    void accept(TxOpConstVisitor&) const {} // Empty body: the mock ignores the visitor it is given.
+    
     ~MockTxOp(){}        
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Nov  5 07:22:47 2008
@@ -45,13 +45,6 @@
 #include <algorithm>
 #include <iterator>
 
-namespace qpid {
-namespace cluster {
-// FIXME aconway 2008-11-04: remove.
-Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp
-}} // namespace qpid::cluster
-
-
 namespace std {                 // ostream operators in std:: namespace
 template <class T>
 ostream& operator<<(ostream& o, const std::set<T>& s) { return seqPrint(o, s); }
@@ -69,7 +62,6 @@
 using qpid::broker::Broker;
 using boost::shared_ptr;
 using qpid::cluster::Cluster;
-using qpid::cluster::getGlobalCluster;
 
 /** Parse broker & cluster options */
 Broker::Options parseOpts(size_t argc, const char* argv[]) {
@@ -216,8 +208,19 @@
     uint16_t channel;
 };
 
-QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testTxTransaction, 1) {
-    ClusterFixture cluster(1, 1); // FIXME aconway 2008-11-04: local broker at index 1
+QPID_AUTO_TEST_CASE(testUnsupported) { // Checks that two client operations throw when issued against the cluster fixture.
+    ScopedSuppressLogging sl;
+    ClusterFixture cluster(1); // NOTE(review): presumably a one-member cluster - confirm against ClusterFixture.
+    Client c1(cluster[0], "c1");
+    BOOST_CHECK_THROW(c1.session.dtxSelect(), FramingErrorException);    // dtxSelect must raise FramingErrorException.
+    Client c2(cluster[0], "c2"); // Second check uses a separate client rather than reusing c1.
+    Message  m;
+    m.getDeliveryProperties().setTtl(1); // Set a TTL of 1 on the message's delivery properties.
+    BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception);    // Transferring the TTL message must throw Exception.
+}
+
+QPID_AUTO_TEST_CASE(testTxTransaction) {
+    ClusterFixture cluster(1);
     Client c0(cluster[0], "c0");
     c0.session.queueDeclare(arg::queue="q");
     c0.session.messageTransfer(arg::content=Message("A", "q"));
@@ -236,7 +239,8 @@
     SubscriptionManager rollbackSubs(rollbackSession);
     rollbackSession.txSelect();
     rollbackSession.messageTransfer(arg::content=Message("1", "q"));
-    BOOST_CHECK_EQUAL(rollbackSubs.get("q", TIME_SEC).getData(), "B");
+    Message rollbackMessage = rollbackSubs.get("q", TIME_SEC);
+    BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
 
     BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
     // Add new member mid transaction.
@@ -250,10 +254,14 @@
     rollbackSession.messageTransfer(arg::content=Message("3", "q"));
 
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);    
+
     // Commit/roll back.
     commitSession.txCommit();
     rollbackSession.txRollback();
-    // Verify queue status: just the comitted messages
+    rollbackSession.messageRelease(rollbackMessage.getId());
+
+
+    // Verify queue status: just the committed messages and dequeues should remain.
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
     BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "B");
     BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "a");
@@ -261,20 +269,6 @@
     BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "c");
 }
 
-QPID_AUTO_TEST_CASE(testUnsupported) {
-    ScopedSuppressLogging sl;
-    ClusterFixture cluster(1);
-    Client c0(cluster[0], "c0");
-    BOOST_CHECK_THROW(c0.session.txSelect(), Exception);
-    BOOST_CHECK(!c0.connection.isOpen());
-    Client c1(cluster[0], "c1");
-    BOOST_CHECK_THROW(c1.session.dtxCommit(), Exception);    
-    Client c2(cluster[0], "c2");
-    Message  m;
-    m.getDeliveryProperties().setTtl(1);
-    BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception);    
-}
-
 QPID_AUTO_TEST_CASE(testUnacked) {
     // Verify replication of unacknowledged messages.
     ClusterFixture cluster(1);
@@ -388,8 +382,7 @@
     Client c1(cluster[1], "c1");
     BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
     BOOST_CHECK_EQUAL(m.getData(), "abcd");
-
-    BOOST_CHECK_EQUAL(2u, getGlobalCluster().getUrls().size());
+    BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection).size());
 }
 
 QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Wed Nov  5 07:22:47 2008
@@ -96,6 +96,18 @@
       <field name="windowing" type="bit"/>
     </control>
     
+    <!-- Tx transaction state. -->
+    <control name="tx-start" code="0x12"/> 
+    <control name="tx-accept" code="0x13"> <field name="commands" type="sequence-set"/> </control>
+    <control name="tx-dequeue" code="0x14"> <field name="queue" type="str8"/> </control>
+    <control name="tx-enqueue" code="0x15"> <field name="queue" type="str8"/> </control>
+    <control name="tx-publish" code="0x16">
+      <field name="queues" type="array"/>  <!--Array of str8 -->
+      <field name="delivered" type="bit"/>
+    </control>
+    <control name="tx-end" code="0x17"/>
+    <control name="accumulated-ack" code="0x18"> <field name="commands" type="sequence-set"/> </control>
+    
     <!-- Complete a session state dump. -->
     <control name="session-state" code="0x1F" label="Set session state during a brain dump.">
       <!-- Target session deduced from channel number.  -->



Mime
View raw message