qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1172657 [4/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/ cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/ cpp/bindings/qpid/dotnet/examples/cshar...
Date Mon, 19 Sep 2011 15:13:38 GMT
Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Sep 19 15:13:18 2011
@@ -445,8 +445,6 @@ void SessionState::addPendingExecutionSy
 boost::intrusive_ptr<AsyncCompletion::Callback>
 SessionState::IncompleteIngressMsgXfer::clone()
 {
-    boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg));
-
     // Optimization: this routine is *only* invoked when the message needs to be asynchronously completed.
     // If the client is pending the message.transfer completion, flush now to force immediate write to journal.
     if (requiresSync)
@@ -457,7 +455,8 @@ SessionState::IncompleteIngressMsgXfer::
         pending = true;
         completerContext->addPendingMessage(msg);
     }
-    return cb;
+
+    return boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer>(new SessionState::IncompleteIngressMsgXfer(*this));
 }
 
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SessionState.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SessionState.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/SessionState.h Mon Sep 19 15:13:18 2011
@@ -256,7 +256,15 @@ class SessionState : public qpid::Sessio
           requiresAccept(m->requiresAccept()),
           requiresSync(m->getFrames().getMethod()->isSync()),
           pending(false) {}
-        virtual ~IncompleteIngressMsgXfer() {};
+        IncompleteIngressMsgXfer( const IncompleteIngressMsgXfer& x )
+          : AsyncCommandContext(x.session, x.msg->getCommandId()),
+          session(x.session),
+          msg(x.msg),
+          requiresAccept(x.requiresAccept),
+          requiresSync(x.requiresSync),
+          pending(x.pending) {}
+
+  virtual ~IncompleteIngressMsgXfer() {};
 
         virtual void completed(bool);
         virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/TxBuffer.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/TxBuffer.cpp Mon Sep 19 15:13:18 2011
@@ -76,5 +76,5 @@ bool TxBuffer::commitLocal(Transactional
 }
 
 void TxBuffer::accept(TxOpConstVisitor& v) const {
-    std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v))); 
+    std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1, boost::ref(v)));
 }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/TxPublish.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/TxPublish.h Mon Sep 19 15:13:18 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -34,57 +34,58 @@
 #include <boost/intrusive_ptr.hpp>
 
 namespace qpid {
-    namespace broker {
-        /**
-         * Defines the behaviour for publish operations on a
-         * transactional channel. Messages are routed through
-         * exchanges when received but are not at that stage delivered
-         * to the matching queues, rather the queues are held in an
-         * instance of this class. On prepare() the message is marked
-         * enqueued to the relevant queues in the MessagesStore. On
-         * commit() the messages will be passed to the queue for
-         * dispatch or to be added to the in-memory queue.
-         */
-        class QPID_BROKER_CLASS_EXTERN TxPublish : public TxOp, public Deliverable{
-
-            class Commit{
-                boost::intrusive_ptr<Message>& msg;
-            public:
-                Commit(boost::intrusive_ptr<Message>& msg);
-                void operator()(const boost::shared_ptr<Queue>& queue);            
-            };
-            class Rollback{
-                boost::intrusive_ptr<Message>& msg;
-            public:
-                Rollback(boost::intrusive_ptr<Message>& msg);
-                void operator()(const boost::shared_ptr<Queue>& queue);            
-            };
-
-            boost::intrusive_ptr<Message> msg;
-             std::list<boost::shared_ptr<Queue> > queues;
-            std::list<boost::shared_ptr<Queue> > prepared;
-
-            void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
-
-        public:
-            QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);
-            QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw();
-            QPID_BROKER_EXTERN virtual void commit() throw();
-            QPID_BROKER_EXTERN virtual void rollback() throw();
-
-	    virtual Message& getMessage() { return *msg; };
-            
-            QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
-
-            virtual ~TxPublish(){}
-            virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
-
-            QPID_BROKER_EXTERN uint64_t contentSize();
-
-            boost::intrusive_ptr<Message> getMessage() const { return msg; }
-            const std::list<boost::shared_ptr<Queue> > getQueues() const { return queues; }
-        };
-    }
+namespace broker {
+/**
+ * Defines the behaviour for publish operations on a
+ * transactional channel. Messages are routed through
+ * exchanges when received but are not at that stage delivered
+ * to the matching queues, rather the queues are held in an
+ * instance of this class. On prepare() the message is marked
+ * enqueued to the relevant queues in the MessagesStore. On
+ * commit() the messages will be passed to the queue for
+ * dispatch or to be added to the in-memory queue.
+ */
+class QPID_BROKER_CLASS_EXTERN TxPublish : public TxOp, public Deliverable{
+
+    class Commit{
+        boost::intrusive_ptr<Message>& msg;
+      public:
+        Commit(boost::intrusive_ptr<Message>& msg);
+        void operator()(const boost::shared_ptr<Queue>& queue);
+    };
+    class Rollback{
+        boost::intrusive_ptr<Message>& msg;
+      public:
+        Rollback(boost::intrusive_ptr<Message>& msg);
+        void operator()(const boost::shared_ptr<Queue>& queue);
+    };
+
+    boost::intrusive_ptr<Message> msg;
+    std::list<boost::shared_ptr<Queue> > queues;
+    std::list<boost::shared_ptr<Queue> > prepared;
+
+    void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
+
+  public:
+    QPID_BROKER_EXTERN TxPublish(boost::intrusive_ptr<Message> msg);
+    QPID_BROKER_EXTERN virtual bool prepare(TransactionContext* ctxt) throw();
+    QPID_BROKER_EXTERN virtual void commit() throw();
+    QPID_BROKER_EXTERN virtual void rollback() throw();
+
+    virtual Message& getMessage() { return *msg; };
+
+    QPID_BROKER_EXTERN virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
+
+    virtual ~TxPublish(){}
+    virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+
+    QPID_BROKER_EXTERN uint64_t contentSize();
+
+    boost::intrusive_ptr<Message> getMessage() const { return msg; }
+    const std::list<boost::shared_ptr<Queue> >& getQueues() const { return queues; }
+    const std::list<boost::shared_ptr<Queue> >& getPrepared() const { return prepared; }
+};
+}
 }
 
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/windows/SaslAuthenticator.cpp Mon Sep 19 15:13:18 2011
@@ -93,6 +93,7 @@ NullAuthenticator::~NullAuthenticator() 
 void NullAuthenticator::getMechanisms(Array& mechanisms)
 {
     mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("ANONYMOUS")));
+    mechanisms.add(boost::shared_ptr<FieldValue>(new Str16Value("PLAIN")));
 }
 
 void NullAuthenticator::start(const string& mechanism, const string& response)

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp Mon Sep 19 15:13:18 2011
@@ -27,10 +27,14 @@
 #include "qpid/sys/AsynchIOHandler.h"
 #include "qpid/sys/ConnectionCodec.h"
 #include "qpid/sys/Socket.h"
+#include "qpid/sys/SocketAddress.h"
 #include "qpid/sys/SystemInfo.h"
 #include "qpid/sys/windows/SslAsynchIO.h"
+
 #include <boost/bind.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
 #include <memory>
+
 // security.h needs to see this to distinguish from kernel use.
 #define SECURITY_WIN32
 #include <security.h>
