qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1518182 - in /qpid/trunk/qpid/cpp/src/qpid: broker/amqp/ messaging/amqp/
Date Wed, 28 Aug 2013 12:41:32 GMT
Author: gsim
Date: Wed Aug 28 12:41:31 2013
New Revision: 1518182

URL: http://svn.apache.org/r1518182
Log:
QPID-4978: add support for reliability option

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Wed Aug 28 12:41:31 2013
@@ -52,7 +52,8 @@ OutgoingFromQueue::OutgoingFromQueue(Bro
       isControllingUser(p),
       queue(q), deliveries(5000), link(l), out(o),
       current(0), outstanding(0),
-      buffer(1024)/*used only for header at present*/
+      buffer(1024)/*used only for header at present*/,
+      unreliable(pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED)
 {
     for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
         deliveries[i].init(i);
@@ -105,6 +106,7 @@ void OutgoingFromQueue::handle(pn_delive
         write(&buffer[0], encoder.getPosition());
         Translation t(r.msg);
         t.write(*this);
+        if (unreliable) pn_delivery_settle(delivery);
         if (pn_link_advance(link)) {
             --outstanding;
             outgoingMessageSent();
@@ -113,7 +115,10 @@ void OutgoingFromQueue::handle(pn_delive
             QPID_LOG(error, "Failed to send message " << r.msg.getSequence() << " from " << queue->getName() << ", index=" << r.index);
         }
     }
-    if (pn_delivery_updated(delivery)) {
+    if (unreliable) {
+        if (preAcquires()) queue->dequeue(0, r.cursor);
+        r.reset();
+    } else if (pn_delivery_updated(delivery)) {
         assert(r.delivery == delivery);
         r.disposition = pn_delivery_remote_state(delivery);
         if (r.disposition) {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Wed Aug 28 12:41:31 2013
@@ -135,6 +135,7 @@ class OutgoingFromQueue : public Outgoin
     std::vector<char> buffer;
     std::string subjectFilter;
     boost::scoped_ptr<Selector> selector;
+    bool unreliable;
 };
 }}} // namespace qpid::broker::amqp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Wed Aug 28 12:41:31 2013
@@ -317,7 +317,6 @@ void Session::setupOutgoing(pn_link_t* l
         target = targetAddress;
     }
 
-
     if (node.queue) {
         authorise.outgoing(node.queue);
         SubscriptionType type = pn_terminus_get_distribution_mode(source) == PN_DIST_MODE_COPY ? BROWSER : CONSUMER;

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Wed Aug 28 12:41:31 2013
@@ -73,6 +73,12 @@ const std::string SUBJECT_FILTER("subjec
 const std::string SOURCE("sender-source");
 const std::string TARGET("receiver-target");
 
+//reliability options:
+const std::string UNRELIABLE("unreliable");
+const std::string AT_MOST_ONCE("at-most-once");
+const std::string AT_LEAST_ONCE("at-least-once");
+const std::string EXACTLY_ONCE("exactly-once");
+
 //distribution modes:
 const std::string MOVE("move");
 const std::string COPY("copy");
@@ -293,6 +299,7 @@ AddressHelper::AddressHelper(const Addre
     bind(address, LINK, link);
     bind(node, PROPERTIES, properties);
     bind(node, CAPABILITIES, capabilities);
+    bind(link, RELIABILITY, reliability);
     durableNode = test(node, DURABLE);
     durableLink = test(link, DURABLE);
     timeout = get(link, TIMEOUT, durableLink ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT);
@@ -506,6 +513,11 @@ bool AddressHelper::enabled(const std::s
     return result;
 }
 
+bool AddressHelper::isUnreliable() const
+{
+    return reliability == AT_MOST_ONCE || reliability == UNRELIABLE;
+}
+
 const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const
 {
     return node;
@@ -536,7 +548,7 @@ bool AddressHelper::getLinkOption(const 
     }
 }
 
-void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode)
+void AddressHelper::configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode)
 {
     bool createOnDemand(false);
     if (isTemporary) {
@@ -581,7 +593,9 @@ void AddressHelper::configure(pn_terminu
             pn_data_exit(filter);
         }
     }
-
+    if (isUnreliable()) {
+        pn_link_set_snd_settle_mode(link, PN_SND_SETTLED);
+    }
 }
 
 void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create)

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h Wed Aug 28 12:41:31 2013
@@ -24,6 +24,7 @@
 #include "qpid/types/Variant.h"
 #include <vector>
 
+struct pn_link_t;
 struct pn_terminus_t;
 
 namespace qpid {
@@ -36,9 +37,10 @@ class AddressHelper
     enum CheckMode {FOR_RECEIVER, FOR_SENDER};
 
     AddressHelper(const Address& address);
-    void configure(pn_terminus_t* terminus, CheckMode mode);
+    void configure(pn_link_t* link, pn_terminus_t* terminus, CheckMode mode);
     void checkAssertion(pn_terminus_t* terminus, CheckMode mode);
 
+    bool isUnreliable() const;
     const qpid::types::Variant::Map& getNodeProperties() const;
     bool getLinkSource(std::string& out) const;
     bool getLinkTarget(std::string& out) const;
@@ -68,6 +70,7 @@ class AddressHelper
     qpid::types::Variant::List capabilities;
     std::string name;
     std::string type;
+    std::string reliability;
     bool durableNode;
     bool durableLink;
     uint32_t timeout;

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Wed Aug 28 12:41:31 2013
@@ -377,12 +377,12 @@ void ConnectionContext::send(boost::shar
     qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
     checkClosed(ssn);
     SenderContext::Delivery* delivery(0);
-    while (!(delivery = snd->send(message))) {
+    while (!snd->send(message, &delivery)) {
         QPID_LOG(debug, "Waiting for capacity...");
         wait(ssn, snd);//wait for capacity
     }
     wakeupDriver();
-    if (sync) {
+    if (sync && delivery) {
         while (!delivery->accepted()) {
             QPID_LOG(debug, "Waiting for confirmation...");
             wait(ssn, snd);//wait until message has been confirmed

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Wed Aug 28 12:41:31 2013
@@ -99,7 +99,7 @@ void ReceiverContext::configure()
 }
 void ReceiverContext::configure(pn_terminus_t* source)
 {
-    helper.configure(source, AddressHelper::FOR_RECEIVER);
+    helper.configure(receiver, source, AddressHelper::FOR_RECEIVER);
     std::string option;
     if (helper.getLinkTarget(option)) {
         pn_terminus_set_address(pn_link_target(receiver), option.c_str());

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Wed Aug 28 12:41:31 2013
@@ -42,7 +42,7 @@ SenderContext::SenderContext(pn_session_
   : name(n),
     address(a),
     helper(address),
-    sender(pn_sender(session, n.c_str())), capacity(1000) {}
+    sender(pn_sender(session, n.c_str())), capacity(1000), unreliable(helper.isUnreliable()) {}
 
 SenderContext::~SenderContext()
 {
@@ -80,16 +80,25 @@ const std::string& SenderContext::getTar
     return address.getName();
 }
 
-SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message)
+bool SenderContext::send(const qpid::messaging::Message& message, SenderContext::Delivery** out)
 {
     if (processUnsettled(false) < capacity && pn_link_credit(sender)) {
-        deliveries.push_back(Delivery(nextId++));
-        Delivery& delivery = deliveries.back();
-        delivery.encode(MessageImplAccess::get(message), address);
-        delivery.send(sender);
-        return &delivery;
+        if (unreliable) {
+            Delivery delivery(nextId++);
+            delivery.encode(MessageImplAccess::get(message), address);
+            delivery.send(sender, unreliable);
+            *out = 0;
+            return true;
+        } else {
+            deliveries.push_back(Delivery(nextId++));
+            Delivery& delivery = deliveries.back();
+            delivery.encode(MessageImplAccess::get(message), address);
+            delivery.send(sender, unreliable);
+            *out = &delivery;
+            return true;
+        }
     } else {
-        return 0;
+        return false;
     }
 }
 
@@ -474,13 +483,14 @@ void SenderContext::Delivery::encode(con
         //write footer (no annotations yet supported)
     }
 }
-void SenderContext::Delivery::send(pn_link_t* sender)
+void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)
 {
     pn_delivery_tag_t tag;
     tag.size = sizeof(id);
     tag.bytes = reinterpret_cast<const char*>(&id);
     token = pn_delivery(sender, tag);
     pn_link_send(sender, encoded.getData(), encoded.getSize());
+    if (unreliable) pn_delivery_settle(token);
     pn_link_advance(sender);
 }
 
@@ -520,7 +530,7 @@ void SenderContext::configure()
 }
 void SenderContext::configure(pn_terminus_t* target)
 {
-    helper.configure(target, AddressHelper::FOR_SENDER);
+    helper.configure(sender, target, AddressHelper::FOR_SENDER);
     std::string option;
     if (helper.getLinkSource(option)) {
         pn_terminus_set_address(pn_link_source(sender), option.c_str());

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1518182&r1=1518181&r2=1518182&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Wed Aug 28 12:41:31 2013
@@ -52,7 +52,7 @@ class SenderContext
       public:
         Delivery(int32_t id);
         void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&);
-        void send(pn_link_t*);
+        void send(pn_link_t*, bool unreliable);
         bool delivered();
         bool accepted();
         bool rejected();
@@ -71,7 +71,7 @@ class SenderContext
     uint32_t getUnsettled();
     const std::string& getName() const;
     const std::string& getTarget() const;
-    Delivery* send(const qpid::messaging::Message& message);
+    bool send(const qpid::messaging::Message& message, Delivery**);
     void configure();
     void verify(pn_terminus_t*);
     void check();
@@ -88,6 +88,7 @@ class SenderContext
     int32_t nextId;
     Deliveries deliveries;
     uint32_t capacity;
+    bool unreliable;
 
     uint32_t processUnsettled(bool silent);
     void configure(pn_terminus_t*);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message