qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1532308 - in /qpid/trunk/qpid/cpp/src/qpid: broker/amqp/Outgoing.cpp broker/amqp/Session.cpp messaging/amqp/AddressHelper.cpp
Date Tue, 15 Oct 2013 12:42:01 GMT
Author: gsim
Date: Tue Oct 15 12:42:01 2013
New Revision: 1532308

URL: http://svn.apache.org/r1532308
Log:
QPID-5232: make subscriptions unreliable and autodeleted by default

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1532308&r1=1532307&r2=1532308&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Tue Oct 15 12:42:01 2013
@@ -45,6 +45,17 @@ void Outgoing::wakeup()
     session.wakeup();
 }
 
+namespace {
+bool requested_reliable(pn_link_t* link)
+{
+    return pn_link_remote_snd_settle_mode(link) == PN_SND_UNSETTLED;
+}
+bool requested_unreliable(pn_link_t* link)
+{
+    return pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED;
+}
+}
+
 OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, const
std::string& target, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session,
                                      qpid::sys::OutputControl& o, SubscriptionType type,
bool e, bool p)
     : Outgoing(broker, session, source, target, pn_link_name(l)),
@@ -54,7 +65,8 @@ OutgoingFromQueue::OutgoingFromQueue(Bro
       queue(q), deliveries(5000), link(l), out(o),
       current(0), outstanding(0),
       buffer(1024)/*used only for header at present*/,
-      unreliable(pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED)
+      //for exclusive queues, assume unreliable unless reliable is explicitly requested;
otherwise assume reliable unless unreliable requested
+      unreliable(exclusive ? !requested_reliable(link) : requested_unreliable(link))
 {
     for (size_t i = 0 ; i < deliveries.capacity(); ++i) {
         deliveries[i].init(i);
@@ -106,8 +118,8 @@ void OutgoingFromQueue::handle(pn_delive
         write(&buffer[0], encoder.getPosition());
         Translation t(r.msg);
         t.write(*this);
-        if (unreliable) pn_delivery_settle(delivery);
         if (pn_link_advance(link)) {
+            if (unreliable) pn_delivery_settle(delivery);
             --outstanding;
             outgoingMessageSent();
             QPID_LOG(debug, "Sent message " << r.msg.getSequence() << " from
" << queue->getName() << ", index=" << r.index);

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1532308&r1=1532307&r2=1532308&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Tue Oct 15 12:42:01 2013
@@ -396,12 +396,13 @@ void Session::setupOutgoing(pn_link_t* l
         authorise.access(node.exchange);//do separate access check before trying to create
the queue
         bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source));
         bool durable = pn_terminus_get_durability(source);
-        QueueSettings settings(durable, !durable);
+        bool autodelete = !durable && pn_link_remote_snd_settle_mode(link) == PN_SND_SETTLED;
+        QueueSettings settings(durable, autodelete);
         std::string altExchange;
         if (node.topic) {
             settings = node.topic->getPolicy();
             settings.durable = durable;
-            settings.autodelete = !durable;
+            settings.autodelete = autodelete;
             altExchange = node.topic->getAlternateExchange();
         }
         settings.autoDeleteDelay = pn_terminus_get_timeout(source);

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1532308&r1=1532307&r2=1532308&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Tue Oct 15 12:42:01 2013
@@ -812,6 +812,13 @@ void AddressHelper::configure(pn_link_t*
     }
     if (isUnreliable()) {
         pn_link_set_snd_settle_mode(link, PN_SND_SETTLED);
+    } else if (!reliability.empty()) {
+        if (reliability == EXACTLY_ONCE ) {
+            QPID_LOG(warning, "Unsupported reliability mode: " << reliability);
+        } else if (reliability != AT_LEAST_ONCE ) {
+            QPID_LOG(warning, "Unrecognised reliability mode: " << reliability);
+        }
+        pn_link_set_snd_settle_mode(link, PN_SND_UNSETTLED);
     }
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message