@@ -68,9 +72,10 @@ struct SslServerOptions : qpid::Options
 };
 
 class SslProtocolFactory : public qpid::sys::ProtocolFactory {
-    qpid::sys::Socket listener;
     const bool tcpNoDelay;
-    const uint16_t listeningPort;
+    boost::ptr_vector<Socket> listeners;
+    boost::ptr_vector<AsynchAcceptor> acceptors;
+    uint16_t listeningPort;
     std::string brokerHost;
     const bool clientAuthSelected;
     std::auto_ptr<qpid::sys::AsynchAcceptor> acceptor;
@@ -78,7 +83,7 @@ class SslProtocolFactory : public qpid::
     CredHandle credHandle;
 
   public:
-    SslProtocolFactory(const SslServerOptions&, int backlog, bool nodelay);
+    SslProtocolFactory(const SslServerOptions&, const std::string& host, const std::string& port, int backlog, bool nodelay);
     ~SslProtocolFactory();
     void accept(sys::Poller::shared_ptr, sys::ConnectionCodec::Factory*);
     void connect(sys::Poller::shared_ptr, const std::string& host, const std::string& port,
@@ -114,6 +119,7 @@ static struct SslPlugin : public Plugin 
             try {
                 const broker::Broker::Options& opts = broker->getOptions();
                 ProtocolFactory::shared_ptr protocol(new SslProtocolFactory(options,
+                                                                            "", boost::lexical_cast<std::string>(options.port),
                                                                             opts.connectionBacklog,
                                                                             opts.tcpNoDelay));
                 QPID_LOG(notice, "Listening for SSL connections on TCP port " << protocol->getPort());
@@ -126,12 +132,13 @@ static struct SslPlugin : public Plugin 
 } sslPlugin;
 
 SslProtocolFactory::SslProtocolFactory(const SslServerOptions& options,
-                                       int backlog,
+                                       const std::string& host, const std::string& port, int backlog,
                                        bool nodelay)
     : tcpNoDelay(nodelay),
-    listeningPort(listener.listen("", boost::lexical_cast<std::string>(options.port), backlog)),
       clientAuthSelected(options.clientAuth) {
 
+    // Make sure that certificate store is good before listening to sockets
+    // to avoid having open and listening sockets when there is no cert store
     SecInvalidateHandle(&credHandle);
 
     // Get the certificate for this server.
@@ -176,6 +183,23 @@ SslProtocolFactory::SslProtocolFactory(c
         throw QPID_WINDOWS_ERROR(status);
     ::CertFreeCertificateContext(certContext);
     ::CertCloseStore(certStoreHandle, 0);
+
+    // Listen to socket(s)
+    SocketAddress sa(host, port);
+
+    // We must have at least one resolved address
+    QPID_LOG(info, "SSL Listening to: " << sa.asString())
+    Socket* s = new Socket;
+    listeningPort = s->listen(sa, backlog);
+    listeners.push_back(s);
+
+    // Try any other resolved addresses
+    while (sa.nextAddress()) {
+        QPID_LOG(info, "SSL Listening to: " << sa.asString())
+        Socket* s = new Socket;
+        s->listen(sa, backlog);
+        listeners.push_back(s);
+    }
 }
 
 SslProtocolFactory::~SslProtocolFactory() {
@@ -238,10 +262,12 @@ uint16_t SslProtocolFactory::getPort() c
 
 void SslProtocolFactory::accept(sys::Poller::shared_ptr poller,
                                 sys::ConnectionCodec::Factory* fact) {
-    acceptor.reset(
-        AsynchAcceptor::create(listener,
-                               boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false)));
-    acceptor->start(poller);
+    for (unsigned i = 0; i<listeners.size(); ++i) {
+        acceptors.push_back(
+            AsynchAcceptor::create(listeners[i],
+                            boost::bind(&SslProtocolFactory::established, this, poller, _1, fact, false)));
+        acceptors[i].start(poller);
+    }
 }
 
 void SslProtocolFactory::connect(sys::Poller::shared_ptr poller,

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Mon Sep 19 15:13:18 2011
@@ -143,7 +143,9 @@ void ConnectionHandler::outgoing(AMQFram
 void ConnectionHandler::waitForOpen()
 {
     waitFor(ESTABLISHED);
-    if (getState() == FAILED || getState() == CLOSED) {
+    if (getState() == FAILED) {
+        throw TransportFailure(errorText);
+    } else if (getState() == CLOSED) {
         throw ConnectionException(errorCode, errorText);
     }
 }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Mon Sep 19 15:13:18 2011
@@ -265,7 +265,7 @@ void ConnectionImpl::open()
     } catch (const std::exception& e) {
         QPID_LOG(debug, "Failed to connect to " << protocol << ":" << host << ":" << port << " " << e.what());
         connector.reset();
-        throw;
+        throw TransportFailure(e.what());
     }
     connector->init();
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/SessionImpl.cpp Mon Sep 19 15:13:18 2011
@@ -170,6 +170,7 @@ Demux& SessionImpl::getDemux()
 void SessionImpl::waitForCompletion(const SequenceNumber& id)
 {
     Lock l(state);
+    sys::Waitable::ScopedWait w(state);
     waitForCompletionImpl(id);
 }
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/SslConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/SslConnector.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/SslConnector.cpp Mon Sep 19 15:13:18 2011
@@ -197,7 +197,7 @@ void SslConnector::connect(const std::st
         socket.connect(host, port);
     } catch (const std::exception& e) {
         socket.close();
-        throw ConnectionException(framing::connection::CLOSE_CODE_FRAMING_ERROR, e.what());
+        throw TransportFailure(e.what());
     }
 
     identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Mon Sep 19 15:13:18 2011
@@ -129,6 +129,10 @@ const std::string HEADERS_EXCHANGE("head
 const std::string XML_EXCHANGE("xml");
 const std::string WILDCARD_ANY("#");
 
+//exchange prefixes:
+const std::string PREFIX_AMQ("amq.");
+const std::string PREFIX_QPID("qpid.");
+
 const Verifier verifier;
 }
 
@@ -199,6 +203,7 @@ class Exchange : protected Node
     void checkCreate(qpid::client::AsyncSession&, CheckMode);
     void checkAssert(qpid::client::AsyncSession&, CheckMode);
     void checkDelete(qpid::client::AsyncSession&, CheckMode);
+    bool isReservedName();
 
   protected:
     const std::string specifiedType;
@@ -490,7 +495,7 @@ std::string Subscription::getSubscriptio
     if (name.empty()) {
         return (boost::format("%1%_%2%") % base % Uuid(true).str()).str();
     } else {
-        return (boost::format("%1%_%2%") % base % name).str();
+        return name;
     }
 }
 
@@ -772,18 +777,32 @@ Exchange::Exchange(const Address& a) : N
     linkBindings.setDefaultExchange(name);
 }
 
+bool Exchange::isReservedName()
+{
+    return name.find(PREFIX_AMQ) != std::string::npos || name.find(PREFIX_QPID) != std::string::npos;
+}
+
 void Exchange::checkCreate(qpid::client::AsyncSession& session, CheckMode mode)
 {
     if (enabled(createPolicy, mode)) {
         try {
-            std::string type = specifiedType;
-            if (type.empty()) type = TOPIC_EXCHANGE;
-            session.exchangeDeclare(arg::exchange=name,
-                                          arg::type=type,
-                                          arg::durable=durable,
-                                          arg::autoDelete=autoDelete,
-                                          arg::alternateExchange=alternateExchange,
-                                          arg::arguments=arguments);
+            if (isReservedName()) {
+                try {
+                    sync(session).exchangeDeclare(arg::exchange=name, arg::passive=true);
+                } catch (const qpid::framing::NotFoundException& /*e*/) {
+                    throw ResolutionError((boost::format("Cannot create exchange %1%; names beginning with \"amq.\" or \"qpid.\" are reserved.") % name).str());
+                }
+
+            } else {
+                std::string type = specifiedType;
+                if (type.empty()) type = TOPIC_EXCHANGE;
+                session.exchangeDeclare(arg::exchange=name,
+                                        arg::type=type,
+                                        arg::durable=durable,
+                                        arg::autoDelete=autoDelete,
+                                        arg::alternateExchange=alternateExchange,
+                                        arg::arguments=arguments);
+            }
             nodeBindings.bind(session);
             session.sync();
         } catch (const qpid::framing::NotAllowedException& e) {
@@ -833,7 +852,7 @@ void Exchange::checkAssert(qpid::client:
                 FieldTable::ValuePtr v = result.getArguments().get(i->first);
                 if (!v) {
                     throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str());
-                } else if (i->second != v) {
+                } else if (*i->second != *v) {
                     throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%")
                                           % i->first % name % *(i->second) % *v).str());
                 }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/amqp0_10/ConnectionImpl.cpp Mon Sep 19 15:13:18 2011
@@ -20,7 +20,6 @@
  */
 #include "ConnectionImpl.h"
 #include "SessionImpl.h"
-#include "SimpleUrlParser.h"
 #include "qpid/messaging/exceptions.h"
 #include "qpid/messaging/Session.h"
 #include "qpid/messaging/PrivateImplRef.h"
@@ -133,6 +132,8 @@ void ConnectionImpl::setOption(const std
         settings.protocol = value.asString();
     } else if (name == "ssl-cert-name" || name == "ssl_cert_name") {
         settings.sslCertName = value.asString();
+    } else if (name == "x-reconnect-on-limit-exceeded" || name == "x_reconnect_on_limit_exceeded") {
+        reconnectOnLimitExceeded = value;
     } else {
         throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
     }
@@ -273,21 +274,14 @@ bool ConnectionImpl::tryConnect()
     for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
         try {
             QPID_LOG(info, "Trying to connect to " << *i << "...");
-            //TODO: when url support is more complete can avoid this test here
-            if (i->find("amqp:") == 0) {
-                Url url(*i);
-                connection.open(url, settings);
-            } else {
-                SimpleUrlParser::parse(*i, settings);
-                connection.open(settings);
-            }
+            Url url(*i);
+            if (url.getUser().size()) settings.username = url.getUser();
+            if (url.getPass().size()) settings.password = url.getPass();
+            connection.open(url, settings);
             QPID_LOG(info, "Connected to " << *i);
             mergeUrls(connection.getInitialBrokers(), l);
             return resetSessions(l);
-        } catch (const qpid::ConnectionException& e) {
-            //TODO: need to fix timeout on
-            //qpid::client::Connection::open() so that it throws
-            //TransportFailure rather than a ConnectionException
+        } catch (const qpid::TransportFailure& e) {
             QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
         }
     }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Mon Sep 19 15:13:18 2011
@@ -60,12 +60,14 @@ SessionImpl::SessionImpl(ConnectionImpl&
 
 void SessionImpl::checkError()
 {
+    ScopedLock l(lock);
     qpid::client::SessionBase_0_10Access s(session);
     s.get()->assertOpen();
 }
 
 bool SessionImpl::hasError()
 {
+    ScopedLock l(lock);
     qpid::client::SessionBase_0_10Access s(session);
     return s.get()->hasError();
 }
@@ -129,27 +131,29 @@ void SessionImpl::close()
         senders.clear();
         receivers.clear();
     } else {
-        while (true) {
-            Sender s;
-            {
-                ScopedLock l(lock);
-                if (senders.empty()) break;
-                s = senders.begin()->second;
-            }
-            s.close();  // outside the lock, will call senderCancelled
+        Senders sCopy;
+        Receivers rCopy;
+        {
+            ScopedLock l(lock);
+            senders.swap(sCopy);
+            receivers.swap(rCopy);
         }
-        while (true) {
-            Receiver r;
-            {
-                ScopedLock l(lock);
-                if (receivers.empty()) break;
-                r = receivers.begin()->second;
-            }
-            r.close();  // outside the lock, will call receiverCancelled
+        for (Senders::iterator i = sCopy.begin(); i != sCopy.end(); ++i)
+        {
+            // outside the lock, will call senderCancelled
+            i->second.close();
+        }
+        for (Receivers::iterator i = rCopy.begin(); i != rCopy.end(); ++i)
+        {
+            // outside the lock, will call receiverCancelled
+            i->second.close();
         }
     }
     connection->closed(*this);
-    if (!hasError()) session.close();
+    if (!hasError()) {
+        ScopedLock l(lock);
+        session.close();
+    }
 }
 
 template <class T, class S> boost::intrusive_ptr<S> getImplPtr(T& t)
@@ -432,8 +436,11 @@ uint32_t SessionImpl::getUnsettledAcksIm
 
 void SessionImpl::syncImpl(bool block)
 {
-    if (block) session.sync();
-    else session.flush();
+    {
+        ScopedLock l(lock);
+        if (block) session.sync();
+        else session.flush();
+    }
     //cleanup unconfirmed accept records:
     incoming.pendingAccept();
 }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Sep 19 15:13:18 2011
@@ -57,12 +57,12 @@
  * - management::ManagementBroker: uses MessageHandler supplied by  cluster
  *   to send messages to the broker via the cluster.
  *
- * - Dtx: not yet supported with cluster.
- *
- * cluster::ExpiryPolicy implements the strategy for message expiry.
+ * cluster::ExpiryPolicy uses cluster time.
  *
  * ClusterTimer implements periodic timed events in the cluster context.
- * Used for periodic management events.
+ * Used for:
+ * - periodic management events.
+ * - DTX transaction timeouts.
  *
  * <h1>CLUSTER PROTOCOL OVERVIEW</h1>
  *
@@ -199,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpi
  * Currently use SVN revision to avoid clashes with versions from
  * different branches.
  */
-const uint32_t Cluster::CLUSTER_VERSION = 1128070;
+const uint32_t Cluster::CLUSTER_VERSION = 1159329;
 
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;
@@ -526,7 +526,7 @@ void Cluster::deliveredFrame(const Event
 
 void Cluster::processFrame(const EventFrame& e, Lock& l) {
     if (e.isCluster()) {
-        QPID_LOG(trace, *this << " DLVR: " << e);
+        QPID_LOG_IF(trace, loggable(e.frame), *this << " DLVR: " << e);
         ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
         if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
             throw Exception(QPID_MSG("Invalid cluster control"));
@@ -535,14 +535,15 @@ void Cluster::processFrame(const EventFr
         map.incrementFrameSeq();
         ConnectionPtr connection = getConnection(e, l);
         if (connection) {
-            QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ":  " << e);
+            QPID_LOG_IF(trace, loggable(e.frame),
+                        *this << " DLVR " << map.getFrameSeq() << ":  " << e);
             connection->deliveredFrame(e);
         }
         else
             throw Exception(QPID_MSG("Unknown connection: " << e));
     }
     else // Drop connection frames while state < CATCHUP
-        QPID_LOG(trace, *this << " DROP (joining): " << e);
+        QPID_LOG_IF(trace, loggable(e.frame), *this << " DROP (joining): " << e);
 }
 
 // Called in deliverFrameQueue thread
@@ -1219,4 +1220,12 @@ bool Cluster::deferDeliveryImpl(const st
     return true;
 }
 
+bool Cluster::loggable(const AMQFrame& f) {
+    const  AMQMethodBody* method = (f.getMethod());
+    if (!method) return true;     // Not a method
+    bool isClock = method->amqpClassId() ==  ClusterClockBody::CLASS_ID
+        && method->amqpMethodId() == ClusterClockBody::METHOD_ID;
+    return !isClock;
+}
+
 }} // namespace qpid::cluster

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Cluster.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Cluster.h Mon Sep 19 15:13:18 2011
@@ -59,6 +59,7 @@ class Message;
 }
 
 namespace framing {
+class AMQFrame;
 class AMQBody;
 struct Uuid;
 }
@@ -95,10 +96,10 @@ class Cluster : private Cpg::Handler, pu
     void initialize();
 
     // Connection map.
-    void addLocalConnection(const ConnectionPtr&); 
-    void addShadowConnection(const ConnectionPtr&); 
-    void erase(const ConnectionId&);       
-    
+    void addLocalConnection(const ConnectionPtr&);
+    void addShadowConnection(const ConnectionPtr&);
+    void erase(const ConnectionId&);
+
     // URLs of current cluster members.
     std::vector<std::string> getIds() const;
     std::vector<Url> getUrls() const;
@@ -113,7 +114,7 @@ class Cluster : private Cpg::Handler, pu
     void updateInRetracted();
     // True if we are expecting to receive catch-up connections.
     bool isExpectingUpdate();
-    
+
     MemberId getId() const;
     broker::Broker& getBroker() const;
     Multicaster& getMulticast() { return mcast; }
@@ -144,6 +145,9 @@ class Cluster : private Cpg::Handler, pu
     sys::AbsTime getClusterTime();
     void sendClockUpdate();
     void clock(const uint64_t time);
+
+    static bool loggable(const framing::AMQFrame&); // True if the frame should be logged.
+
   private:
     typedef sys::Monitor::ScopedLock Lock;
 
@@ -153,10 +157,10 @@ class Cluster : private Cpg::Handler, pu
 
     /** Version number of the cluster protocol, to avoid mixed versions. */
     static const uint32_t CLUSTER_VERSION;
-    
+
     // NB: A dummy Lock& parameter marks functions that must only be
     // called with Cluster::lock  locked.
- 
+
     void leave(Lock&);
     std::vector<std::string> getIds(Lock&) const;
     std::vector<Url> getUrls(Lock&) const;
@@ -165,11 +169,11 @@ class Cluster : private Cpg::Handler, pu
     void brokerShutdown();
 
     // == Called in deliverEventQueue thread
-    void deliveredEvent(const Event&); 
+    void deliveredEvent(const Event&);
 
     // == Called in deliverFrameQueue thread
-    void deliveredFrame(const EventFrame&); 
-    void processFrame(const EventFrame&, Lock&); 
+    void deliveredFrame(const EventFrame&);
+    void processFrame(const EventFrame&, Lock&);
 
     // Cluster controls implement XML methods from cluster.xml.
     void updateRequest(const MemberId&, const std::string&, Lock&);
@@ -204,7 +208,7 @@ class Cluster : private Cpg::Handler, pu
     void setReady(Lock&);
     void memberUpdate(Lock&);
     void setClusterId(const framing::Uuid&, Lock&);
-    void erase(const ConnectionId&, Lock&);       
+    void erase(const ConnectionId&, Lock&);
     void requestUpdate(Lock& );
     void initMapCompleted(Lock&);
     void becomeElder(Lock&);
@@ -212,7 +216,7 @@ class Cluster : private Cpg::Handler, pu
     void updateMgmtMembership(Lock&);
 
     // == Called in CPG dispatch thread
-    void deliver( // CPG deliver callback. 
+    void deliver( // CPG deliver callback.
         cpg_handle_t /*handle*/,
         const struct cpg_name *group,
         uint32_t /*nodeid*/,
@@ -221,7 +225,7 @@ class Cluster : private Cpg::Handler, pu
         int /*msg_len*/);
 
     void deliverEvent(const Event&);
-    
+
     void configChange( // CPG config change callback.
         cpg_handle_t /*handle*/,
         const struct cpg_name */*group*/,
@@ -272,7 +276,7 @@ class Cluster : private Cpg::Handler, pu
     // Used only in deliverEventQueue thread or when stalled for update.
     Decoder decoder;
     bool discarding;
-    
+
 
     // Remaining members are protected by lock.
     mutable sys::Monitor lock;
@@ -285,7 +289,7 @@ class Cluster : private Cpg::Handler, pu
         JOINER,  ///< Sent update request, waiting for update offer.
         UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete.
         CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event.
-        READY,   ///< Fully operational 
+        READY,   ///< Fully operational
         OFFER,   ///< Sent an offer, waiting for accept/reject.
         UPDATER, ///< Offer accepted, sending a state update.
         LEFT     ///< Final state, left the cluster.

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Connection.cpp Mon Sep 19 15:13:18 2011
@@ -24,6 +24,8 @@
 #include "Cluster.h"
 #include "UpdateReceiver.h"
 #include "qpid/assert.h"
+#include "qpid/broker/DtxAck.h"
+#include "qpid/broker/DtxBuffer.h"
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/SemanticState.h"
 #include "qpid/broker/TxBuffer.h"
@@ -97,7 +99,9 @@ Connection::Connection(Cluster& c, sys::
                    external,
                    isLink,
                    isCatchUp ? ++catchUpId : 0,
-                   isCatchUp),  // isCatchUp => shadow
+                   // The first catch-up connection is not considered a shadow
+                   // as it needs to be authenticated.
+                   isCatchUp && self.second > 1),
     expectProtocolHeader(isLink),
     mcastFrameHandler(cluster.getMulticast(), self),
     updateIn(c.getUpdateReceiver()),
@@ -114,7 +118,7 @@ Connection::Connection(Cluster& c, sys::
         if (!updateIn.nextShadowMgmtId.empty())
             connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
         updateIn.nextShadowMgmtId.clear();
-     }
+    }
     init();
     QPID_LOG(debug, cluster << " local connection " << *this);
 }
@@ -167,7 +171,7 @@ void Connection::announce(
         AMQFrame frame;
         while (frame.decode(buf))
             connection->received(frame);
-         connection->setUserId(username);
+        connection->setUserId(username);
     }
     // Do managment actions now that the connection is replicated.
     connection->raiseConnectEvent();
@@ -194,7 +198,7 @@ void Connection::received(framing::AMQFr
                  << *this << ": " << f);
         return;
     }
