qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kw...@apache.org
Subject svn commit: r1663719 [2/2] - in /qpid/branches/QPID-6262-JavaBrokerNIO: ./ qpid/ qpid/cpp/ qpid/cpp/CMakeModules/ qpid/cpp/include/qpid/types/ qpid/cpp/src/ qpid/cpp/src/qpid/ qpid/cpp/src/qpid/amqp/ qpid/cpp/src/qpid/broker/ qpid/cpp/src/qpid/broker/a...
Date Tue, 03 Mar 2015 14:58:03 GMT
Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.cpp Tue Mar  3 14:58:01 2015
@@ -20,34 +20,53 @@
  */
 #include "PnData.h"
 #include "qpid/types/encodings.h"
+#include "qpid/log/Statement.h"
 
 namespace qpid {
 namespace messaging {
 namespace amqp {
 
 using types::Variant;
+using namespace types::encodings;
 
-void PnData::write(const Variant::Map& map)
+// TODO aconway 2014-11-20: PnData duplicates functionality of qpid::amqp::Encoder,Decoder.
+// Collapse them all into a single proton-based codec.
+
+void PnData::put(const Variant::Map& map)
 {
     pn_data_put_map(data);
     pn_data_enter(data);
     for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
-        pn_data_put_string(data, str(i->first));
-        write(i->second);
+        pn_data_put_string(data, bytes(i->first));
+        put(i->second);
     }
     pn_data_exit(data);
 }
-void PnData::write(const Variant::List& list)
+
+void PnData::put(const Variant::List& list)
 {
     pn_data_put_list(data);
     pn_data_enter(data);
     for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
-        write(*i);
+        put(*i);
     }
     pn_data_exit(data);
 }
-void PnData::write(const Variant& value)
+
+void PnData::put(const Variant& value)
 {
+    // Open data descriptors associated with the value.
+    const Variant::List& descriptors = value.getDescriptors();
+    for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i) {
+        pn_data_put_described(data);
+        pn_data_enter(data);
+        if (i->getType() == types::VAR_STRING)
+            pn_data_put_symbol(data, bytes(i->asString()));
+        else
+            pn_data_put_ulong(data, i->asUint64());
+    }
+
+    // Put the variant value
     switch (value.getType()) {
       case qpid::types::VAR_VOID:
         pn_data_put_null(data);
@@ -65,61 +84,70 @@ void PnData::write(const Variant& value)
         pn_data_put_double(data, value.asDouble());
         break;
       case qpid::types::VAR_STRING:
-        pn_data_put_string(data, str(value.asString()));
+        if (value.getEncoding() == ASCII)
+            pn_data_put_symbol(data, bytes(value.asString()));
+        else if (value.getEncoding() == BINARY)
+            pn_data_put_binary(data, bytes(value.asString()));
+        else
+            pn_data_put_string(data, bytes(value.asString()));
         break;
       case qpid::types::VAR_MAP:
-        write(value.asMap());
+        put(value.asMap());
         break;
       case qpid::types::VAR_LIST:
-        write(value.asList());
+        put(value.asList());
         break;
       default:
         break;
     }
+
+    // Close any descriptors.
+    for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i)
+        pn_data_exit(data);
 }
 
-bool PnData::read(qpid::types::Variant& value)
+bool PnData::get(qpid::types::Variant& value)
 {
-    return read(pn_data_type(data), value);
+    return get(pn_data_type(data), value);
 }
 
-void PnData::readList(qpid::types::Variant::List& value)
+void PnData::getList(qpid::types::Variant::List& value)
 {
     size_t count = pn_data_get_list(data);
     pn_data_enter(data);
     for (size_t i = 0; i < count && pn_data_next(data); ++i) {
         qpid::types::Variant e;
-        if (read(e)) value.push_back(e);
+        if (get(e)) value.push_back(e);
     }
     pn_data_exit(data);
 }
 
-void PnData::readMap(qpid::types::Variant::Map& value)
+void PnData::getMap(qpid::types::Variant::Map& value)
 {
     size_t count = pn_data_get_list(data);
     pn_data_enter(data);
     for (size_t i = 0; i < (count/2) && pn_data_next(data); ++i) {
-        std::string key = str(pn_data_get_symbol(data));
+        std::string key = string(pn_data_get_symbol(data));
         pn_data_next(data);
         qpid::types::Variant e;
-        if (read(e)) value[key]= e;
+        if (get(e)) value[key]= e;
     }
     pn_data_exit(data);
 }
 
-void PnData::readArray(qpid::types::Variant::List& value)
+void PnData::getArray(qpid::types::Variant::List& value)
 {
     size_t count = pn_data_get_array(data);
     pn_type_t type = pn_data_get_array_type(data);
     pn_data_enter(data);
     for (size_t i = 0; i < count && pn_data_next(data); ++i) {
         qpid::types::Variant e;
-        if (read(type, e)) value.push_back(e);
+        if (get(type, e)) value.push_back(e);
     }
     pn_data_exit(data);
 }
 
