qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1135165 - in /qpid/trunk/qpid/cpp/src: qpid/client/amqp0_10/AddressResolution.cpp tests/MessagingSessionTests.cpp
Date Mon, 13 Jun 2011 16:37:11 GMT
Author: gsim
Date: Mon Jun 13 16:37:10 2011
New Revision: 1135165

URL: http://svn.apache.org/viewvc?rev=1135165&view=rev
Log:
QPID-3225: Allow exclusivity (for subscription queue and subscription itself) to be controlled
through address properties.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
    qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp?rev=1135165&r1=1135164&r2=1135165&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp Mon Jun 13 16:37:10 2011
@@ -233,6 +233,8 @@ class Subscription : public Exchange, pu
     const bool reliable;
     const bool durable;
     const std::string actualType;
+    const bool exclusiveQueue;
+    const bool exclusiveSubscription;
     FieldTable queueOptions;
     FieldTable subscriptionOptions;
     Bindings bindings;
@@ -307,6 +309,7 @@ struct Opt
     Opt& operator/(const std::string& name);
     operator bool() const;
     std::string str() const;
+    bool asBool(bool defaultValue) const;
     const Variant::List& asList() const;
     void collect(qpid::framing::FieldTable& args) const;
 
@@ -338,6 +341,12 @@ Opt::operator bool() const
     return value && !value->isVoid() && value->asBool();
 }
 
+bool Opt::asBool(bool defaultValue) const
+{
+    if (value) return value->asBool();
+    else return defaultValue;
+}
+
 std::string Opt::str() const
 {
     if (value) return value->asString();
@@ -490,7 +499,9 @@ Subscription::Subscription(const Address
       queue(getSubscriptionName(name, (Opt(address)/LINK/NAME).str())),
       reliable(AddressResolution::is_reliable(address)),
       durable(Opt(address)/LINK/DURABLE),
-      actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type)
+      actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type),
+      exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)),
+      exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue))
 {
     (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions);
     (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions);
@@ -550,7 +561,7 @@ void Subscription::subscribe(qpid::clien
     checkAssert(session, FOR_RECEIVER);
 
     //create subscription queue:
-    session.queueDeclare(arg::queue=queue, arg::exclusive=true, 
+    session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue,
                          arg::autoDelete=!reliable, arg::durable=durable, arg::arguments=queueOptions);
     //'default' binding:
     bindings.bind(session);
@@ -559,15 +570,15 @@ void Subscription::subscribe(qpid::clien
     linkBindings.bind(session);
     //subscribe to subscription queue:
     AcceptMode accept = reliable ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE;
-    session.messageSubscribe(arg::queue=queue, arg::destination=destination, 
-                             arg::exclusive=true, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
+    session.messageSubscribe(arg::queue=queue, arg::destination=destination,
+                             arg::exclusive=exclusiveSubscription, arg::acceptMode=accept, arg::arguments=subscriptionOptions);
 }
 
 void Subscription::cancel(qpid::client::AsyncSession& session, const std::string& destination)
 {
     linkBindings.unbind(session);
     session.messageCancel(destination);
-    session.queueDelete(arg::queue=queue);
+    if (reliable) session.queueDelete(arg::queue=queue, arg::ifUnused=true);
     checkDelete(session, FOR_RECEIVER);
 }
 

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1135165&r1=1135164&r2=1135165&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Mon Jun 13 16:37:10 2011
@@ -992,6 +992,38 @@ QPID_AUTO_TEST_CASE(testTtlForever)
     BOOST_CHECK(in.getTtl() == Duration::FOREVER);
 }
 
+QPID_AUTO_TEST_CASE(testExclusiveTopicSubscriber)
+{
+    TopicFixture fix;
+    std::string address = (boost::format("%1%; { link: { name: 'my-subscription', x-declare: { auto-delete: true, exclusive: true }}}") % fix.topic).str();
+    Sender sender = fix.session.createSender(fix.topic);
+    Receiver receiver1 = fix.session.createReceiver(address);
+    {
+        ScopedSuppressLogging sl;
+    try {
+        fix.session.createReceiver(address);
+        fix.session.sync();
+        BOOST_FAIL("Expected exception.");
+    } catch (const MessagingException& /*e*/) {}
+    }
+}
+
+QPID_AUTO_TEST_CASE(testNonExclusiveSubscriber)
+{
+    TopicFixture fix;
+    std::string address = (boost::format("%1%; {node:{type:topic}, link:{name:'my-subscription', x-declare:{auto-delete:true, exclusive:false}}}") % fix.topic).str();
+    Receiver receiver1 = fix.session.createReceiver(address);
+    Receiver receiver2 = fix.session.createReceiver(address);
+    Sender sender = fix.session.createSender(fix.topic);
+    sender.send(Message("one"), true);
+    Message in = receiver1.fetch(Duration::IMMEDIATE);
+    BOOST_CHECK_EQUAL(in.getContent(), std::string("one"));
+    sender.send(Message("two"), true);
+    in = receiver2.fetch(Duration::IMMEDIATE);
+    BOOST_CHECK_EQUAL(in.getContent(), std::string("two"));
+    fix.session.acknowledge();
+}
+
 QPID_AUTO_TEST_SUITE_END()
 
 }} // namespace qpid::tests



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message