-    QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
+    QPID_LOG_IF(trace, Cluster::loggable(f), cluster << " RECV " << *this << ": " << f);
     if (isLocal()) {            // Local catch-up connection.
         currentChannel = f.getChannel();
         if (!framing::invoke(*this, *f.getBody()).wasHandled())
@@ -214,16 +218,9 @@ void Connection::received(framing::AMQFr
     }
 }
 
-bool Connection::checkUnsupported(const AMQBody& body) {
-    std::string message;
-    if (body.getMethod()) {
-        switch (body.getMethod()->amqpClassId()) {
-          case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
-        }
-    }
-    if (!message.empty())
-        connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message);
-    return !message.empty();
+bool Connection::checkUnsupported(const AMQBody&) {
+    // Throw an exception for unsupported commands. Currently all are supported.
+    return false;
 }
 
 struct GiveReadCreditOnExit {
@@ -424,7 +421,8 @@ void Connection::sessionState(
     const SequenceNumber& expected,
     const SequenceNumber& received,
     const SequenceSet& unknownCompleted,
-    const SequenceSet& receivedIncomplete)
+    const SequenceSet& receivedIncomplete,
+    bool dtxSelected)
 {
     sessionState().setState(
         replayStart,
@@ -434,7 +432,9 @@ void Connection::sessionState(
         received,
         unknownCompleted,
         receivedIncomplete);
-    QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
+    if (dtxSelected) semanticState().selectDtx();
+    QPID_LOG(debug, cluster << " received session state update for "
+             << sessionState().getId());
     // The output tasks will be added later in the update process.
     connection->getOutputTasks().removeAll();
 }
@@ -464,11 +464,24 @@ void Connection::shadowReady(
     output.setSendMax(sendMax);
 }
 
+void Connection::setDtxBuffer(const UpdateReceiver::DtxBufferRef& bufRef) {
+    broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+    broker::DtxWorkRecord* record = mgr.getWork(bufRef.xid);
+    broker::DtxBuffer::shared_ptr buffer = (*record)[bufRef.index];
+    if (bufRef.suspended)
+        bufRef.semanticState->getSuspendedXids()[bufRef.xid] = buffer;
+    else
+        bufRef.semanticState->setDtxBuffer(buffer);
+}
+
+// Marks the end of the update.
 void Connection::membership(const FieldTable& joiners, const FieldTable& members,
                             const framing::SequenceNumber& frameSeq)
 {
     QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
     updateIn.consumerNumbering.clear();
+    for_each(updateIn.dtxBuffers.begin(), updateIn.dtxBuffers.end(),
+             boost::bind(&Connection::setDtxBuffer, this, _1));
     closeUpdated();
     cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
 }
@@ -536,8 +549,16 @@ void Connection::deliveryRecord(const st
         } else {                // Message at original position in original queue
             queue->find(position, m);
         }
-        if (!m.payload)
-            throw Exception(QPID_MSG("deliveryRecord no update message"));
+        // FIXME aconway 2011-08-19: removed:
+        // if (!m.payload)
+        //      throw Exception(QPID_MSG("deliveryRecord no update message"));
+        //
+        // It seems this could happen legitimately in the case one
+        // session browses message M, then another session acquires
+        // it. In that case the browsers delivery record is !acquired
+        // but the message is not on its original Queue. In that case
+        // we'll get a deliveryRecord with no payload for the browser.
+        //
     }
 
     broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
