qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1410750 - in /qpid/trunk/qpid/cpp/src/qpid/broker/amqp: Filter.cpp Filter.h Outgoing.cpp Outgoing.h Session.cpp
Date Sat, 17 Nov 2012 17:08:15 GMT
Author: gsim
Date: Sat Nov 17 17:08:14 2012
New Revision: 1410750

URL: http://svn.apache.org/viewvc?rev=1410750&view=rev
Log:
QPID-4368: Added support for subject filtering on queues

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp?rev=1410750&r1=1410749&r2=1410750&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp Sat Nov 17 17:08:14 2012
@@ -74,6 +74,11 @@ bool Filter::hasSubjectFilter() const
     return !subjectFilter.value.empty();
 }
 
+std::string Filter::getSubjectFilter() const
+{
+    return subjectFilter.value;
+}
+
 
 void Filter::setSubjectFilter(const StringFilter& filter)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h?rev=1410750&r1=1410749&r2=1410750&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h Sat Nov 17 17:08:14 2012
@@ -39,6 +39,7 @@ class Filter : qpid::amqp::MapReader
     void read(pn_data_t*);
     void write(pn_data_t*);
     bool hasSubjectFilter() const;
+    std::string getSubjectFilter() const;
     void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue>
queue);
   private:
     struct StringFilter

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp?rev=1410750&r1=1410749&r2=1410750&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp Sat Nov 17 17:08:14 2012
@@ -22,6 +22,7 @@
 #include "qpid/broker/amqp/Header.h"
 #include "qpid/broker/amqp/Translation.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/TopicKeyNode.h"
 #include "qpid/sys/OutputControl.h"
 #include "qpid/amqp/MessageEncoder.h"
 #include "qpid/log/Statement.h"
@@ -163,6 +164,57 @@ bool Outgoing::accept(const qpid::broker
     return canDeliver();
 }
 
+void Outgoing::setSubjectFilter(const std::string& f)
+{
+    subjectFilter = f;
+}
+
+namespace {
+
+bool match(TokenIterator& filter, TokenIterator& target)
+{
+    bool wild = false;
+    while (!filter.finished())
+    {
+        if (filter.match1('*')) {
+            if (target.finished()) return false;
+            //else move to next word in filter target
+            filter.next();
+            target.next();
+        } else if (filter.match1('#')) {
+            // i.e. filter word is '#' which can match a variable number of words in the
target
+            filter.next();
+            if (filter.finished()) return true;
+            else if (target.finished()) return false;
+            wild = true;
+        } else {
+            //filter word needs to match target exactly
+            if (target.finished()) return false;
+            std::string word;
+            target.pop(word);
+            if (filter.match(word)) {
+                wild = false;
+                filter.next();
+            } else if (!wild) {
+                return false;
+            }
+        }
+    }
+    return target.finished();
+}
+bool match(const std::string& filter, const std::string& target)
+{
+    TokenIterator lhs(filter);
+    TokenIterator rhs(target);
+    return match(lhs, rhs);
+}
+}
+
+bool Outgoing::filter(const qpid::broker::Message& m)
+{
+    return subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter,
m.getRoutingKey());
+}
+
 void Outgoing::cancel() {}
 
 void Outgoing::acknowledged(const qpid::broker::DeliveryRecord&) {}

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h?rev=1410750&r1=1410749&r2=1410750&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Outgoing.h Sat Nov 17 17:08:14 2012
@@ -60,6 +60,7 @@ class Outgoing : public qpid::broker::Co
 {
   public:
     Outgoing(Broker&,boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession&,
qpid::sys::OutputControl& o, bool topic);
+    void setSubjectFilter(const std::string&);
     void init();
     bool dispatch();
     void write(const char* data, size_t size);
@@ -71,6 +72,7 @@ class Outgoing : public qpid::broker::Co
     bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg);
     void notify();
     bool accept(const qpid::broker::Message&);
+    bool filter(const qpid::broker::Message&);
     void cancel();
     void acknowledged(const qpid::broker::DeliveryRecord&);
     qpid::broker::OwnershipToken* getSession();
@@ -99,6 +101,7 @@ class Outgoing : public qpid::broker::Co
     size_t current;
     int outstanding;
     std::vector<char> buffer;
+    std::string subjectFilter;
 };
 }}} // namespace qpid::broker::amqp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1410750&r1=1410749&r2=1410750&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Sat Nov 17 17:08:14 2012
@@ -122,18 +122,21 @@ void Session::attach(pn_link_t* link)
         pn_terminus_set_address(pn_link_source(link), name.c_str());
 
         ResolvedNode node = resolve(name, source);
+        Filter filter;
+        filter.read(pn_terminus_filter(source));
 
         if (node.queue) {
             boost::shared_ptr<Outgoing> q(new Outgoing(broker, node.queue, link, *this,
out, false));
             q->init();
+            if (filter.hasSubjectFilter()) {
+                q->setSubjectFilter(filter.getSubjectFilter());
+            }
             senders[link] = q;
         } else if (node.exchange) {
             QueueSettings settings(false, true);
             //TODO: populate settings from source details when available from engine
             boost::shared_ptr<qpid::broker::Queue> queue
                 = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this,
"", connection.getUserid(), connection.getId()).first;
-            Filter filter;
-            filter.read(pn_terminus_filter(source));
             if (filter.hasSubjectFilter()) {
                 filter.bind(node.exchange, queue);
                 filter.write(pn_terminus_filter(pn_link_source(link)));



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message