-bool PnData::read(pn_type_t type, qpid::types::Variant& value)
+bool PnData::get(pn_type_t type, qpid::types::Variant& value)
 {
     switch (type) {
       case PN_NULL:
@@ -168,41 +196,41 @@ bool PnData::read(pn_type_t type, qpid::
         value = qpid::types::Uuid(pn_data_get_uuid(data).bytes);
         return true;
       case PN_BINARY:
-        value = str(pn_data_get_binary(data));
+        value = string(pn_data_get_binary(data));
         value.setEncoding(qpid::types::encodings::BINARY);
         return true;
       case PN_STRING:
-        value = str(pn_data_get_string(data));
+        value = string(pn_data_get_string(data));
         value.setEncoding(qpid::types::encodings::UTF8);
         return true;
       case PN_SYMBOL:
-        value = str(pn_data_get_string(data));
+        value = string(pn_data_get_string(data));
         value.setEncoding(qpid::types::encodings::ASCII);
         return true;
       case PN_LIST:
         value = qpid::types::Variant::List();
-        readList(value.asList());
+        getList(value.asList());
         return true;
         break;
       case PN_MAP:
         value = qpid::types::Variant::Map();
-        readMap(value.asMap());
+        getMap(value.asMap());
         return true;
       case PN_ARRAY:
         value = qpid::types::Variant::List();
-        readArray(value.asList());
+        getArray(value.asList());
         return true;
       case PN_DESCRIBED:
+        // TODO aconway 2014-11-20: get described values.
       case PN_DECIMAL32:
       case PN_DECIMAL64:
       case PN_DECIMAL128:
       default:
         return false;
     }
-
 }
 
-pn_bytes_t PnData::str(const std::string& s)
+pn_bytes_t PnData::bytes(const std::string& s)
 {
     pn_bytes_t result;
     result.start = const_cast<char*>(s.data());
@@ -210,7 +238,7 @@ pn_bytes_t PnData::str(const std::string
     return result;
 }
 
-std::string PnData::str(const pn_bytes_t& in)
+std::string PnData::string(const pn_bytes_t& in)
 {
     return std::string(in.start, in.size);
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.h?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.h (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/PnData.h Tue Mar  3 14:58:01 2015
@@ -32,28 +32,29 @@ namespace messaging {
 namespace amqp {
 
 /**
- *  Helper class to read/write messaging types to/from pn_data_t.
+ *  Helper class to put/get messaging types to/from pn_data_t.
  */
 class PnData
 {
   public:
-    PnData(pn_data_t* d) : data(d) {}
+    pn_data_t* data;
 
-    void write(const types::Variant& value);
-    void write(const types::Variant::Map& map);
-    void write(const types::Variant::List& list);
-
-    bool read(pn_type_t type, types::Variant& value);
-    bool read(types::Variant& value);
-    void readList(types::Variant::List& value);
-    void readMap(types::Variant::Map& value);
-    void readArray(types::Variant::List& value);
+    PnData(pn_data_t* d) : data(d) {}
 
-    static pn_bytes_t str(const std::string&);
-    static std::string str(const pn_bytes_t&);
+    void put(const types::Variant& value);
+    void put(const types::Variant::Map& map);
+    void put(const types::Variant::List& list);
+    void put(int32_t n) { pn_data_put_int(data, n); }
+    void putSymbol(const std::string& symbol) { pn_data_put_symbol(data, bytes(symbol)); }
+
+    bool get(pn_type_t type, types::Variant& value);
+    bool get(types::Variant& value);
+    void getList(types::Variant::List& value);
+    void getMap(types::Variant::Map& value);
+    void getArray(types::Variant::List& value);
 
-  private:
-    pn_data_t* data;
+    static pn_bytes_t bytes(const std::string&);
+    static std::string string(const pn_bytes_t&);
 };
 }}} // namespace messaging::amqp
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Tue Mar  3 14:58:01 2015
@@ -37,9 +37,10 @@ ReceiverContext::ReceiverContext(pn_sess
     helper(address),
     receiver(pn_receiver(session, name.c_str())),
     capacity(0), used(0) {}
+
 ReceiverContext::~ReceiverContext()
 {
-    pn_link_free(receiver);
+    if (receiver) pn_link_free(receiver);
 }
 
 void ReceiverContext::setCapacity(uint32_t c)
@@ -63,12 +64,13 @@ uint32_t ReceiverContext::getAvailable()
 
 uint32_t ReceiverContext::getUnsettled()
 {
+    assert(pn_link_unsettled(receiver) >= pn_link_queued(receiver));
     return pn_link_unsettled(receiver) - pn_link_queued(receiver);
 }
 
 void ReceiverContext::close()
 {
-    pn_link_close(receiver);
+    if (receiver) pn_link_close(receiver);
 }
 
 const std::string& ReceiverContext::getName() const
@@ -96,7 +98,7 @@ void ReceiverContext::verify()
 }
 void ReceiverContext::configure()
 {
-    configure(pn_link_source(receiver));
+    if (receiver) configure(pn_link_source(receiver));
 }
 void ReceiverContext::configure(pn_terminus_t* source)
 {
@@ -116,13 +118,13 @@ Address ReceiverContext::getAddress() co
 
 void ReceiverContext::reset(pn_session_t* session)
 {
-    receiver = pn_receiver(session, name.c_str());
-    configure();
+    receiver = session ? pn_receiver(session, name.c_str()) : 0;
+    if (receiver) configure();
 }
 
 bool ReceiverContext::hasCurrent()
 {
-    return pn_link_current(receiver);
+    return receiver &&  pn_link_current(receiver);
 }
 
 bool ReceiverContext::wakeupToIssueCredit()

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Tue Mar  3 14:58:01 2015
@@ -18,8 +18,10 @@
  * under the License.
  *
  */
-#include "qpid/messaging/amqp/SenderContext.h"
-#include "qpid/messaging/amqp/EncodedMessage.h"
+#include "SenderContext.h"
+#include "Transaction.h"
+#include "EncodedMessage.h"
+#include "PnData.h"
 #include "qpid/messaging/AddressImpl.h"
 #include "qpid/messaging/exceptions.h"
 #include "qpid/Exception.h"
@@ -40,22 +42,29 @@ extern "C" {
 namespace qpid {
 namespace messaging {
 namespace amqp {
+
 //TODO: proper conversion to wide string for address
-SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a, bool setToOnSend_)
-  : name(n),
+SenderContext::SenderContext(pn_session_t* session, const std::string& n,
+                             const qpid::messaging::Address& a,
+                             bool setToOnSend_,
+                             const CoordinatorPtr& coord)
+  : sender(pn_sender(session, n.c_str())),
+    name(n),
     address(a),
     helper(address),
-    sender(pn_sender(session, n.c_str())), nextId(0), capacity(50), unreliable(helper.isUnreliable()),
-    setToOnSend(setToOnSend_) {}
+    nextId(0), capacity(50), unreliable(helper.isUnreliable()),
+    setToOnSend(setToOnSend_),
+    transaction(coord)
+{}
 
 SenderContext::~SenderContext()
 {
-    pn_link_free(sender);
+    if (sender) pn_link_free(sender);
 }
 
 void SenderContext::close()
 {
-    pn_link_close(sender);
+    if (sender) pn_link_close(sender);
 }
 
 void SenderContext::setCapacity(uint32_t c)
@@ -88,10 +97,13 @@ bool SenderContext::send(const qpid::mes
 {
     resend();//if there are any messages needing to be resent at the front of the queue, send them first
     if (processUnsettled(false) < capacity && pn_link_credit(sender)) {
+        types::Variant state;
+        if (transaction)
+            state = transaction->getSendState();
         if (unreliable) {
             Delivery delivery(nextId++);
             delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
-            delivery.send(sender, unreliable);
+            delivery.send(sender, unreliable, state);
             *out = 0;
             return true;
         } else {
@@ -99,7 +111,7 @@ bool SenderContext::send(const qpid::mes
             try {
                 Delivery& delivery = deliveries.back();
                 delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
-                delivery.send(sender, unreliable);
+                delivery.send(sender, unreliable, state);
                 *out = &delivery;
                 return true;
             } catch (const std::exception& e) {
@@ -507,7 +519,8 @@ void SenderContext::Delivery::encode(con
         throw SendError(e.what());
     }
 }
-void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)
+
+void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable, const types::Variant& state)
 {
     pn_delivery_tag_t tag;
     tag.size = sizeof(id);
@@ -517,6 +530,11 @@ void SenderContext::Delivery::send(pn_li
     tag.bytes = reinterpret_cast<const char*>(&id);
 #endif
     token = pn_delivery(sender, tag);
+    if (!state.isVoid()) {      // Add transaction state
+        PnData data(pn_disposition_data(pn_delivery_local(token)));
+        data.put(state);
+        pn_delivery_update(token, qpid::amqp::transaction::TRANSACTIONAL_STATE_CODE);
+    }
     pn_link_send(sender, encoded.getData(), encoded.getSize());
     if (unreliable) {
         pn_delivery_settle(token);
@@ -551,6 +569,15 @@ bool SenderContext::Delivery::rejected()
 {
     return pn_delivery_remote_state(token) == PN_REJECTED;
 }
+
+std::string SenderContext::Delivery::error()
+{
+    pn_condition_t *condition = pn_disposition_condition(pn_delivery_remote(token));
+    return (condition && pn_condition_is_set(condition)) ?
+        Msg() << pn_condition_get_name(condition) << ": " << pn_condition_get_description(condition) :
+        std::string();
+}
+
 void SenderContext::Delivery::settle()
 {
     pn_delivery_settle(token);
@@ -570,10 +597,12 @@ void SenderContext::verify()
 
     helper.checkAssertion(target, AddressHelper::FOR_SENDER);
 }
+
 void SenderContext::configure()
 {
-    configure(pn_link_target(sender));
+    if (sender) configure(pn_link_target(sender));
 }
+
 void SenderContext::configure(pn_terminus_t* target)
 {
     helper.configure(sender, target, AddressHelper::FOR_SENDER);
@@ -603,12 +632,10 @@ Address SenderContext::getAddress() cons
 
 void SenderContext::reset(pn_session_t* session)
 {
-    sender = pn_sender(session, name.c_str());
-    configure();
-
-    for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) {
+    sender = session ? pn_sender(session, name.c_str()) : 0;
+    if (sender) configure();
+    for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i)
         i->reset();
-    }
 }
 
 void SenderContext::resend()

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Tue Mar  3 14:58:01 2015
@@ -24,6 +24,7 @@
 #include <deque>
 #include <string>
 #include <vector>
+#include <boost/shared_ptr.hpp>
 #include "qpid/sys/IntegerTypes.h"
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/amqp/AddressHelper.h"
@@ -41,9 +42,10 @@ class Message;
 class MessageImpl;
 
 namespace amqp {
-/**
- *
- */
+
+class Transaction;
+
+
 class SenderContext
 {
   public:
@@ -52,13 +54,15 @@ class SenderContext
       public:
         Delivery(int32_t id);
         void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&, bool setToField);
-        void send(pn_link_t*, bool unreliable);
+        void send(pn_link_t*, bool unreliable, const types::Variant& state=types::Variant());
         bool delivered();
         bool accepted();
         bool rejected();
         void settle();
         void reset();
         bool sent() const;
+        pn_delivery_t* getToken() const { return token; }
+        std::string error();
       private:
         int32_t id;
         pn_delivery_t* token;
@@ -66,22 +70,32 @@ class SenderContext
         bool presettled;
     };
 
-    SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target, bool setToOnSend);
+    typedef boost::shared_ptr<Transaction> CoordinatorPtr;
+
+    SenderContext(pn_session_t* session, const std::string& name,
+                  const qpid::messaging::Address& target,
+                  bool setToOnSend,
+                  const CoordinatorPtr& transaction = CoordinatorPtr());
     ~SenderContext();
-    void reset(pn_session_t* session);
-    void close();
-    void setCapacity(uint32_t);
-    uint32_t getCapacity();
-    uint32_t getUnsettled();
-    const std::string& getName() const;
-    const std::string& getTarget() const;
-    bool send(const qpid::messaging::Message& message, Delivery**);
-    void configure();
-    void verify();
-    void check();
-    bool settled();
-    bool closed();
-    Address getAddress() const;
+
+    virtual void reset(pn_session_t* session);
+    virtual void close();
+    virtual void setCapacity(uint32_t);
+    virtual uint32_t getCapacity();
+    virtual uint32_t getUnsettled();
+    virtual const std::string& getName() const;
+    virtual const std::string& getTarget() const;
+    virtual bool send(const qpid::messaging::Message& message, Delivery**);
+    virtual void configure();
+    virtual void verify();
+    virtual void check();
+    virtual bool settled();
+    virtual bool closed();
+    virtual Address getAddress() const;
+
+  protected:
+    pn_link_t* sender;
+
   private:
     friend class ConnectionContext;
     typedef std::deque<Delivery> Deliveries;
@@ -89,12 +103,12 @@ class SenderContext
     const std::string name;
     qpid::messaging::Address address;
     AddressHelper helper;
-    pn_link_t* sender;
     int32_t nextId;
     Deliveries deliveries;
     uint32_t capacity;
     bool unreliable;
     bool setToOnSend;
+    boost::shared_ptr<Transaction> transaction;
 
     uint32_t processUnsettled(bool silent);
     void configure(pn_terminus_t*);

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SenderHandle.cpp Tue Mar  3 14:58:01 2015
@@ -39,7 +39,8 @@ SenderHandle::SenderHandle(boost::shared
 
 void SenderHandle::send(const Message& message, bool sync)
 {
-    connection->send(session, sender, message, sync);
+    SenderContext::Delivery* d = 0;
+    connection->send(session, sender, message, sync, &d);
 }
 
 void SenderHandle::close()

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Tue Mar  3 14:58:01 2015
@@ -21,11 +21,15 @@
 #include "SessionContext.h"
 #include "SenderContext.h"
 #include "ReceiverContext.h"
+#include "Transaction.h"
+#include "PnData.h"
 #include <boost/format.hpp>
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/Duration.h"
 #include "qpid/messaging/exceptions.h"
 #include "qpid/log/Statement.h"
+#include "qpid/amqp/descriptors.h"
+
 extern "C" {
 #include <proton/engine.h>
 }
@@ -35,23 +39,32 @@ namespace messaging {
 namespace amqp {
 
 SessionContext::SessionContext(pn_connection_t* connection) : session(pn_session(connection)) {}
+
 SessionContext::~SessionContext()
 {
-    senders.clear(); receivers.clear();
-    pn_session_free(session);
+    // Clear all pointers to senders and receivers before we free the session.
+    senders.clear();
+    receivers.clear();
+    transaction.reset();        // Transaction is a sender.
+    if (!error && session)
+        pn_session_free(session);
 }
 
 boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address, bool setToOnSend)
 {
+    error.raise();
     std::string name = AddressHelper::getLinkName(address);
-    if (senders.find(name) != senders.end()) throw LinkError("Link name must be unique within the scope of the connection");
-    boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address, setToOnSend));
+    if (senders.find(name) != senders.end())
+        throw LinkError("Link name must be unique within the scope of the connection");
+    boost::shared_ptr<SenderContext> s(
+        new SenderContext(session, name, address, setToOnSend, transaction));
     senders[name] = s;
     return s;
 }
 
 boost::shared_ptr<ReceiverContext> SessionContext::createReceiver(const qpid::messaging::Address& address)
 {
+    error.raise();
     std::string name = AddressHelper::getLinkName(address);
     if (receivers.find(name) != receivers.end()) throw LinkError("Link name must be unique within the scope of the connection");
     boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address));
@@ -61,6 +74,7 @@ boost::shared_ptr<ReceiverContext> Sessi
 
 boost::shared_ptr<SenderContext> SessionContext::getSender(const std::string& name) const
 {
+    error.raise();
     SenderMap::const_iterator i = senders.find(name);
     if (i == senders.end()) {
         throw qpid::messaging::KeyError(std::string("No such sender") + name);
@@ -71,6 +85,7 @@ boost::shared_ptr<SenderContext> Session
 
 boost::shared_ptr<ReceiverContext> SessionContext::getReceiver(const std::string& name) const
 {
+    error.raise();
     ReceiverMap::const_iterator i = receivers.find(name);
     if (i == receivers.end()) {
         throw qpid::messaging::KeyError(std::string("No such receiver") + name);
@@ -81,16 +96,19 @@ boost::shared_ptr<ReceiverContext> Sessi
 
 void SessionContext::removeReceiver(const std::string& n)
 {
+    error.raise();
     receivers.erase(n);
 }
 
 void SessionContext::removeSender(const std::string& n)
 {
+    error.raise();
     senders.erase(n);
 }
 
 boost::shared_ptr<ReceiverContext> SessionContext::nextReceiver()
 {
+    error.raise();
     for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) {
         if (i->second->hasCurrent()) {
             return i->second;
@@ -102,16 +120,19 @@ boost::shared_ptr<ReceiverContext> Sessi
 
 uint32_t SessionContext::getReceivable()
 {
+    error.raise();
     return 0;//TODO
 }
 
 uint32_t SessionContext::getUnsettledAcks()
 {
+    error.raise();
     return 0;//TODO
 }
 
 qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery)
 {
+    error.raise();
     qpid::framing::SequenceNumber id = next++;
     if (!pn_delivery_settled(delivery))
         unacked[id] = delivery;
@@ -121,22 +142,32 @@ qpid::framing::SequenceNumber SessionCon
 
 void SessionContext::acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end)
 {
+    error.raise();
     for (DeliveryMap::iterator i = begin; i != end; ++i) {
-        QPID_LOG(debug, "Setting disposition for delivery " << i->first << " -> " << i->second);
-        pn_delivery_update(i->second, PN_ACCEPTED);
-        pn_delivery_settle(i->second);//TODO: different settlement modes?
+        types::Variant txState;
+        if (transaction) {
+            QPID_LOG(trace, "Setting disposition for transactional delivery "
+                     << i->first << " -> " << i->second);
+            transaction->acknowledge(i->second);
+        } else {
+            QPID_LOG(trace, "Setting disposition for delivery " << i->first << " -> " << i->second);
+            pn_delivery_update(i->second, PN_ACCEPTED);
+            pn_delivery_settle(i->second); //TODO: different settlement modes?
+        }
     }
     unacked.erase(begin, end);
 }
 
 void SessionContext::acknowledge()
 {
+    error.raise();
     QPID_LOG(debug, "acknowledging all " << unacked.size() << " messages");
     acknowledge(unacked.begin(), unacked.end());
 }
 
 void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool cumulative)
 {
+    error.raise();
     QPID_LOG(debug, "acknowledging selected messages, id=" << id << ", cumulative=" << cumulative);
     DeliveryMap::iterator i = unacked.find(id);
     if (i != unacked.end()) {
@@ -149,6 +180,7 @@ void SessionContext::acknowledge(const q
 
 void SessionContext::nack(const qpid::framing::SequenceNumber& id, bool reject)
 {
+    error.raise();
     DeliveryMap::iterator i = unacked.find(id);
     if (i != unacked.end()) {
         if (reject) {
@@ -166,7 +198,9 @@ void SessionContext::nack(const qpid::fr
 
 bool SessionContext::settled()
 {
+    error.raise();
     bool result = true;
+
     for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
         try {
             if (!i->second->closed() && !i->second->settled()) result = false;
@@ -189,8 +223,25 @@ std::string SessionContext::getName() co
 
 void SessionContext::reset(pn_connection_t* connection)
 {
-    session = pn_session(connection);
     unacked.clear();
+    if (transaction) {
+        if (transaction->isCommitting())
+            error = new TransactionUnknown("Transaction outcome unknown: transport failure");
+        else
+            error = new TransactionAborted("Transaction aborted: transport failure");
+        resetSession(0);
+        senders.clear();
+        receivers.clear();
+        transaction.reset();
+        return;
+    }
+    resetSession(pn_session(connection));
+
+}
+
+void SessionContext::resetSession(pn_session_t* session_) {
+    session = session_;
+    if (transaction) transaction->reset(session);
     for (SessionContext::SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
         i->second->reset(session);
     }
@@ -198,4 +249,6 @@ void SessionContext::reset(pn_connection
         i->second->reset(session);
     }
 }
+
+
 }}} // namespace qpid::messaging::amqp

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h Tue Mar  3 14:58:01 2015
@@ -26,6 +26,7 @@
 #include <boost/shared_ptr.hpp>
 #include "qpid/sys/IntegerTypes.h"
 #include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/ExceptionHolder.h"
 
 struct pn_connection_t;
 struct pn_session_t;
@@ -42,6 +43,8 @@ namespace amqp {
 class ConnectionContext;
 class SenderContext;
 class ReceiverContext;
+class Transaction;
+
 /**
  *
  */
@@ -63,23 +66,29 @@ class SessionContext
     bool settled();
     void setName(const std::string&);
     std::string getName() const;
+
+    void nack(const qpid::framing::SequenceNumber& id, bool reject);
+
   private:
     friend class ConnectionContext;
     typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap;
     typedef std::map<std::string, boost::shared_ptr<ReceiverContext> > ReceiverMap;
     typedef std::map<qpid::framing::SequenceNumber, pn_delivery_t*> DeliveryMap;
+
     pn_session_t* session;
     SenderMap senders;
+    boost::shared_ptr<Transaction> transaction;
     ReceiverMap receivers;
     DeliveryMap unacked;
     qpid::framing::SequenceNumber next;
     std::string name;
+    sys::ExceptionHolder error;
 
     qpid::framing::SequenceNumber record(pn_delivery_t*);
     void acknowledge();
     void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative);
     void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end);
-    void nack(const qpid::framing::SequenceNumber& id, bool reject);
+    void resetSession(pn_session_t*);
 };
 }}} // namespace qpid::messaging::amqp
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp Tue Mar  3 14:58:01 2015
@@ -42,12 +42,12 @@ SessionHandle::SessionHandle(boost::shar
 
 void SessionHandle::commit()
 {
-
+    connection->commit(session);
 }
 
 void SessionHandle::rollback()
 {
-
+    connection->rollback(session);
 }
 
 void SessionHandle::acknowledge(bool /*sync*/)

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp Tue Mar  3 14:58:01 2015
@@ -36,17 +36,20 @@ namespace sys {
 struct ProtocolTimeoutTask : public sys::TimerTask {
     AsynchIOHandler& handler;
     std::string id;
+    Duration timeout;
 
-    ProtocolTimeoutTask(const std::string& i, const Duration& timeout, AsynchIOHandler& h) :
-        TimerTask(timeout, "ProtocolTimeout"),
+    ProtocolTimeoutTask(const std::string& i, const Duration& timeout_, AsynchIOHandler& h) :
+        TimerTask(timeout_, "ProtocolTimeout"),
         handler(h),
-        id(i)
+        id(i),
+        timeout(timeout_)
     {}
 
     void fire() {
         // If this fires it means that we didn't negotiate the connection in the timeout period
         // Schedule closing the connection for the io thread
-        QPID_LOG(error, "Connection " << id << " No protocol received closing");
+        QPID_LOG(error, "Connection " << id << " No protocol received after " << timeout
+                 << ", closing");
         handler.abort();
     }
 };

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/Variant.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/Variant.cpp (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/Variant.cpp Tue Mar  3 14:58:01 2015
@@ -43,21 +43,23 @@ class VariantImpl
 {
   public:
     VariantImpl();
-    VariantImpl(bool);
-    VariantImpl(uint8_t);
-    VariantImpl(uint16_t);
-    VariantImpl(uint32_t);
-    VariantImpl(uint64_t);
-    VariantImpl(int8_t);
-    VariantImpl(int16_t);
-    VariantImpl(int32_t);
-    VariantImpl(int64_t);
-    VariantImpl(float);
-    VariantImpl(double);
-    VariantImpl(const std::string&, const std::string& encoding=std::string());
-    VariantImpl(const Variant::Map&);
-    VariantImpl(const Variant::List&);
-    VariantImpl(const Uuid&);
+    void reset();
+    void set(bool);
+    void set(uint8_t);
+    void set(uint16_t);
+    void set(uint32_t);
+    void set(uint64_t);
+    void set(int8_t);
+    void set(int16_t);
+    void set(int32_t);
+    void set(int64_t);
+    void set(float);
+    void set(double);
+    void set(const std::string&, const std::string& encoding=std::string());
+    void set(const Variant::Map&);
+    void set(const Variant::List&);
+    void set(const Uuid&);
+    void set(const Variant&);
     ~VariantImpl();
 
     VariantType getType() const;
@@ -90,9 +92,10 @@ class VariantImpl
     bool isEqualTo(VariantImpl&) const;
     bool isEquivalentTo(VariantImpl&) const;
 
-    static VariantImpl* create(const Variant&);
+    Variant::List descriptors;         // Optional descriptors for described value.
+
   private:
-    const VariantType type;
+    VariantType type;
     union {
         bool b;
         uint8_t ui8;
@@ -110,7 +113,7 @@ class VariantImpl
         Variant::List* list;
         std::string* string;
     } value;
-    std::string encoding;//optional encoding for variable length data
+    std::string encoding;       // Optional encoding for variable length data.
 
   template<class T> T convertFromString() const
     {
@@ -136,26 +139,34 @@ class VariantImpl
 
 };
 
+VariantImpl::VariantImpl() : type(VAR_VOID) {}
+
+void VariantImpl::set(bool b) { reset(); type = VAR_BOOL; value.b = b; }
+void VariantImpl::set(uint8_t i) { reset(); type = VAR_UINT8; value.ui8 = i; }
+void VariantImpl::set(uint16_t i) { reset(); type = VAR_UINT16; value.ui16 = i; }
+void VariantImpl::set(uint32_t i) { reset(); type = VAR_UINT32; value.ui32 = i; }
+void VariantImpl::set(uint64_t i) { reset(); type = VAR_UINT64; value.ui64 = i; }
+void VariantImpl::set(int8_t i) { reset(); type = VAR_INT8; value.i8 = i; }
+void VariantImpl::set(int16_t i) { reset(); type = VAR_INT16; value.i16 = i; }
+void VariantImpl::set(int32_t i) { reset(); type = VAR_INT32; value.i32 = i; }
+void VariantImpl::set(int64_t i) { reset(); type = VAR_INT64; value.i64 = i; }
+void VariantImpl::set(float f) { reset(); type = VAR_FLOAT; value.f = f; }
+void VariantImpl::set(double d) { reset(); type = VAR_DOUBLE; value.d = d; }
+void VariantImpl::set(const std::string& s, const std::string& e) { reset();  type = VAR_STRING; encoding = e; value.string = new std::string(s); }
+
+void VariantImpl::set(const Variant::Map& m) {
+    reset();
+    type = VAR_MAP;
+    value.map = new Variant::Map(m);
+}
+
+void VariantImpl::set(const Variant::List& l) { reset(); type = VAR_LIST; value.list = new Variant::List(l); }
+
+void VariantImpl::set(const Uuid& u) { reset(); type = VAR_UUID; value.uuid = new Uuid(u); }
 
-VariantImpl::VariantImpl() : type(VAR_VOID) { value.i64 = 0; }
-VariantImpl::VariantImpl(bool b) : type(VAR_BOOL) { value.b = b; }
-VariantImpl::VariantImpl(uint8_t i) : type(VAR_UINT8) { value.ui8 = i; }
-VariantImpl::VariantImpl(uint16_t i) : type(VAR_UINT16) { value.ui16 = i; }
-VariantImpl::VariantImpl(uint32_t i) : type(VAR_UINT32) { value.ui32 = i; }
-VariantImpl::VariantImpl(uint64_t i) : type(VAR_UINT64) { value.ui64 = i; }
-VariantImpl::VariantImpl(int8_t i) : type(VAR_INT8) { value.i8 = i; }
-VariantImpl::VariantImpl(int16_t i) : type(VAR_INT16) { value.i16 = i; }
-VariantImpl::VariantImpl(int32_t i) : type(VAR_INT32) { value.i32 = i; }
-VariantImpl::VariantImpl(int64_t i) : type(VAR_INT64) { value.i64 = i; }
-VariantImpl::VariantImpl(float f) : type(VAR_FLOAT) { value.f = f; }
-VariantImpl::VariantImpl(double d) : type(VAR_DOUBLE) { value.d = d; }
-VariantImpl::VariantImpl(const std::string& s, const std::string& e)
-    : type(VAR_STRING), encoding(e) { value.string = new std::string(s); }
-VariantImpl::VariantImpl(const Variant::Map& m) : type(VAR_MAP) { value.map = new Variant::Map(m); }
-VariantImpl::VariantImpl(const Variant::List& l) : type(VAR_LIST) { value.list = new Variant::List(l); }
-VariantImpl::VariantImpl(const Uuid& u) : type(VAR_UUID) { value.uuid = new Uuid(u); }
+VariantImpl::~VariantImpl() { reset(); }
 
-VariantImpl::~VariantImpl() {
+void VariantImpl::reset() {
     switch (type) {
       case VAR_STRING:
         delete value.string;
@@ -172,6 +183,7 @@ VariantImpl::~VariantImpl() {
       default:
         break;
     }
+    type = VAR_VOID;
 }
 
 VariantType VariantImpl::getType() const { return type; }
@@ -637,46 +649,50 @@ bool isIntegerType(VariantType type)
     }
 }
 
-VariantImpl* VariantImpl::create(const Variant& v)
+void VariantImpl::set(const Variant& v)
 {
     switch (v.getType()) {
-      case VAR_BOOL: return new VariantImpl(v.asBool());
-      case VAR_UINT8: return new VariantImpl(v.asUint8());
-      case VAR_UINT16: return new VariantImpl(v.asUint16());
-      case VAR_UINT32: return new VariantImpl(v.asUint32());
-      case VAR_UINT64: return new VariantImpl(v.asUint64());
-      case VAR_INT8: return new VariantImpl(v.asInt8());
-      case VAR_INT16: return new VariantImpl(v.asInt16());
-      case VAR_INT32: return new VariantImpl(v.asInt32());
-      case VAR_INT64: return new VariantImpl(v.asInt64());
-      case VAR_FLOAT: return new VariantImpl(v.asFloat());
-      case VAR_DOUBLE: return new VariantImpl(v.asDouble());
-      case VAR_STRING: return new VariantImpl(v.asString(), v.getEncoding());
-      case VAR_MAP: return new VariantImpl(v.asMap());
-      case VAR_LIST: return new VariantImpl(v.asList());
-      case VAR_UUID: return new VariantImpl(v.asUuid());
-      default: return new VariantImpl();
-    }
-}
-
-Variant::Variant() : impl(new VariantImpl()) {}
-Variant::Variant(bool b) : impl(new VariantImpl(b)) {}
-Variant::Variant(uint8_t i) : impl(new VariantImpl(i)) {}
-Variant::Variant(uint16_t i) : impl(new VariantImpl(i)) {}
-Variant::Variant(uint32_t i) : impl(new VariantImpl(i)) {}
-Variant::Variant(uint64_t i) : impl(new VariantImpl(i)) {}
-Variant::Variant(int8_t i) : impl(new VariantImpl(i)) {}
-Variant::Variant(int16_t i) : impl(new VariantImpl(i)) {}
-Variant::Variant(int32_t i) : impl(new VariantImpl(i)) {}
-Variant::Variant(int64_t i) : impl(new VariantImpl(i)) {}
-Variant::Variant(float f) : impl(new VariantImpl(f)) {}
-Variant::Variant(double d) : impl(new VariantImpl(d)) {}
-Variant::Variant(const std::string& s) : impl(new VariantImpl(s)) {}
-Variant::Variant(const char* s) : impl(new VariantImpl(std::string(s))) {}
-Variant::Variant(const Map& m) : impl(new VariantImpl(m)) {}
-Variant::Variant(const List& l) : impl(new VariantImpl(l)) {}
-Variant::Variant(const Variant& v) : impl(VariantImpl::create(v)) {}
-Variant::Variant(const Uuid& u) : impl(new VariantImpl(u)) {}
+      case VAR_BOOL: set(v.asBool()); break;
+      case VAR_UINT8: set(v.asUint8()); break;
+      case VAR_UINT16: set(v.asUint16()); break;
+      case VAR_UINT32: set(v.asUint32()); break;
+      case VAR_UINT64: set(v.asUint64()); break;
+      case VAR_INT8: set(v.asInt8()); break;
+      case VAR_INT16: set(v.asInt16()); break;
+      case VAR_INT32: set(v.asInt32()); break;
+      case VAR_INT64: set(v.asInt64()); break;
+      case VAR_FLOAT: set(v.asFloat()); break;
+      case VAR_DOUBLE: set(v.asDouble()); break;
+      case VAR_STRING: set(v.asString(), v.getEncoding()); break;
+      case VAR_MAP: set(v.asMap()); break;
+      case VAR_LIST: set(v.asList()); break;
+      case VAR_UUID: set(v.asUuid()); break;
+      default: reset();
+    }
+    encoding = v.getEncoding();
+    descriptors = v.getDescriptors();
+}
+
+Variant::Variant() : impl(0) {}
+Variant::Variant(bool b) : impl(new VariantImpl()) { impl->set(b); }
+Variant::Variant(uint8_t i) : impl(new VariantImpl()) { impl->set(i); }
+Variant::Variant(uint16_t i) : impl(new VariantImpl()) { impl->set(i); }
+Variant::Variant(uint32_t i) : impl(new VariantImpl()) { impl->set(i); }
+Variant::Variant(uint64_t i) : impl(new VariantImpl()) { impl->set(i); }
+Variant::Variant(int8_t i) : impl(new VariantImpl()) { impl->set(i); }
+Variant::Variant(int16_t i) : impl(new VariantImpl()) { impl->set(i); }
+Variant::Variant(int32_t i) : impl(new VariantImpl()) { impl->set(i); }
+Variant::Variant(int64_t i) : impl(new VariantImpl()) { impl->set(i); }
+Variant::Variant(float f) : impl(new VariantImpl()) { impl->set(f); }
+Variant::Variant(double d) : impl(new VariantImpl()) { impl->set(d); }
+Variant::Variant(const std::string& s) : impl(new VariantImpl()) { impl->set(s); }
+Variant::Variant(const std::string& s, const std::string& encoding) : impl(new VariantImpl()) { impl->set(s, encoding); }
+Variant::Variant(const char* s) : impl(new VariantImpl()) { impl->set(std::string(s)); }
+Variant::Variant(const char* s, const char* encoding) : impl(new VariantImpl()) { impl->set(std::string(s), std::string(encoding)); }
+Variant::Variant(const Map& m) : impl(new VariantImpl()) { impl->set(m); }
+Variant::Variant(const List& l) : impl(new VariantImpl()) { impl->set(l); }
+Variant::Variant(const Variant& v) : impl(new VariantImpl()) { impl->set(v); }
+Variant::Variant(const Uuid& u) : impl(new VariantImpl()) { impl->set(u); }
 
 Variant::~Variant() { if (impl) delete impl; }
 
@@ -686,116 +702,105 @@ void Variant::reset()
     impl = 0;
 }
 
+namespace {
+VariantImpl* assure(VariantImpl*& ptr) {
+    if (!ptr) ptr = new VariantImpl();
+    return ptr;
+}
+}
 
 Variant& Variant::operator=(bool b)
 {
-    if (impl) delete impl;
-    impl = new VariantImpl(b);
+    assure(impl)->set(b);
     return *this;
 }
 
 Variant& Variant::operator=(uint8_t i)
 {
-    if (impl) delete impl;
-    impl = new VariantImpl(i);
+    assure(impl)->set(i);
     return *this;
 }
 Variant& Variant::operator=(uint16_t i)
 {
-    if (impl) delete impl;
-    impl = new VariantImpl(i);
+    assure(impl)->set(i);
     return *this;
 }
 Variant& Variant::operator=(uint32_t i)
 {
-    if (impl) delete impl;
-    impl = new VariantImpl(i);
+    assure(impl)->set(i);
     return *this;
 }
 Variant& Variant::operator=(uint64_t i)
 {
-    if (impl) delete impl;
-    impl = new VariantImpl(i);
+    assure(impl)->set(i);
     return *this;
 }
 
 Variant& Variant::operator=(int8_t i)
 {
-    if (impl) delete impl;
-    impl = new VariantImpl(i);
+    assure(impl)->set(i);
     return *this;
 }
 Variant& Variant::operator=(int16_t i)
 {
-    if (impl) delete impl;
-    impl = new VariantImpl(i);
+    assure(impl)->set(i);
     return *this;
 }
 Variant& Variant::operator=(int32_t i)
 {
-    if (impl) delete impl;
-    impl = new VariantImpl(i);
+    assure(impl)->set(i);
     return *this;
 }
 Variant& Variant::operator=(int64_t i)
 {
-    if (impl) delete impl;
-    impl = new VariantImpl(i);
+    assure(impl)->set(i);
     return *this;
 }
 
 Variant& Variant::operator=(float f)
 {
-    if (impl) delete impl;
-    impl = new VariantImpl(f);
+    assure(impl)->set(f);
     return *this;
 }
 Variant& Variant::operator=(double d)
 {
-    if (impl) delete impl;
-    impl = new VariantImpl(d);
+    assure(impl)->set(d);
     return *this;
 }
 
 Variant& Variant::operator=(const std::string& s)
 {
-    if (impl) delete impl;
-    impl = new VariantImpl(s);
+    assure(impl)->set(s);
     return *this;
 }
 
 Variant& Variant::operator=(const char* s)
 {
-    if (impl) delete impl;
-    impl = new VariantImpl(std::string(s));
+    assure(impl)->set(std::string(s));
     return *this;
 }
 
 Variant& Variant::operator=(const Uuid& u)
 {
-    if (impl) delete impl;
-    impl = new VariantImpl(u);
+    assure(impl)->set(u);
     return *this;
 }
 
 Variant& Variant::operator=(const Map& m)
 {
-    if (impl) delete impl;
-    impl = new VariantImpl(m);
+    assure(impl)->set(m);
     return *this;
 }
 
 Variant& Variant::operator=(const List& l)
 {
-    if (impl) delete impl;
-    impl = new VariantImpl(l);
+    assure(impl)->set(l);
     return *this;
 }
 
 Variant& Variant::operator=(const Variant& v)
 {
-    if (impl) delete impl;
-    impl = VariantImpl::create(v);
+    assure(impl)->set(v);
     return *this;
 }
 
@@ -841,8 +846,7 @@ Variant::List& Variant::asList() { if (!
 const std::string& Variant::getString() const { if (!impl) throw InvalidConversion("Can't convert VOID to STRING"); return impl->getString(); }
 std::string& Variant::getString() { if (!impl) throw InvalidConversion("Can't convert VOID to STRING"); return impl->getString(); }
 void Variant::setEncoding(const std::string& s) {
-    if (!impl) impl = new VariantImpl();
-    impl->setEncoding(s);
+    assure(impl)->setEncoding(s);
 }
 const std::string& Variant::getEncoding() const { return impl ? impl->getEncoding() : EMPTY; }
 
@@ -884,6 +888,12 @@ std::ostream& operator<<(std::ostream& o
 
 std::ostream& operator<<(std::ostream& out, const Variant& value)
 {
+    // Print the descriptors
+    const Variant::List& descriptors = value.getDescriptors();
+    for (Variant::List::const_iterator i = descriptors.begin(); i != descriptors.end(); ++i)
+        out << "@" << *i << " ";
+
+    // Print the value
     switch (value.getType()) {
       case VAR_MAP:
         out << value.asMap();
@@ -910,7 +920,43 @@ bool operator!=(const Variant& a, const
 
 bool Variant::isEqualTo(const Variant& other) const
 {
+    if (isVoid() && other.isVoid()) return true;
+    if (isVoid() || other.isVoid()) return false;
     return impl && impl->isEqualTo(*other.impl);
 }
 
+bool Variant::isDescribed() const {
+    return impl && !impl->descriptors.empty();
+}
+
+Variant::List& Variant::getDescriptors() {
+    return assure(impl)->descriptors;
+}
+
+const Variant::List& Variant::getDescriptors() const {
+    return assure(impl)->descriptors;
+}
+
+Variant Variant::getDescriptor() const {
+    if (getDescriptors().size() > 0) return getDescriptors().front();
+    else return Variant();
+}
+
+void Variant::setDescriptor(const Variant& descriptor) {
+    getDescriptors().clear();
+    getDescriptors().push_back(descriptor);
+}
+
+Variant Variant::described(const Variant& descriptor, const Variant& value) {
+    Variant described(value);
+    described.setDescriptor(descriptor);
+    return described;
+}
+
+Variant Variant::described(const Variant& descriptor, const List& value) {
+    Variant described(value);
+    described.setDescriptor(descriptor);
+    return described;
+}
+
 }} // namespace qpid::types

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/encodings.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/encodings.h?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/encodings.h (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/types/encodings.h Tue Mar  3 14:58:01 2015
@@ -23,11 +23,13 @@
  */
 namespace qpid {
 namespace types {
+
 namespace encodings {
 const std::string BINARY("binary");
 const std::string UTF8("utf8");
 const std::string ASCII("ascii");
 }
+
 }} // namespace qpid::types
 
 #endif  /*!QPID_TYPES_ENCODINGS_H*/

Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar  3 14:58:01 2015
@@ -7,4 +7,4 @@
 /qpid/branches/java-network-refactor/qpid/cpp/src/tests:805429-825319
 /qpid/branches/qpid-2935/qpid/cpp/src/tests:1061302-1072333
 /qpid/branches/qpid-3346/qpid/cpp/src/tests:1144319-1179855
-/qpid/trunk/qpid/cpp/src/tests:1643238-1658732
+/qpid/trunk/qpid/cpp/src/tests:1643238-1663687

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/BrokerFixture.h
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/BrokerFixture.h?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/BrokerFixture.h Tue Mar  3 14:58:01 2015
@@ -101,11 +101,13 @@ struct  BrokerFixture : private boost::n
         opts.auth=false;
 
         // Argument parsing
-        std::vector<const char*> argv(args.size());
-        for (size_t i = 0; i<args.size(); ++i)
-            argv[i] = args[i].c_str();
-        Plugin::addOptions(opts);
-        opts.parse(argv.size(), &argv[0]);
+        if (args.size() > 0) {
+            std::vector<const char*> argv(args.size());
+            for (size_t i = 0; i<args.size(); ++i)
+                argv[i] = args[i].c_str();
+            Plugin::addOptions(opts);
+            opts.parse(argv.size(), &argv[0]);
+        }
         broker = Broker::create(opts);
         // TODO aconway 2007-12-05: At one point BrokerFixture
         // tests could hang in Connection ctor if the following

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/CMakeLists.txt?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/CMakeLists.txt Tue Mar  3 14:58:01 2015
@@ -360,6 +360,11 @@ if (NOT CMAKE_SYSTEM_NAME STREQUAL Windo
   # paged queue not yet implemented for windows
   add_test (paged_queue_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_paged_queue_tests${test_script_suffix})
 endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
+
+if (BUILD_AMQP)
+  add_test (interop_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/interop_tests.py)
+endif (BUILD_AMQP)
+
 add_test (ha_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/ha_tests.py)
 add_test (qpidd_qmfv2_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/qpidd_qmfv2_tests.py)
 if (BUILD_AMQP)

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/Variant.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/Variant.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/Variant.cpp (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/Variant.cpp Tue Mar  3 14:58:01 2015
@@ -18,14 +18,16 @@
  * under the License.
  *
  */
-#include <iostream>
-#include "qpid/types/Variant.h"
-#include "qpid/amqp_0_10/Codecs.h"
 
 #include "unit_test.h"
+#include "qpid/types/Variant.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include <boost/assign.hpp>
+#include <iostream>
 
 using namespace qpid::types;
 using namespace qpid::amqp_0_10;
+using boost::assign::list_of;
 
 namespace qpid {
 namespace tests {
@@ -807,6 +809,22 @@ QPID_AUTO_TEST_CASE(parse)
     BOOST_CHECK(a.getType()==types::VAR_DOUBLE);
 }
 
+QPID_AUTO_TEST_CASE(described)
+{
+    Variant a;
+    BOOST_CHECK(!a.isDescribed());
+    a.getDescriptors().push_back("foo");
+    BOOST_CHECK(a.isDescribed());
+    BOOST_CHECK_EQUAL(a.getDescriptors(), list_of<Variant>("foo"));
+    a = 42;
+    BOOST_CHECK(a.isDescribed());
+    BOOST_CHECK_EQUAL(a.getDescriptors(), list_of<Variant>("foo"));
+    a.getDescriptors().push_back(33);
+    BOOST_CHECK_EQUAL(a.getDescriptors(), list_of<Variant>("foo")(33));
+    a.getDescriptors().clear();
+    BOOST_CHECK(!a.isDescribed());
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/brokertest.py?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/brokertest.py Tue Mar  3 14:58:01 2015
@@ -21,9 +21,9 @@
 
 import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re
 import qpid, traceback, signal
+import proton
 from qpid import connection, util
 from qpid.compat import format_exc
-from qpid.harness import Skipped
 from unittest import TestCase
 from copy import copy
 from threading import Thread, Lock, Condition
@@ -49,13 +49,18 @@ from qpidtoollibs import BrokerAgent
 import qpid.messaging
 qm = qpid.messaging
 qpid_messaging = None
+
+def env_has_log_config():
+    """True if there are qpid log configuration settings in the environment."""
+    return "QPID_LOG_ENABLE" in os.environ or "QPID_TRACE" in os.environ
+
 if not os.environ.get("QPID_PY_NO_SWIG"):
     try:
         import qpid_messaging
         from qpid.datatypes import uuid4
         qm = qpid_messaging
         # Silence warnings from swigged messaging library unless enabled in environment.
-        if "QPID_LOG_ENABLE" not in os.environ and "QPID_TRACE" not in os.environ:
+        if not env_has_log_config():
             qm.Logger.configure(["--log-enable=error"])
     except ImportError:
         print "Cannot load python SWIG bindings, falling back to native qpid.messaging."
@@ -136,7 +141,7 @@ _popen_id = AtomicCounter() # Popen iden
 
 # Constants for file descriptor arguments to Popen
 FILE = "FILE"                       # Write to file named after process
-PIPE = subprocess.PIPE
+from subprocess import PIPE, STDOUT
 
 class Popen(subprocess.Popen):
     """
@@ -202,7 +207,7 @@ class Popen(subprocess.Popen):
 
     def communicate(self, input=None):
         ret = subprocess.Popen.communicate(self, input)
-        self.cleanup()
+        self._cleanup()
         return ret
 
     def is_running(self): return self.poll() is None
@@ -254,6 +259,7 @@ class Popen(subprocess.Popen):
 
     def cmd_str(self): return " ".join([str(s) for s in self.cmd])
 
+
 def checkenv(name):
     value = os.getenv(name)
     if not value: raise Exception("Environment variable %s is not set" % name)
@@ -308,7 +314,7 @@ class Broker(Popen):
         cmd += ["--log-to-stderr=no"]
 
         # Add default --log-enable arguments unless args already has --log arguments.
-        if not [l for l in args if l.startswith("--log")]:
+        if not env_has_log_config() and not [l for l in args if l.startswith("--log")]:
             args += ["--log-enable=info+"]
 
         if test_store: cmd += ["--load-module", BrokerTest.test_store_lib,
@@ -444,10 +450,11 @@ def browse(session, queue, timeout=0, tr
     finally:
         r.close()
 
-def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"):
+def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None):
     """Assert that the contents of messages on queue (as retrieved
     using session and timeout) exactly match the strings in
     expect_contents"""
+    if msg is None: msg = "browse '%s' failed" % queue
     actual_contents = browse(session, queue, timeout, transform=transform)
     if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
     assert expect_contents == actual_contents, msg
@@ -486,6 +493,18 @@ class BrokerTest(TestCase):
     test_store_lib = os.getenv("TEST_STORE_LIB")
     rootdir = os.getcwd()
 
+    PN_VERSION = (proton.VERSION_MAJOR, proton.VERSION_MINOR)
+    PN_TX_VERSION = (0, 9)
+
+    amqp_tx_supported = PN_VERSION >= PN_TX_VERSION
+
+    @classmethod
+    def amqp_tx_warning(cls):
+        if not cls.amqp_tx_supported:
+            print "WARNING: Cannot test transactions over AMQP 1.0, proton version %s.%s < %s.%s" % (cls.PN_VERSION + cls.PN_TX_VERSION)
+            return False
+        return True
+
     def configure(self, config): self.config=config
 
     def setUp(self):
@@ -498,8 +517,8 @@ class BrokerTest(TestCase):
         if qpid_messaging and self.amqp_lib: default_protocol="amqp1.0"
         else: default_protocol="amqp0-10"
         self.protocol = defs.get("PROTOCOL") or default_protocol
-        self.tx_protocol = "amqp0-10" # Transactions not yet supported over 1.0
-
+        self.tx_protocol = self.protocol
+        if not self.amqp_tx_supported: self.tx_protocol = "amqp0-10"
 
     def tearDown(self):
         err = []
@@ -530,15 +549,22 @@ class BrokerTest(TestCase):
         self.teardown_add(p)
         return p
 
-    def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False):
+    def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False, **kw):
         """Create and return a broker ready for use"""
-        b = Broker(self, args=args, name=name, expect=expect, port=port, show_cmd=show_cmd)
+        b = Broker(self, args=args, name=name, expect=expect, port=port, show_cmd=show_cmd, **kw)
         if (wait):
             try: b.ready()
             except Exception, e:
                 raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
         return b
 
+    def check_output(self, args, stdin=None):
+        p = self.popen(args, stdout=PIPE, stderr=STDOUT)
+        out = p.communicate(stdin)
+        if p.returncode != 0:
+            raise Exception("%s exit code %s, output:\n%s" % (args, p.returncode, out[0]))
+        return out[0]
+
     def browse(self, *args, **kwargs): browse(*args, **kwargs)
     def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
     def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs)

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_test.py?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_test.py Tue Mar  3 14:58:01 2015
@@ -24,6 +24,7 @@ from brokertest import *
 from threading import Thread, Lock, Condition
 from logging import getLogger, WARN, ERROR, DEBUG, INFO
 from qpidtoollibs import BrokerAgent
+from qpid.harness import Skipped
 
 log = getLogger(__name__)
 
@@ -129,12 +130,9 @@ class HaBroker(Broker):
         args += ["--load-module", BrokerTest.ha_lib,
                  # Non-standard settings for faster tests.
                  "--link-maintenance-interval=0.1",
-                 # Heartbeat and negotiate time are needed so that a broker wont
-                 # stall on an address that doesn't currently have a broker running.
-                 "--max-negotiate-time=1000",
                  "--ha-cluster=%s"%ha_cluster]
         # Add default --log-enable arguments unless args already has --log arguments.
-        if not [l for l in args if l.startswith("--log")]:
+        if not env_has_log_config() and not [l for l in args if l.startswith("--log")]:
             args += ["--log-enable=info+", "--log-enable=debug+:ha::"]
         if not [h for h in args if h.startswith("--link-heartbeat-interval")]:
             args += ["--link-heartbeat-interval=%s"%(HaBroker.heartbeat)]
@@ -159,13 +157,20 @@ acl allow all all
         Broker.__init__(self, test, args, port=ha_port.port, **kwargs)
 
     # Do some static setup to locate the qpid-config and qpid-ha tools.
-    qpid_ha_script=import_script(os.path.join(os.getenv("PYTHON_COMMANDS"),"qpid-ha"))
-    qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
-    assert os.path.isfile(qpid_config_path)
+    @property
+    def qpid_ha_script(self):
+        if not hasattr(self, "_qpid_ha_script"):
+            qpid_ha_exec = os.getenv("QPID_HA_EXEC")
+            if not qpid_ha_exec or not os.path.isfile(qpid_ha_exec):
+                raise Skipped("qpid-ha not available")
+            self._qpid_ha_script = import_script(qpid_ha_exec)
+        return self._qpid_ha_script
 
     def __repr__(self): return "<HaBroker:%s:%d>"%(self.log, self.port())
 
     def qpid_ha(self, args):
+        if not self.qpid_ha_script:
+            raise Skipped("qpid-ha not available")
         try:
             cred = self.client_credentials
             url = self.host_port()
@@ -195,33 +200,37 @@ acl allow all all
 
     def ha_status(self): return self.qmf().status
 
-    def wait_status(self, status, timeout=5):
+    def wait_status(self, status, timeout=10):
+
         def try_get_status():
             self._status = "<unknown>"
-            # Ignore ConnectionError, the broker may not be up yet.
             try:
                 self._status = self.ha_status()
-                return self._status == status;
-            except qm.ConnectionError: return False
+            except qm.ConnectionError, e:
+                # Record the error but don't raise, the broker may not be up yet.
+                self._status = "%s: %s" % (type(e).__name__, e)
+            return self._status == status;
         assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%(
             self, status, self._status)
 
-    def wait_queue(self, queue, timeout=1, msg="wait_queue"):
+    def wait_queue(self, queue, timeout=10, msg="wait_queue"):
         """ Wait for queue to be visible via QMF"""
         agent = self.agent
-        assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout), msg+"queue %s not present"%queue
+        assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout), \
+            "%s queue %s not present" % (msg, queue)
 
-    def wait_no_queue(self, queue, timeout=1, msg="wait_no_queue"):
+    def wait_no_queue(self, queue, timeout=10, msg="wait_no_queue"):
         """ Wait for queue to be invisible via QMF"""
         agent = self.agent
         assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout), "%s: queue %s still present"%(msg,queue)
 
-    # TODO aconway 2012-05-01: do direct python call to qpid-config code.
     def qpid_config(self, args):
+        qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
+        if not qpid_config_exec or not os.path.isfile(qpid_config_exec):
+            raise Skipped("qpid-config not available")
         assert subprocess.call(
-            [self.qpid_config_path, "--broker", self.host_port()]+args,
-            stdout=1, stderr=subprocess.STDOUT
-            ) == 0
+            [qpid_config_exec, "--broker", self.host_port()]+args, stdout=1, stderr=subprocess.STDOUT
+        ) == 0, "qpid-config failed"
 
     def config_replicate(self, from_broker, queue):
         self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
@@ -325,7 +334,7 @@ class HaCluster(object):
         ha_port = self._ports[i]
         b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name,
                      args=args, **self.kwargs)
-        b.ready(timeout=5)
+        b.ready(timeout=10)
         return b
 
     def start(self):

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_tests.py?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/ha_tests.py Tue Mar  3 14:58:01 2015
@@ -1025,8 +1025,8 @@ class LongTests(HaBrokerTest):
              "--broker", brokers[0].host_port(),
              "--address", "q;{create:always}",
              "--messages=1000",
-             "--tx=10"
-             # TODO aconway 2014-02-21: can't use amqp1.0 for transactions yet
+             "--tx=10",
+             "--connection-options={protocol:%s}" % self.tx_protocol
              ])
         receiver = self.popen(
             ["qpid-receive",
@@ -1034,8 +1034,8 @@ class LongTests(HaBrokerTest):
              "--address", "q;{create:always}",
              "--messages=990",
              "--timeout=10",
-             "--tx=10"
-             # TODO aconway 2014-02-21: can't use amqp1.0 for transactions yet
+             "--tx=10",
+             "--connection-options={protocol:%s}" % self.tx_protocol
              ])
         self.assertEqual(sender.wait(), 0)
         self.assertEqual(receiver.wait(), 0)
@@ -1268,7 +1268,7 @@ class StoreTests(HaBrokerTest):
         """Verify that a backup erases queue data from store recovery before
         doing catch-up from the primary."""
         if self.check_skip(): return
-        cluster = HaCluster(self, 2, args=['--log-enable=trace+:ha', '--log-enable=trace+:Store'])
+        cluster = HaCluster(self, 2)
         sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session()
         s1 = sn.sender("q1;{create:always,node:{durable:true}}")
         for m in ["foo","bar"]: s1.send(qm.Message(m, durable=True))
@@ -1532,7 +1532,7 @@ class TransactionTests(HaBrokerTest):
             except qm.TransactionUnknown: pass
             for b in cluster: self.assert_tx_clean(b)
             try: tx.connection.close()
-            except TransactionUnknown: pass # Occasionally get exception on close.
+            except qm.TransactionUnknown: pass # Occasionally get exception on close.
         finally: l.restore()
 
     def test_tx_no_backups(self):
@@ -1622,21 +1622,26 @@ class TransactionTests(HaBrokerTest):
             import qpid_tests.broker_0_10
         except ImportError:
             raise Skipped("Tests not found")
-
         cluster = HaCluster(self, 3)
-        self.popen(["qpid-txtest", "-p%s"%cluster[0].port()]).assert_exit_ok()
+        if "QPID_PORT" in os.environ: del os.environ["QPID_PORT"]
+        self.popen(["qpid-txtest2", "--broker", cluster[0].host_port()]).assert_exit_ok()
+        print
         self.popen(["qpid-python-test",
                     "-m", "qpid_tests.broker_0_10",
+                    "-m", "qpid_tests.broker_1_0",
                     "-b", "localhost:%s"%(cluster[0].port()),
-                    "*.tx.*"]).assert_exit_ok()
+                    "*.tx.*"], stdout=None, stderr=None).assert_exit_ok()
 
 if __name__ == "__main__":
-    outdir = "ha_tests.tmp"
-    shutil.rmtree(outdir, True)
-    qpid_ha = os.getenv("QPID_HA_EXEC")
-    if  qpid_ha and os.path.exists(qpid_ha):
+    qpid_ha_exec = os.getenv("QPID_HA_EXEC")
+    if qpid_ha_exec and os.path.isfile(qpid_ha_exec):
+        BrokerTest.amqp_tx_warning()
+        outdir = "ha_tests.tmp"
+        shutil.rmtree(outdir, True)
         os.execvp("qpid-python-test",
-                  ["qpid-python-test", "-m", "ha_tests", "-DOUTDIR=%s"%outdir]
+                ["qpid-python-test", "-m", "ha_tests", "-DOUTDIR=%s"%outdir]
                   + sys.argv[1:])
     else:
-        print "Skipping ha_tests, %s not available"%(qpid_ha)
+        print "Skipping ha_tests, qpid-ha not available"
+
+

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/interlink_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/interlink_tests.py?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/interlink_tests.py (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/interlink_tests.py Tue Mar  3 14:58:01 2015
@@ -88,6 +88,7 @@ class AmqpBrokerTest(BrokerTest):
         result = self.popen(cmd, stdout=PIPE)
         r.fetch(timeout=1) # wait until receiver is actually ready
         s.acknowledge()
+        r.close()
         s.close()
         return result
 

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-receive.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-receive.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-receive.cpp Tue Mar  3 14:58:01 2015
@@ -197,7 +197,7 @@ int main(int argc, char ** argv)
             std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
             Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession();
             Receiver receiver = session.createReceiver(opts.address);
-            receiver.setCapacity(opts.capacity);
+            receiver.setCapacity(std::min(opts.capacity, opts.messages));
             Message msg;
             uint count = 0;
             uint txCount = 0;
@@ -207,9 +207,9 @@ int main(int argc, char ** argv)
             Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery, opts.reportHeader);
             if (!opts.readyAddress.empty()) {
                 session.createSender(opts.readyAddress).send(msg);
-		if (opts.tx)
-		    session.commit();
-	    }
+                if (opts.tx)
+                    session.commit();
+            }
             // For receive rate calculation
             qpid::sys::AbsTime start = qpid::sys::now();
             int64_t interval = 0;
@@ -290,6 +290,7 @@ int main(int argc, char ** argv)
             connection.close();
             return 0;
         }
+        return 1;
     } catch(const std::exception& error) {
         std::cerr << "qpid-receive: " << error.what() << std::endl;
         connection.close();

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-send.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-send.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-send.cpp (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-send.cpp Tue Mar  3 14:58:01 2015
@@ -112,14 +112,14 @@ struct Options : public qpid::Options
           log(argv0),
           reportTotal(false),
           reportEvery(0),
-          reportHeader(true),
-          sendRate(0),
-          sequence(true),
-          timestamp(true),
-          groupPrefix("GROUP-"),
-          groupSize(10),
-          groupRandSize(false),
-          groupInterleave(1)
+        reportHeader(true),
+        sendRate(0),
+        sequence(true),
+        timestamp(true),
+        groupPrefix("GROUP-"),
+        groupSize(10),
+        groupRandSize(false),
+        groupInterleave(1)
     {
         addOptions()
             ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -272,7 +272,7 @@ class MapContentGenerator   : public Con
 // tag each generated message with a group identifer
 //
 class GroupGenerator {
-public:
+  public:
     GroupGenerator(const std::string& key,
                    const std::string& prefix,
                    const uint size,
@@ -351,7 +351,7 @@ int main(int argc, char ** argv)
     try {
         Options opts;
         if (opts.parse(argc, argv)) {
-             connection = Connection(opts.url, opts.connectionOptions);
+            connection = Connection(opts.url, opts.connectionOptions);
             connection.open();
             std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
             Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession();
@@ -447,6 +447,7 @@ int main(int argc, char ** argv)
             connection.close();
             return 0;
         }
+        return 1;
     } catch(const std::exception& error) {
         std::cerr << "qpid-send: " << error.what() << std::endl;
         connection.close();

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-txtest2.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-txtest2.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-txtest2.cpp (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/qpid-txtest2.cpp Tue Mar  3 14:58:01 2015
@@ -353,10 +353,11 @@ int main(int argc, char** argv)
             if (opts.init) controller.init();
             if (opts.transfer) controller.transfer();
             if (opts.check) return controller.check();
+            return 0;
         }
-        return 0;
+        return 1;
     } catch(const std::exception& e) {
-	std::cout << argv[0] << ": " << e.what() << std::endl;
+	std::cerr << argv[0] << ": " << e.what() << std::endl;
     }
     return 2;
 }

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/swig_python_tests
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/swig_python_tests?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/swig_python_tests (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/swig_python_tests Tue Mar  3 14:58:01 2015
@@ -39,7 +39,8 @@ skip() {
 }
 
 start_broker() {
-    QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir $MODULES --auth no) || fail "Could not start broker"
+    rm -f swig_python_tests.log
+    QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir $MODULES --auth no --log-to-file swig_python_tests.log) || fail "Could not start broker"
 }
 
 stop_broker() {
@@ -54,9 +55,9 @@ echo "Running swigged python tests using
 
 export PYTHONPATH=$PYTHONPATH:$PYTHONPATH_SWIG
 export QPID_USE_SWIG_CLIENT=1
-$QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests || FAILED=1
+$QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests $* || FAILED=1
 if [[ -a $AMQP_LIB ]] ; then
-    $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -m policies -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1
+    $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -m policies -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests $* || FAILED=1
 fi
 stop_broker
 if [[ $FAILED -eq 1 ]]; then

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_env.sh.in
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_env.sh.in?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_env.sh.in (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_env.sh.in Tue Mar  3 14:58:01 2015
@@ -20,14 +20,14 @@
 absdir() { echo `cd $1 && pwd`; }
 
 # Environment variables substituted by cmake.
-srcdir=`absdir @abs_srcdir@`
-builddir=`absdir @abs_builddir@`
-top_srcdir=`absdir @abs_top_srcdir@`
-top_builddir=`absdir @abs_top_builddir@`
-moduledir=$top_builddir/src@builddir_lib_suffix@
-pythonswigdir=$top_builddir/bindings/qpid/python/
-pythonswiglibdir=$top_builddir/bindings/qpid/python@builddir_lib_suffix@
-testmoduledir=$builddir@builddir_lib_suffix@
+export srcdir=`absdir @abs_srcdir@`
+export builddir=`absdir @abs_builddir@`
+export top_srcdir=`absdir @abs_top_srcdir@`
+export top_builddir=`absdir @abs_top_builddir@`
+export moduledir=$top_builddir/src@builddir_lib_suffix@
+export pythonswigdir=$top_builddir/bindings/qpid/python/
+export pythonswiglibdir=$top_builddir/bindings/qpid/python@builddir_lib_suffix@
+export testmoduledir=$builddir@builddir_lib_suffix@
 export QPID_INSTALL_PREFIX=@prefix@
 
 # Tools substituted by cmake

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_store.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_store.cpp?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_store.cpp (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests/test_store.cpp Tue Mar  3 14:58:01 2015
@@ -223,27 +223,18 @@ class TestStore : public NullMessageStor
                  const boost::intrusive_ptr<PersistableMessage>& pmsg,
                  const PersistableQueue& queue)
     {
-        qpid::broker::amqp_0_10::MessageTransfer* msg =
-            dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
-        assert(msg);
-
         ostringstream o;
-        o << "<enqueue " << queue.getName() << " " << getContent(msg);
+        string data = getContent(pmsg);
+        o << "<enqueue " << queue.getName() << " " << data;
         if (tx) o << " tx=" << getId(*tx);
         o << ">";
         log(o.str());
 
         // Dump the message if there is a dump file.
         if (dump.get()) {
-            msg->getFrames().getMethod()->print(*dump);
-            *dump  << endl << "  ";
-            msg->getFrames().getHeaders()->print(*dump);
-            *dump << endl << "  ";
-            *dump << msg->getFrames().getContentSize() << endl;
+            *dump << "Message(" << data.size() << "): " << data << endl;
         }
         string logPrefix = "TestStore "+name+": ";
-        // Check the message for special instructions for this store.
-        string data = msg->getFrames().getContent();
         Action action(data);
         bool doComplete = true;
         if (action.index && action.executeIn(name)) {
@@ -258,7 +249,7 @@ class TestStore : public NullMessageStor
                       QPID_LOG(error, logPrefix << "async-id needs argument: " << data);
                       break;
                   }
-                  asyncIds[action.args[0]] = msg;
+                  asyncIds[action.args[0]] = pmsg;
                   QPID_LOG(debug, logPrefix << "delayed completion " << action.args[0]);
                   doComplete = false;
                   break;
@@ -284,7 +275,7 @@ class TestStore : public NullMessageStor
                 QPID_LOG(error, logPrefix << "unknown action: " << data);
             }
         }
-        if (doComplete) msg->enqueueComplete();
+        if (doComplete) pmsg->enqueueComplete();
     }
 
     void dequeue(TransactionContext* tx,

Propchange: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Mar  3 14:58:01 2015
@@ -3,4 +3,4 @@
 /qpid/branches/java-network-refactor/qpid/python:805429-825319
 /qpid/branches/qmfv2/qpid/python:902858,902894
 /qpid/branches/qpid-2935/qpid/python:1061302-1072333
-/qpid/trunk/qpid/python:1643238-1659605
+/qpid/trunk/qpid/python:1643238-1663687

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/client.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/client.py?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/client.py (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/client.py Tue Mar  3 14:58:01 2015
@@ -89,7 +89,7 @@ class Client:
     self.password = password
     self.locale = locale
     self.tune_params = tune_params
-    self.client_properties=get_client_properties_with_defaults(provided_client_properties=client_properties)
+    self.client_properties=get_client_properties_with_defaults(provided_client_properties=client_properties, version_property_key="version")
     self.sasl_options = sasl_options
     self.socket = connect(self.host, self.port, connection_options)
     self.conn = Connection(self.socket, self.spec)

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/tests/util.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/tests/util.py?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/tests/util.py (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/tests/util.py Tue Mar  3 14:58:01 2015
@@ -21,26 +21,32 @@ from qpid.util import get_client_propert
 
 class UtilTest (TestCase):
 
-  def test_get_spec_recommended_client_properties(self):
-    client_properties = get_client_properties_with_defaults(provided_client_properties={"mykey":"myvalue"})
+  def test_default_client_properties_08091(self):
+    client_properties = get_client_properties_with_defaults(version_property_key="version")
     self.assertTrue("product" in client_properties)
     self.assertTrue("version" in client_properties)
     self.assertTrue("platform" in client_properties)
 
-  def test_get_client_properties_with_provided_value(self):
+  def test_default_client_properties_010(self):
+    client_properties = get_client_properties_with_defaults(version_property_key="qpid.client_version")
+    self.assertTrue("product" in client_properties)
+    self.assertTrue("qpid.client_version" in client_properties)
+    self.assertTrue("platform" in client_properties)
+
+  def test_client_properties_with_provided_value(self):
     client_properties = get_client_properties_with_defaults(provided_client_properties={"mykey":"myvalue"})
     self.assertTrue("product" in client_properties)
     self.assertTrue("mykey" in client_properties)
     self.assertEqual("myvalue", client_properties["mykey"])
 
-  def test_get_client_properties_with_no_provided_values(self):
+  def test_client_properties_with_provided_value_that_overrides_default(self):
+    client_properties = get_client_properties_with_defaults(provided_client_properties={"product":"myproduct"})
+    self.assertEqual("myproduct", client_properties["product"])
+
+  def test_client_properties_with_no_provided_values(self):
     client_properties = get_client_properties_with_defaults(provided_client_properties=None)
     self.assertTrue("product" in client_properties)
 
     client_properties = get_client_properties_with_defaults()
     self.assertTrue("product" in client_properties)
 
-  def test_get_client_properties_with_provided_value_that_overrides_default(self):
-    client_properties = get_client_properties_with_defaults(provided_client_properties={"version":"myversion"})
-    self.assertEqual("myversion", client_properties["version"])
-

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/util.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/util.py?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/util.py (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/python/qpid/util.py Tue Mar  3 14:58:01 2015
@@ -42,15 +42,24 @@ except ImportError:
     def close(self):
       self.sock.close()
 
-def get_client_properties_with_defaults(provided_client_properties={}):
+def get_client_properties_with_defaults(provided_client_properties={}, version_property_key="qpid.client_version"):
   ppid = 0
+  version = "unidentified"
   try:
     ppid = os.getppid()
   except:
     pass
 
+  try:
+    import pkg_resources
+    pkg = pkg_resources.require("qpid-python")
+    if pkg and pkg[0] and pkg[0].version:
+      version = pkg[0].version
+  except:
+    pass
+
   client_properties = {"product": "qpid python client",
-                       "version": "development",
+                       version_property_key : version,
                        "platform": os.name,
                        "qpid.client_process": os.path.basename(sys.argv and sys.argv[0] or ''),
                        "qpid.client_pid": os.getpid(),

Modified: qpid/branches/QPID-6262-JavaBrokerNIO/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py?rev=1663719&r1=1663718&r2=1663719&view=diff
==============================================================================
--- qpid/branches/QPID-6262-JavaBrokerNIO/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py (original)
+++ qpid/branches/QPID-6262-JavaBrokerNIO/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py Tue Mar  3 14:58:01 2015
@@ -23,3 +23,4 @@ from general import *
 from legacy_exchanges import *
 from selector import *
 from translation import *
+from tx import *



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message