@@ -545,7 +566,11 @@ void Connection::deliveryRecord(const st
     if (cancelled) dr.cancel(dr.getTag());
     if (completed) dr.complete();
     if (ended) dr.setEnded();   // Exsitance of message
-    semanticState().record(dr); // Part of the session's unacked list.
+
+    if (dtxBuffer)              // Record for next dtx-ack
+        dtxAckRecords.push_back(dr);
+    else
+        semanticState().record(dr); // Record on session's unacked list.
 }
 
 void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
@@ -561,29 +586,29 @@ void Connection::queueFairshareState(con
 
 
 namespace {
-    // find a StatefulQueueObserver that matches a given identifier
-    class ObserverFinder {
-        const std::string id;
-        boost::shared_ptr<broker::QueueObserver> target;
-        ObserverFinder(const ObserverFinder&) {}
-    public:
-        ObserverFinder(const std::string& _id) : id(_id) {}
-        broker::StatefulQueueObserver *getObserver()
-        {
-            if (target)
-                return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
-            return 0;
-        }
-        void operator() (boost::shared_ptr<broker::QueueObserver> o)
-        {
-            if (!target) {
-                broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
-                if (p && p->getId() == id) {
-                    target = o;
-                }
+// find a StatefulQueueObserver that matches a given identifier
+class ObserverFinder {
+    const std::string id;
+    boost::shared_ptr<broker::QueueObserver> target;
+    ObserverFinder(const ObserverFinder&) {}
+  public:
+    ObserverFinder(const std::string& _id) : id(_id) {}
+    broker::StatefulQueueObserver *getObserver()
+    {
+        if (target)
+            return dynamic_cast<broker::StatefulQueueObserver *>(target.get());
+        return 0;
+    }
+    void operator() (boost::shared_ptr<broker::QueueObserver> o)
+    {
+        if (!target) {
+            broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+            if (p && p->getId() == id) {
+                target = o;
             }
         }
-    };
+    }
+};
 }
 
 
@@ -615,6 +640,7 @@ std::ostream& operator<<(std::ostream& o
 void Connection::txStart() {
     txBuffer.reset(new broker::TxBuffer());
 }
+
 void Connection::txAccept(const framing::SequenceSet& acked) {
     txBuffer->enlist(boost::shared_ptr<broker::TxAccept>(
                          new broker::TxAccept(acked, semanticState().getUnacked())));
@@ -630,8 +656,10 @@ void Connection::txEnqueue(const std::st
                          new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload)));
 }
 
-void Connection::txPublish(const framing::Array& queues, bool delivered) {
-    boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload));
+void Connection::txPublish(const framing::Array& queues, bool delivered)
+{
+    boost::shared_ptr<broker::TxPublish> txPub(
+        new broker::TxPublish(getUpdateMessage().payload));
     for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
         txPub->deliverTo(findQueue((*i)->get<std::string>()));
     txPub->delivered = delivered;
@@ -646,6 +674,51 @@ void Connection::accumulatedAck(const qp
     semanticState().setAccumulatedAck(s);
 }
 
+void Connection::dtxStart(const std::string& xid,
+                          bool ended,
+                          bool suspended,
+                          bool failed,
+                          bool expired)
+{
+    dtxBuffer.reset(new broker::DtxBuffer(xid, ended, suspended, failed, expired));
+    txBuffer = dtxBuffer;
+}
+
+void Connection::dtxEnd() {
+    broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+    std::string xid = dtxBuffer->getXid();
+    if (mgr.exists(xid))
+        mgr.join(xid, dtxBuffer);
+    else
+        mgr.start(xid, dtxBuffer);
+    dtxBuffer.reset();
+    txBuffer.reset();
+}
+
+// Sent after all DeliveryRecords for a dtx-ack have been collected in dtxAckRecords
+void Connection::dtxAck() {
+    dtxBuffer->enlist(
+        boost::shared_ptr<broker::DtxAck>(new broker::DtxAck(dtxAckRecords)));
+    dtxAckRecords.clear();
+}
+
+void Connection::dtxBufferRef(const std::string& xid, uint32_t index, bool suspended) {
+    // Save the association between DtxBuffers and the session so we
+    // can set the DtxBuffers at the end of the update when the
+    // DtxManager has been replicated.
+    updateIn.dtxBuffers.push_back(
+        UpdateReceiver::DtxBufferRef(xid, index, suspended, &semanticState()));
+}
+
+// Sent at end of work record.
+void Connection::dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout)
+{
+    broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
+    if (timeout) mgr.setTimeout(xid, timeout);
+    if (prepared) mgr.prepare(xid);
+}
+
+
 void Connection::exchange(const std::string& encoded) {
     Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
     broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Connection.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Connection.h Mon Sep 19 15:13:18 2011
@@ -29,6 +29,7 @@
 
 #include "qpid/RefCounted.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/broker/DeliveryRecord.h"
 #include "qpid/broker/SecureConnection.h"
 #include "qpid/broker/SemanticState.h"
 #include "qpid/amqp_0_10/Connection.h"
@@ -123,7 +124,8 @@ class Connection :
                       const framing::SequenceNumber& expected,
                       const framing::SequenceNumber& received,
                       const framing::SequenceSet& unknownCompleted,
-                      const SequenceSet& receivedIncomplete);
+                      const SequenceSet& receivedIncomplete,
+                      bool dtxSelected);
 
     void outputTask(uint16_t channel, const std::string& name);
 
@@ -164,6 +166,17 @@ class Connection :
     void txEnd();
     void accumulatedAck(const framing::SequenceSet&);
 
+    // Dtx state
+    void dtxStart(const std::string& xid,
+                  bool ended,
+                  bool suspended,
+                  bool failed,
+                  bool expired);
+    void dtxEnd();
+    void dtxAck();
+    void dtxBufferRef(const std::string& xid, uint32_t index, bool suspended);
+    void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout);
+
     // Encoded exchange replication.
     void exchange(const std::string& encoded);
 
@@ -251,7 +264,7 @@ class Connection :
     broker::SemanticState& semanticState();
     broker::QueuedMessage getUpdateMessage();
     void closeUpdated();
-
+    void setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &);
     Cluster& cluster;
     ConnectionId self;
     bool catchUp;
@@ -263,6 +276,9 @@ class Connection :
     framing::SequenceNumber deliverSeq;
     framing::ChannelId currentChannel;
     boost::shared_ptr<broker::TxBuffer> txBuffer;
+    boost::shared_ptr<broker::DtxBuffer> dtxBuffer;
+    broker::DeliveryRecords dtxAckRecords;
+    broker::DtxWorkRecord* dtxCurrent;
     bool expectProtocolHeader;
     McastFrameHandler mcastFrameHandler;
     UpdateReceiver& updateIn;

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/Multicaster.cpp Mon Sep 19 15:13:18 2011
@@ -21,6 +21,7 @@
 
 #include "qpid/cluster/Multicaster.h"
 #include "qpid/cluster/Cpg.h"
+#include "qpid/cluster/Cluster.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/AMQBody.h"
 #include "qpid/framing/AMQFrame.h"
@@ -58,7 +59,7 @@ void Multicaster::mcast(const Event& e) 
             return;
         }
     }
-    QPID_LOG(trace, "MCAST " << e);
+    QPID_LOG_IF(trace, e.isControl() && Cluster::loggable(e.getFrame()), "MCAST " << e);
     if (bypass) {               // direct, don't queue
         iovec iov = e.toIovec();
         while (!cpg.mcast(&iov, 1))

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/SecureConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/SecureConnectionFactory.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/SecureConnectionFactory.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/SecureConnectionFactory.cpp Mon Sep 19 15:13:18 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -48,7 +48,7 @@ SecureConnectionFactory::create(Protocol
     if (clusterCodec) {
         SecureConnectionPtr sc(new SecureConnection());
         clusterCodec->setSecureConnection(sc.get());
-        sc->setCodec(codec);        
+        sc->setCodec(codec);
         return sc.release();
     }
     return 0;
@@ -63,7 +63,7 @@ SecureConnectionFactory::create(sys::Out
     if (clusterCodec) {
         SecureConnectionPtr sc(new SecureConnection());
         clusterCodec->setSecureConnection(sc.get());
-        sc->setCodec(codec);        
+        sc->setCodec(codec);
         return sc.release();
     }
     return 0;

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Mon Sep 19 15:13:18 2011
@@ -45,6 +45,8 @@
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/TxOpVisitor.h"
 #include "qpid/broker/DtxAck.h"
+#include "qpid/broker/DtxBuffer.h"
+#include "qpid/broker/DtxWorkRecord.h"
 #include "qpid/broker/TxAccept.h"
 #include "qpid/broker/TxPublish.h"
 #include "qpid/broker/RecoveredDequeue.h"
@@ -65,6 +67,7 @@
 #include <boost/bind.hpp>
 #include <boost/cast.hpp>
 #include <algorithm>
+#include <iterator>
 #include <sstream>
 
 namespace qpid {
@@ -148,6 +151,7 @@ void UpdateClient::run() {
     try {
         connection.open(updateeUrl, connectionSettings);
         session = connection.newSession(UPDATE);
+        session.sync();
         update();
         done();
     } catch (const std::exception& e) {
@@ -177,9 +181,9 @@ void UpdateClient::update() {
     // longer on their original queue.
     session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
     session.sync();
+
     std::for_each(connections.begin(), connections.end(),
                   boost::bind(&UpdateClient::updateConnection, this, _1));
-    session.queueDelete(arg::queue=UPDATE);
 
     // some Queue Observers need session state & msgs synced first, so sync observers now
     b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1));
@@ -189,6 +193,8 @@ void UpdateClient::update() {
 
     updateLinks();
     updateManagementAgent();
+    updateDtxManager();
+    session.queueDelete(arg::queue=UPDATE);
 
     session.close();
 
@@ -318,22 +324,22 @@ class MessageUpdater {
         lastPos = message.position;
 
         // if the ttl > 0, we need to send the calculated expiration time to the updatee
-        if (message.payload->getProperties<DeliveryProperties>()->getTtl() > 0) {
+        const DeliveryProperties* dprops =
+            message.payload->getProperties<DeliveryProperties>();
+        if (dprops && dprops->getTtl() > 0) {
             bool hadMessageProps =
                 message.payload->hasProperties<framing::MessageProperties>();
-            framing::MessageProperties* mprops =
+            const framing::MessageProperties* mprops =
                 message.payload->getProperties<framing::MessageProperties>();
             bool hadApplicationHeaders = mprops->hasApplicationHeaders();
-            FieldTable& applicationHeaders = mprops->getApplicationHeaders();
-            applicationHeaders.setInt64(
-                UpdateClient::X_QPID_EXPIRATION,
-                sys::Duration(sys::EPOCH, message.payload->getExpiration()));
+            message.payload->insertCustomProperty(UpdateClient::X_QPID_EXPIRATION,
+                            sys::Duration(sys::EPOCH, message.payload->getExpiration()));
             // If message properties or application headers didn't exist
             // prior to us adding data, we want to remove them on the other side.
             if (!hadMessageProps)
-                applicationHeaders.setInt(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0);
+                message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_MESSAGE_PROPS, 0);
             else if (!hadApplicationHeaders)
-                applicationHeaders.setInt(UpdateClient::X_QPID_NO_HEADERS, 0);
+                message.payload->insertCustomProperty(UpdateClient::X_QPID_NO_HEADERS, 0);
         }
 
         // We can't send a broker::Message via the normal client API,
@@ -356,7 +362,8 @@ class MessageUpdater {
             for (uint64_t offset = 0; morecontent; offset += maxContentSize)
             {
                 AMQFrame frame((AMQContentBody()));
-                morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset);
+                morecontent = message.payload->getContentFrame(
+                    *(message.queue), frame, maxContentSize, offset);
                 sb.get()->sendRawFrame(frame);
             }
         }
@@ -402,7 +409,11 @@ void UpdateClient::updateNonExclusiveQue
 }
 
 void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& queue, const QueueBinding& binding) {
-    s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
+    if (binding.exchange.size())
+        s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
+    //else its the default exchange and there is no need to replicate
+    //the binding, the creation of the queue will have done so
+    //automatically
 }
 
 void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
@@ -475,9 +486,9 @@ void UpdateClient::updateSession(broker:
     QPID_LOG(debug, *this << " updating unacknowledged messages.");
     broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
     std::for_each(drs.begin(), drs.end(),
-                  boost::bind(&UpdateClient::updateUnacked, this, _1));
+                  boost::bind(&UpdateClient::updateUnacked, this, _1, shadowSession));
 
-    updateTxState(ss->getSemanticState());           // Tx transaction state.
+    updateTransactionState(ss->getSemanticState());
 
     // Adjust command counter for message in progress, will be sent after state update.
     boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
@@ -496,7 +507,8 @@ void UpdateClient::updateSession(broker:
         std::max(received, ss->receiverGetExpected().command),
         received,
         ss->receiverGetUnknownComplete(),
-        ss->receiverGetIncomplete()
+        ss->receiverGetIncomplete(),
+        ss->getSemanticState().getDtxSelected()
     );
 
     // Send frames for partial message in progress.
@@ -538,14 +550,17 @@ void UpdateClient::updateConsumer(
              << " on " << shadowSession.getId());
 }
 
-void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
-    if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
+void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr,
+                                 client::AsyncSession& updateSession)
+{
+    if (!dr.isEnded() && dr.isAcquired()) {
+        assert(dr.getMessage().payload);
         // If the message is acquired then it is no longer on the
         // updatees queue, put it on the update queue for updatee to pick up.
         //
-        MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage());
+        MessageUpdater(UPDATE, updateSession, expiry).updateQueuedMessage(dr.getMessage());
     }
-    ClusterConnectionProxy(shadowSession).deliveryRecord(
+    ClusterConnectionProxy(updateSession).deliveryRecord(
         dr.getQueue()->getName(),
         dr.getMessage().position,
         dr.getTag(),
@@ -566,8 +581,10 @@ class TxOpUpdater : public broker::TxOpC
     TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry)
         : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {}
 
-    void operator()(const broker::DtxAck& ) {
-        throw InternalErrorException("DTX transactions not currently supported by cluster.");
+    void operator()(const broker::DtxAck& ack) {
+        std::for_each(ack.getPending().begin(), ack.getPending().end(),
+                      boost::bind(&UpdateClient::updateUnacked, &parent, _1, session));
+        proxy.dtxAck();
     }
 
     void operator()(const broker::RecoveredDequeue& rdeq) {
@@ -584,13 +601,18 @@ class TxOpUpdater : public broker::TxOpC
         proxy.txAccept(txAccept.getAcked());
     }
 
+    typedef std::list<Queue::shared_ptr> QueueList;
+
+    void copy(const QueueList& l, Array& a) {
+        for (QueueList::const_iterator i = l.begin(); i!=l.end(); ++i)
+            a.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
+    }
+
     void operator()(const broker::TxPublish& txPub) {
         updateMessage(txPub.getMessage());
-        typedef std::list<Queue::shared_ptr> QueueList;
-        const QueueList& qlist = txPub.getQueues();
+        assert(txPub.getQueues().empty() || txPub.getPrepared().empty());
         Array qarray(TYPE_CODE_STR8);
-        for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i)
-            qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
+        copy(txPub.getQueues().empty() ? txPub.getPrepared() : txPub.getQueues(), qarray);
         proxy.txPublish(qarray, txPub.delivered);
     }
 
@@ -600,17 +622,43 @@ class TxOpUpdater : public broker::TxOpC
     ClusterConnectionProxy proxy;
 };
 
-void UpdateClient::updateTxState(broker::SemanticState& s) {
-    QPID_LOG(debug, *this << " updating TX transaction state.");
+void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended)
+{
+    ClusterConnectionProxy proxy(shadowSession);
+    broker::DtxWorkRecord* record =
+        updaterBroker.getDtxManager().getWork(dtx->getXid());
+    proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended);
+
+}
+
+void UpdateClient::updateTransactionState(broker::SemanticState& s) {
     ClusterConnectionProxy proxy(shadowSession);
     proxy.accumulatedAck(s.getAccumulatedAck());
-    broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
-    if (txBuffer) {
+    broker::TxBuffer::shared_ptr tx = s.getTxBuffer();
+    broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer();
+    if (dtx) {
+        updateBufferRef(dtx, false); // Current transaction.
+    } else if (tx) {
         proxy.txStart();
         TxOpUpdater updater(*this, shadowSession, expiry);
-        txBuffer->accept(updater);
+        tx->accept(updater);
         proxy.txEnd();
     }
+    for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin();
+         i != s.getSuspendedXids().end();
+         ++i)
+    {
+        updateBufferRef(i->second, true);
+    }
+}
+
+void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) {
+    ClusterConnectionProxy proxy(session);
+    proxy.dtxStart(
+        dtx->getXid(), dtx->isEnded(), dtx->isSuspended(), dtx->isFailed(), dtx->isExpired());
+    TxOpUpdater updater(*this, session, expiry);
+    dtx->accept(updater);
+    proxy.dtxEnd();
 }
 
 void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) {
@@ -663,5 +711,17 @@ void UpdateClient::updateObserver(const 
     }
 }
 
+void UpdateClient::updateDtxManager() {
+    broker::DtxManager& dtm = updaterBroker.getDtxManager();
+    dtm.each(boost::bind(&UpdateClient::updateDtxWorkRecord, this, _1));
+}
+
+void UpdateClient::updateDtxWorkRecord(const broker::DtxWorkRecord& r) {
+    QPID_LOG(debug, *this << " updating DTX transaction: " << r.getXid());
+    for (size_t i = 0; i < r.size(); ++i)
+        updateDtxBuffer(r[i]);
+    ClusterConnectionProxy(session).dtxWorkRecord(
+        r.getXid(), r.isPrepared(), r.getTimeout());
+}
 
 }} // namespace qpid::cluster

Propchange: qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:13:18 2011
@@ -1,2 +1,2 @@
 /qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:1061302-1072333
-/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:1156188
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp:1144319-1172654

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateClient.h Mon Sep 19 15:13:18 2011
@@ -52,7 +52,7 @@ class Decoder;
 class Link;
 class Bridge;
 class QueueObserver;
-
+class DtxBuffer;
 } // namespace broker
 
 namespace cluster {
@@ -88,7 +88,7 @@ class UpdateClient : public sys::Runnabl
     void update();
     void run();                 // Will delete this when finished.
 
-    void updateUnacked(const broker::DeliveryRecord&);
+    void updateUnacked(const broker::DeliveryRecord&, client::AsyncSession&);
 
   private:
     void updateQueue(client::AsyncSession&, const boost::shared_ptr<broker::Queue>&);
@@ -100,7 +100,8 @@ class UpdateClient : public sys::Runnabl
     void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding);
     void updateConnection(const boost::intrusive_ptr<Connection>& connection);
     void updateSession(broker::SessionHandler& s);
-    void updateTxState(broker::SemanticState& s);
+    void updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx, bool suspended);
+    void updateTransactionState(broker::SemanticState& s);
     void updateOutputTask(const sys::OutputTask* task);
     void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&);
     void updateQueueListeners(const boost::shared_ptr<broker::Queue>&);
@@ -112,6 +113,9 @@ class UpdateClient : public sys::Runnabl
     void updateBridge(const boost::shared_ptr<broker::Bridge>&);
     void updateQueueObservers(const boost::shared_ptr<broker::Queue>&);
     void updateObserver(const boost::shared_ptr<broker::Queue>&, boost::shared_ptr<broker::QueueObserver>);
+    void updateDtxManager();
+    void updateDtxBuffer(const boost::shared_ptr<broker::DtxBuffer>& );
+    void updateDtxWorkRecord(const broker::DtxWorkRecord&);
 
 
     Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering;

Propchange: qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateClient.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:13:18 2011
@@ -1,2 +1,2 @@
 /qpid/branches/qpid-2935/qpid/cpp/src/qpid/cluster/UpdateClient.h:1061302-1072333
-/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:1156188
+/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h:1144319-1172654

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateExchange.cpp Mon Sep 19 15:13:18 2011
@@ -49,18 +49,18 @@ void UpdateExchange::setProperties(const
 
     // Copy expiration from x-property if present.
     if (msg->hasProperties<MessageProperties>()) {
-        MessageProperties* mprops = msg->getProperties<MessageProperties>();
+        const MessageProperties* mprops = msg->getProperties<MessageProperties>();
         if (mprops->hasApplicationHeaders()) {
-            FieldTable& headers = mprops->getApplicationHeaders();
+            const FieldTable& headers = mprops->getApplicationHeaders();
             if (headers.isSet(UpdateClient::X_QPID_EXPIRATION)) {
                 msg->setExpiration(
                     sys::AbsTime(sys::EPOCH, headers.getAsInt64(UpdateClient::X_QPID_EXPIRATION)));
-                headers.erase(UpdateClient::X_QPID_EXPIRATION);
+                msg->removeCustomProperty(UpdateClient::X_QPID_EXPIRATION);
                 // Erase props/headers that were added by the UpdateClient
                 if (headers.isSet(UpdateClient::X_QPID_NO_MESSAGE_PROPS))
                     msg->eraseProperties<MessageProperties>();
                 else if (headers.isSet(UpdateClient::X_QPID_NO_HEADERS))
-                    mprops->clearApplicationHeadersFlag();
+                    msg->clearApplicationHeadersFlag();
             }
         }
     }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateReceiver.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateReceiver.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/cluster/UpdateReceiver.h Mon Sep 19 15:13:18 2011
@@ -39,6 +39,20 @@ class UpdateReceiver {
 
     /** Management-id for the next shadow connection */
     std::string nextShadowMgmtId;
+
+    /** Record the position of a DtxBuffer in the DtxManager (xid + index)
+     * and the association with a session, either suspended or current.
+     */
+    struct DtxBufferRef {
+        std::string xid;
+        uint32_t index;         // Index in WorkRecord in DtxManager
+        bool suspended;         // Is this a suspended or current transaction?
+        broker::SemanticState* semanticState; // Associated session
+        DtxBufferRef(const std::string& x, uint32_t i, bool s, broker::SemanticState* ss)
+            : xid(x), index(i), suspended(s), semanticState(ss) {}
+    };
+    typedef std::vector<DtxBufferRef> DtxBuffers;
+    DtxBuffers dtxBuffers;
 };
 }} // namespace qpid::cluster
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/framing/AMQFrame.cpp Mon Sep 19 15:13:18 2011
@@ -139,6 +139,11 @@ bool AMQFrame::decode(Buffer& buffer)
     return true;
 }
 
+void AMQFrame::cloneBody()
+{
+    body = body->clone();
+}
+
 std::ostream& operator<<(std::ostream& out, const AMQFrame& f)
 {
     return

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/framing/AMQFrame.h Mon Sep 19 15:13:18 2011
@@ -59,6 +59,11 @@ class QPID_COMMON_CLASS_EXTERN AMQFrame 
         return boost::polymorphic_downcast<const T*>(getBody());
     }
 
+    /**
+     * Take a deep copy of the body currently referenced
+     */
+    QPID_COMMON_EXTERN void cloneBody();
+
     QPID_COMMON_EXTERN void encode(Buffer& buffer) const; 
     QPID_COMMON_EXTERN bool decode(Buffer& buffer); 
     QPID_COMMON_EXTERN uint32_t encodedSize() const;

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/framing/FieldTable.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/framing/FieldTable.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/framing/FieldTable.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/framing/FieldTable.cpp Mon Sep 19 15:13:18 2011
@@ -198,10 +198,12 @@ void FieldTable::encode(Buffer& buffer) 
 
 void FieldTable::decode(Buffer& buffer){
     clear();
+    if (buffer.available() < 4)
+        throw IllegalArgumentException(QPID_MSG("Not enough data for field table."));
     uint32_t len = buffer.getLong();
     if (len) {
         uint32_t available = buffer.available();
-        if (available < len)
+        if ((available < len) || (available < 4))
             throw IllegalArgumentException(QPID_MSG("Not enough data for field table."));
         uint32_t count = buffer.getLong();
         uint32_t leftover = available - len;

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/framing/List.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/framing/List.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/framing/List.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/framing/List.cpp Mon Sep 19 15:13:18 2011
@@ -49,6 +49,9 @@ void List::encode(Buffer& buffer) const
 void List::decode(Buffer& buffer)
 {
     values.clear();
+    if (buffer.available() < 4)
+        throw IllegalArgumentException(QPID_MSG("Not enough data for list, expected at least "
+                                                " 4 bytes but only " << buffer.available() << " available"));
     uint32_t size = buffer.getLong();
     uint32_t available = buffer.available();
     if (available < size) {
@@ -56,6 +59,9 @@ void List::decode(Buffer& buffer)
                                                 << size << " bytes but only " << available << " available"));
     }
     if (size) {
+        if (buffer.available() < 4)
+            throw IllegalArgumentException(QPID_MSG("Not enough data for list, expected at least "
+                                                    " 4 bytes but only " << buffer.available() << " available"));
         uint32_t count = buffer.getLong();        
         for (uint32_t i = 0; i < count; i++) {
             ValuePtr value(new FieldValue);

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/log/Statement.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/log/Statement.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/log/Statement.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/log/Statement.cpp Mon Sep 19 15:13:18 2011
@@ -27,8 +27,6 @@ namespace qpid {
 namespace log {
 
 namespace {
-using namespace std;
-
 struct NonPrint { bool operator()(unsigned char c) { return !isprint(c) && !isspace(c); } };
 
 const char hex[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F' };
@@ -39,7 +37,7 @@ std::string quote(const std::string& str
     if (n==0) return str;
     std::string ret;
     ret.reserve(str.size()+2*n); // Avoid extra allocations.
-    for (string::const_iterator i = str.begin(); i != str.end(); ++i) {
+    for (std::string::const_iterator i = str.begin(); i != str.end(); ++i) {
         if (nonPrint(*i)) {
             ret.push_back('\\');
             ret.push_back('x');
@@ -50,7 +48,6 @@ std::string quote(const std::string& str
     }
     return ret;
 }
-
 }
 
 void Statement::log(const std::string& message) {

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp Mon Sep 19 15:13:18 2011
@@ -180,7 +180,7 @@ qpid::log::SinkOptions& SinkOptions::ope
 }
 
 void SinkOptions::detached(void) {
-    if (logToStderr && !logToStdout && !logToSyslog) {
+    if (logToStderr && !logToStdout && !logToSyslog && logFile.empty()) {
         logToStderr = false;
         logToSyslog = true;
     }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/management/ManagementAgent.cpp Mon Sep 19 15:13:18 2011
@@ -614,7 +614,7 @@ void ManagementAgent::sendBufferLH(const
     props->setAppId("qmf2");
 
     for (i = headers.begin(); i != headers.end(); ++i) {
-        msg->getOrInsertHeaders().setString(i->first, i->second.asString());
+        msg->insertCustomProperty(i->first, i->second.asString());
     }
 
     DeliveryProperties* dp =

Propchange: qpid/branches/qpid-3346/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:13:18 2011
@@ -1,2 +1,2 @@
 /qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp:1061302-1072333
-/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:1156188
+/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp:1144319-1172654

Propchange: qpid/branches/qpid-3346/qpid/cpp/src/qpid/management/ManagementAgent.h
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:13:18 2011
@@ -1,2 +1,2 @@
 /qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.h:1061302-1072333
-/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:1156188
+/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h:1144319-1172654

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/messaging/AddressParser.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/messaging/AddressParser.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/messaging/AddressParser.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/messaging/AddressParser.cpp Mon Sep 19 15:13:18 2011
@@ -201,7 +201,8 @@ bool AddressParser::readSimpleValue(Vari
 {
     std::string s;
     if (readWord(s)) {
-        value.parse(s);        
+        value.parse(s);
+        if (value.getType() == VAR_STRING) value.setEncoding("utf8");
         return true;
     } else {
         return false;

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/messaging/Message.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/messaging/Message.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/messaging/Message.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/messaging/Message.cpp Mon Sep 19 15:13:18 2011
@@ -21,6 +21,7 @@
 #include "qpid/messaging/Message.h"
 #include "qpid/messaging/MessageImpl.h"
 #include "qpid/amqp_0_10/Codecs.h"
+#include <qpid/Exception.h>
 #include <boost/format.hpp>
 
 namespace qpid {
@@ -115,7 +116,11 @@ template <class C> struct MessageCodec
     static void decode(const Message& message, typename C::ObjectType& object, const std::string& encoding)
     {
         checkEncoding(message, encoding);
-        C::decode(message.getContent(), object);
+        try {
+            C::decode(message.getContent(), object);
+        } catch (const qpid::Exception &ex) {
+            throw EncodingException(ex.what());
+        }
     }
 
     static void encode(const typename C::ObjectType& map, Message& message, const std::string& encoding)

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/replication/ReplicatingEventListener.cpp Mon Sep 19 15:13:18 2011
@@ -69,10 +69,9 @@ void ReplicatingEventListener::deliverDe
 void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueued)
 {
     boost::intrusive_ptr<Message> msg(cloneMessage(*(enqueued.queue), enqueued.payload));
-    FieldTable& headers = msg->getProperties<MessageProperties>()->getApplicationHeaders();
-    headers.setString(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
-    headers.setInt(REPLICATION_EVENT_TYPE, ENQUEUE);
-    headers.setInt(QUEUE_MESSAGE_POSITION,enqueued.position);
+    msg->insertCustomProperty(REPLICATION_TARGET_QUEUE, enqueued.queue->getName());
+    msg->insertCustomProperty(REPLICATION_EVENT_TYPE, ENQUEUE);
+    msg->insertCustomProperty(QUEUE_MESSAGE_POSITION,enqueued.position);
     route(msg);
 }
 

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/replication/ReplicationExchange.cpp Mon Sep 19 15:13:18 2011
@@ -97,11 +97,10 @@ void ReplicationExchange::handleEnqueueE
         } else {
             queue->setPosition(seqno1);  
 
-            FieldTable& headers = msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders();
-            headers.erase(REPLICATION_TARGET_QUEUE);
-            headers.erase(REPLICATION_EVENT_SEQNO);
-            headers.erase(REPLICATION_EVENT_TYPE);
-            headers.erase(QUEUE_MESSAGE_POSITION);
+            msg.getMessage().removeCustomProperty(REPLICATION_TARGET_QUEUE);
+            msg.getMessage().removeCustomProperty(REPLICATION_EVENT_SEQNO);
+            msg.getMessage().removeCustomProperty(REPLICATION_EVENT_TYPE);
+            msg.getMessage().removeCustomProperty(QUEUE_MESSAGE_POSITION);
             msg.deliverTo(queue);
             QPID_LOG(debug, "Enqueued replicated message onto " << queueName);
             if (mgmtExchange != 0) {

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/store/StorageProvider.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/store/StorageProvider.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/store/StorageProvider.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/store/StorageProvider.h Mon Sep 19 15:13:18 2011
@@ -54,7 +54,7 @@ struct QueueEntry {
     QueueEntry(uint64_t id, TplStatus tpl = NONE, const std::string& x = "")
         : queueId(id), tplStatus(tpl), xid(x) {}
 
-    bool operator==(const QueueEntry& rhs) {
+    bool operator==(const QueueEntry& rhs) const {
         if (queueId != rhs.queueId) return false;
         if (tplStatus == NONE && rhs.tplStatus == NONE) return true;
         return xid == rhs.xid;

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/CopyOnWriteArray.h Mon Sep 19 15:13:18 2011
@@ -43,6 +43,12 @@ public:
     CopyOnWriteArray() {}
     CopyOnWriteArray(const CopyOnWriteArray& c) : array(c.array) {}
 
+    bool empty()
+    {
+        Mutex::ScopedLock l(lock);
+        return array ? array->empty() : true;
+    }
+
     void add(T& t)
     {
         Mutex::ScopedLock l(lock);

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/PollableQueue.h Mon Sep 19 15:13:18 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,7 +28,8 @@
 #include <boost/function.hpp>
 #include <boost/bind.hpp>
 #include <algorithm>
-#include <vector>
+#include <deque>
+#include "qpid/log/Statement.h" // FIXME aconway 2011-08-05:
 
 namespace qpid {
 namespace sys {
@@ -44,7 +45,7 @@ class Poller;
 template <class T>
 class PollableQueue {
   public:
-    typedef std::vector<T> Batch;
+    typedef std::deque<T> Batch;
     typedef T value_type;
 
     /**
@@ -68,11 +69,11 @@ class PollableQueue {
                   const boost::shared_ptr<sys::Poller>& poller);
 
     ~PollableQueue();
-    
+
     /** Push a value onto the queue. Thread safe */
     void push(const T& t);
 
-    /** Start polling. */ 
+    /** Start polling. */
     void start();
 
     /** Stop polling and wait for the current callback, if any, to complete. */
@@ -90,14 +91,14 @@ class PollableQueue {
      * ensure clean shutdown with no events left on the queue.
      */
     void shutdown();
-    
+
   private:
     typedef sys::Monitor::ScopedLock ScopedLock;
     typedef sys::Monitor::ScopedUnlock ScopedUnlock;
 
     void dispatch(PollableCondition& cond);
     void process();
-    
+
     mutable sys::Monitor lock;
     Callback callback;
     PollableCondition condition;
@@ -107,7 +108,7 @@ class PollableQueue {
 };
 
 template <class T> PollableQueue<T>::PollableQueue(
-    const Callback& cb, const boost::shared_ptr<sys::Poller>& p) 
+    const Callback& cb, const boost::shared_ptr<sys::Poller>& p)
   : callback(cb),
     condition(boost::bind(&PollableQueue<T>::dispatch, this, _1), p),
     stopped(true)
@@ -151,7 +152,7 @@ template <class T> void PollableQueue<T>
             putBack = callback(batch);
         }
         // put back unprocessed items.
-        queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end())); 
+        queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end()));
         batch.clear();
     }
 }

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/Socket.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/Socket.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/Socket.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/Socket.h Mon Sep 19 15:13:18 2011
@@ -39,6 +39,9 @@ public:
     /** Create a socket wrapper for descriptor. */
     QPID_COMMON_EXTERN Socket();
 
+    /** Create a new Socket which is the same address family as this one */
+    QPID_COMMON_EXTERN Socket* createSameTypeSocket() const;
+
     /** Set socket non blocking */
     void setNonblocking() const;
 
@@ -77,7 +80,7 @@ public:
      * Returns the error code stored in the socket.  This may be used
      * to determine the result of a non-blocking connect.
      */
-    int getError() const;
+    QPID_COMMON_EXTERN int getError() const;
 
     /** Accept a connection from a socket that is already listening
      * and has an incoming connection
@@ -92,7 +95,9 @@ private:
     /** Create socket */
     void createSocket(const SocketAddress&) const;
 
+    /** Construct socket with existing handle */
     Socket(IOHandlePrivate*);
+
     mutable std::string localname;
     mutable std::string peername;
     mutable bool nonblocking;

Modified: qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/SocketAddress.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/SocketAddress.h?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/SocketAddress.h (original)
+++ qpid/branches/qpid-3346/qpid/cpp/src/qpid/sys/SocketAddress.h Mon Sep 19 15:13:18 2011
@@ -27,6 +27,7 @@
 #include <string>
 
 struct addrinfo;
+struct sockaddr;
 
 namespace qpid {
 namespace sys {
@@ -41,12 +42,19 @@ public:
     QPID_COMMON_EXTERN SocketAddress& operator=(const SocketAddress&);
     QPID_COMMON_EXTERN ~SocketAddress();
 
-    std::string asString(bool numeric=true) const;
+    QPID_COMMON_EXTERN bool nextAddress();
+    QPID_COMMON_EXTERN std::string asString(bool numeric=true) const;
+    QPID_COMMON_EXTERN void setAddrInfoPort(uint16_t port);
+
+    QPID_COMMON_EXTERN static std::string asString(::sockaddr const * const addr, size_t addrlen);
+    QPID_COMMON_EXTERN static uint16_t getPort(::sockaddr const * const addr);
+    
 
 private:
     std::string host;
     std::string port;
     mutable ::addrinfo* addrInfo;
+    mutable ::addrinfo* currentAddrInfo;
 };
 
 }}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message