qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1492310 - in /qpid/trunk/qpid: cpp/src/qpid/amqp/ cpp/src/qpid/broker/amqp/ cpp/src/qpid/messaging/amqp/ specs/
Date Wed, 12 Jun 2013 17:56:13 GMT
Author: gsim
Date: Wed Jun 12 17:56:12 2013
New Revision: 1492310

URL: http://svn.apache.org/r1492310
Log:
QPID-4766: Added generic filter support to address handling in qpid::messaging. Added support
for legacy-headers-binding and newly defined xquery filters.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/amqp/MapReader.cpp
    qpid/trunk/qpid/cpp/src/qpid/amqp/MapReader.h
    qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
    qpid/trunk/qpid/specs/apache-filters.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp/MapReader.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp/MapReader.cpp?rev=1492310&r1=1492309&r2=1492310&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp/MapReader.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp/MapReader.cpp Wed Jun 12 17:56:12 2013
@@ -30,7 +30,7 @@ void MapReader::onNull(const Descriptor*
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onNullValue(key, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
@@ -40,7 +40,7 @@ void MapReader::onBoolean(bool v, const 
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onBooleanValue(key, v, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
@@ -51,7 +51,7 @@ void MapReader::onUByte(uint8_t v, const
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onUByteValue(key, v, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
@@ -62,7 +62,7 @@ void MapReader::onUShort(uint16_t v, con
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onUShortValue(key, v, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
@@ -73,7 +73,7 @@ void MapReader::onUInt(uint32_t v, const
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onUIntValue(key, v, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
@@ -84,7 +84,7 @@ void MapReader::onULong(uint64_t v, cons
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onULongValue(key, v, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
@@ -95,7 +95,7 @@ void MapReader::onByte(int8_t v, const D
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onByteValue(key, v, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
@@ -106,7 +106,7 @@ void MapReader::onShort(int16_t v, const
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onShortValue(key, v, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
@@ -117,7 +117,7 @@ void MapReader::onInt(int32_t v, const D
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onIntValue(key, v, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
@@ -128,7 +128,7 @@ void MapReader::onLong(int64_t v, const 
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onLongValue(key, v, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
@@ -139,7 +139,7 @@ void MapReader::onFloat(float v, const D
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onFloatValue(key, v, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
@@ -150,7 +150,7 @@ void MapReader::onDouble(double v, const
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onDoubleValue(key, v, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
@@ -161,7 +161,7 @@ void MapReader::onUuid(const CharSequenc
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onUuidValue(key, v, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
@@ -172,7 +172,7 @@ void MapReader::onTimestamp(int64_t v, c
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onTimestampValue(key, v, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
@@ -183,7 +183,7 @@ void MapReader::onBinary(const CharSeque
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onBinaryValue(key, v, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
@@ -194,9 +194,13 @@ void MapReader::onString(const CharSeque
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onStringValue(key, v, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
-        throw qpid::Exception(QPID_MSG("Expecting symbol as key, got string " << v.str()));
+        if (keyType & STRING_KEY) {
+            key = v;
+        } else {
+            throw qpid::Exception(QPID_MSG("Expecting symbol as key, got string " <<
v.str()));
+        }
     }
 }
 
@@ -205,9 +209,13 @@ void MapReader::onSymbol(const CharSeque
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onSymbolValue(key, v, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
-        key = v;
+        if (keyType & SYMBOL_KEY) {
+            key = v;
+        } else {
+            throw qpid::Exception(QPID_MSG("Expecting string as key, got symbol " <<
v.str()));
+        }
     }
 }
 
@@ -216,7 +224,7 @@ bool MapReader::onStartList(uint32_t cou
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         bool step = onStartListValue(key, count, d);
-        key.data = 0; key.size = 0;
+        clearKey();
         return step;
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
@@ -229,7 +237,7 @@ bool MapReader::onStartMap(uint32_t coun
     if (level++) {
         if (key) {
             bool step = onStartMapValue(key, count, d);
-            key.data = 0; key.size = 0;
+            clearKey();
             return step;
         } else {
             throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
@@ -243,7 +251,7 @@ bool MapReader::onStartArray(uint32_t co
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         bool step = onStartArrayValue(key, count, c, d);
-        key.data = 0; key.size = 0;
+        clearKey();
         return step;
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
@@ -256,7 +264,7 @@ void MapReader::onEndList(uint32_t count
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onEndListValue(key, count, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
@@ -266,7 +274,7 @@ void MapReader::onEndMap(uint32_t count,
 {
     if (--level) {
         onEndMapValue(key, count, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     }
 }
 
@@ -275,15 +283,27 @@ void MapReader::onEndArray(uint32_t coun
     if (!level) throw qpid::Exception(QPID_MSG("Expecting map as top level datum"));
     if (key) {
         onEndArrayValue(key, count, d);
-        key.data = 0; key.size = 0;
+        clearKey();
     } else {
         throw qpid::Exception(QPID_MSG("Expecting symbol as key"));
     }
 }
 
-MapReader::MapReader() : level(0)
+MapReader::MapReader() : level(0), keyType(SYMBOL_KEY)
+{
+    clearKey();
+}
+
+void MapReader::setAllowedKeyType(int t)
+{
+    keyType = t;
+}
+
+void MapReader::clearKey()
 {
     key.data = 0; key.size = 0;
 }
 
+const int MapReader::SYMBOL_KEY(1);
+const int MapReader::STRING_KEY(2);
 }} // namespace qpid::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp/MapReader.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp/MapReader.h?rev=1492310&r1=1492309&r2=1492310&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp/MapReader.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp/MapReader.h Wed Jun 12 17:56:12 2013
@@ -95,9 +95,15 @@ class MapReader : public Reader
     void onEndArray(uint32_t /*count*/, const Descriptor*);
 
     MapReader();
+    static const int SYMBOL_KEY;
+    static const int STRING_KEY;
+    void setAllowedKeyType(int);
   private:
     CharSequence key;
     size_t level;
+    int keyType;
+
+    void clearKey();
 };
 }} // namespace qpid::amqp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h?rev=1492310&r1=1492309&r2=1492310&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/amqp/descriptors.h Wed Jun 12 17:56:12 2013
@@ -78,11 +78,15 @@ const Descriptor SASL_OUTCOME(SASL_OUTCO
 namespace filters {
 const std::string LEGACY_DIRECT_FILTER_SYMBOL("apache.org:legacy-amqp-direct-binding:string");
 const std::string LEGACY_TOPIC_FILTER_SYMBOL("apache.org:legacy-amqp-topic-binding:string");
+const std::string LEGACY_HEADERS_FILTER_SYMBOL("apache.org:legacy-amqp-headers-binding:map");
 const std::string SELECTOR_FILTER_SYMBOL("apache.org:selector-filter:string");
+const std::string XQUERY_FILTER_SYMBOL("apache.org:xquery-filter:string");
 
 const uint64_t LEGACY_DIRECT_FILTER_CODE(0x0000468C00000000ULL);
 const uint64_t LEGACY_TOPIC_FILTER_CODE(0x0000468C00000001ULL);
+const uint64_t LEGACY_HEADERS_FILTER_CODE(0x0000468C00000002ULL);
 const uint64_t SELECTOR_FILTER_CODE(0x0000468C00000004ULL);
+const uint64_t XQUERY_FILTER_CODE(0x0000468C00000005ULL);
 }
 
 }} // namespace qpid::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp?rev=1492310&r1=1492309&r2=1492310&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.cpp Wed Jun 12 17:56:12 2013
@@ -20,9 +20,12 @@
  */
 #include "qpid/broker/amqp/Filter.h"
 #include "qpid/broker/amqp/DataReader.h"
+#include "qpid/broker/amqp/Outgoing.h"
 #include "qpid/broker/DirectExchange.h"
+#include "qpid/broker/HeadersExchange.h"
 #include "qpid/broker/TopicExchange.h"
 #include "qpid/amqp/descriptors.h"
+#include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/log/Statement.h"
 extern "C" {
 #include <proton/engine.h>
@@ -31,6 +34,18 @@ extern "C" {
 namespace qpid {
 namespace broker {
 namespace amqp {
+namespace {
+const std::string XQUERY("xquery");
+const std::string XML("xml");
+const std::string DEFAULT_SUBJECT_FILTER("default-subject-filter");
+const std::string DEFAULT_HEADERS_FILTER("default-headers-filter");
+const std::string XMATCH("x-match");
+const std::string ALL("all");
+const std::string DEFAULT_XQUERY_FILTER("default-xquery-filter");
+const std::string DEFAULT_XQUERY_VALUE("true()");
+const std::string WILDCARD("#");
+}
+Filter::Filter() : inHeadersMap(false) {}
 
 void Filter::read(pn_data_t* data)
 {
@@ -44,52 +59,141 @@ void Filter::read(pn_data_t* data)
 
 void Filter::write(pn_data_t* data)
 {
-    pn_data_put_map(data);
-    pn_data_enter(data);
-    subjectFilter.write(data);
-    pn_data_exit(data);
+    if (!active.empty()) {
+        pn_data_put_map(data);
+        pn_data_enter(data);
+        for (std::vector<FilterBase*>::const_iterator i = active.begin(); i != active.end();
++i) {
+            (*i)->write(data);
+        }
+        pn_data_exit(data);
+    }
 }
 
 void Filter::onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence&
value, const qpid::amqp::Descriptor* descriptor)
 {
-    StringFilter filter;
-    filter.key = std::string(key.data, key.size);
-    filter.value = std::string(value.data, value.size);
-    if (descriptor) {
-        filter.described = true;
-        filter.descriptor = *descriptor;
-        if (descriptor->match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)
-            || descriptor->match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE))
{
-            setSubjectFilter(filter);
-        } else if (descriptor->match(qpid::amqp::filters::SELECTOR_FILTER_SYMBOL, qpid::amqp::filters::SELECTOR_FILTER_CODE))
{
-            setSelectorFilter(filter);
+    if (inHeadersMap) {
+        headersFilter.value[std::string(key.data, key.size)] = std::string(value.data, value.size);
+    } else {
+        StringFilter filter;
+        filter.key = std::string(key.data, key.size);
+        filter.value = std::string(value.data, value.size);
+        if (descriptor) {
+            filter.described = true;
+            filter.descriptor = *descriptor;
+            if (descriptor->match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)
+                || descriptor->match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL,
qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) {
+                setFilter(subjectFilter, filter);
+            } else if (descriptor->match(qpid::amqp::filters::SELECTOR_FILTER_SYMBOL,
qpid::amqp::filters::SELECTOR_FILTER_CODE)) {
+                setFilter(selectorFilter, filter);
+            } else if (descriptor->match(qpid::amqp::filters::XQUERY_FILTER_SYMBOL, qpid::amqp::filters::XQUERY_FILTER_CODE))
{
+                setFilter(xqueryFilter, filter);
+            } else {
+                QPID_LOG(notice, "Skipping unrecognised string filter with key " <<
filter.key << " and descriptor " << filter.descriptor);
+            }
         } else {
-            QPID_LOG(notice, "Skipping unrecognised string filter with key " << filter.key
<< " and descriptor " << filter.descriptor);
+            setFilter(subjectFilter, filter);
         }
-    } else {
-        setSubjectFilter(filter);
     }
 }
+void Filter::onNullValue(const qpid::amqp::CharSequence& key, const qpid::amqp::Descriptor*)
+{
+    headersFilter.value[std::string(key.data, key.size)] = qpid::types::Variant();
+}
+void Filter::onBooleanValue(const qpid::amqp::CharSequence& key, bool value, const qpid::amqp::Descriptor*)
+{
+    headersFilter.value[std::string(key.data, key.size)] = value;
+}
 
-bool Filter::hasSubjectFilter() const
+void Filter::onUByteValue(const qpid::amqp::CharSequence& key, uint8_t value, const qpid::amqp::Descriptor*)
 {
-    return !subjectFilter.value.empty();
+    headersFilter.value[std::string(key.data, key.size)] = value;
 }
 
-std::string Filter::getSubjectFilter() const
+void Filter::onUShortValue(const qpid::amqp::CharSequence& key, uint16_t value, const
qpid::amqp::Descriptor*)
 {
-    return subjectFilter.value;
+    headersFilter.value[std::string(key.data, key.size)] = value;
 }
 
+void Filter::onUIntValue(const qpid::amqp::CharSequence& key, uint32_t value, const qpid::amqp::Descriptor*)
+{
+    headersFilter.value[std::string(key.data, key.size)] = value;
+}
 
-void Filter::setSubjectFilter(const StringFilter& filter)
+void Filter::onULongValue(const qpid::amqp::CharSequence& key, uint64_t value, const
qpid::amqp::Descriptor*)
 {
-    if (hasSubjectFilter()) {
-        QPID_LOG(notice, "Skipping filter with key " << filter.key << "; subject
filter already set");
+    headersFilter.value[std::string(key.data, key.size)] = value;
+}
+
+void Filter::onByteValue(const qpid::amqp::CharSequence& key, int8_t value, const qpid::amqp::Descriptor*)
+{
+    headersFilter.value[std::string(key.data, key.size)] = value;
+}
+
+void Filter::onShortValue(const qpid::amqp::CharSequence& key, int16_t value, const qpid::amqp::Descriptor*)
+{
+    headersFilter.value[std::string(key.data, key.size)] = value;
+}
+
+void Filter::onIntValue(const qpid::amqp::CharSequence& key, int32_t value, const qpid::amqp::Descriptor*)
+{
+    headersFilter.value[std::string(key.data, key.size)] = value;
+}
+
+void Filter::onLongValue(const qpid::amqp::CharSequence& key, int64_t value, const qpid::amqp::Descriptor*)
+{
+    headersFilter.value[std::string(key.data, key.size)] = value;
+}
+
+void Filter::onFloatValue(const qpid::amqp::CharSequence& key, float value, const qpid::amqp::Descriptor*)
+{
+    headersFilter.value[std::string(key.data, key.size)] = value;
+}
+
+void Filter::onDoubleValue(const qpid::amqp::CharSequence& key, double value, const qpid::amqp::Descriptor*)
+{
+    headersFilter.value[std::string(key.data, key.size)] = value;
+}
+
+bool Filter::onStartMapValue(const qpid::amqp::CharSequence& key, uint32_t /*count*/,
const qpid::amqp::Descriptor* descriptor)
+{
+    if (inHeadersMap) {
+        QPID_LOG(warning, "Skipping illegal nested map data in headers filter");
+        return false;
+    } else if (descriptor && descriptor->match(qpid::amqp::filters::LEGACY_HEADERS_FILTER_SYMBOL,
qpid::amqp::filters::LEGACY_HEADERS_FILTER_CODE)) {
+        inHeadersMap = true;
+        setAllowedKeyType(STRING_KEY);
+        headersFilter.requested = true;
+        headersFilter.described = true;
+        headersFilter.descriptor = *descriptor;
+        headersFilter.key = std::string(key.data, key.size);
+        return true;
     } else {
-        subjectFilter = filter;
+        if (descriptor) {
+            QPID_LOG(info, "Skipping unrecognised map data in filter: " << *descriptor);
+        } else {
+            QPID_LOG(info, "Skipping undescribed map data in filter");
+        }
+        return false;
     }
 }
+void Filter::onEndMapValue(const qpid::amqp::CharSequence&, uint32_t, const qpid::amqp::Descriptor*)
+{
+    if (inHeadersMap) {
+        inHeadersMap = false;
+        setAllowedKeyType(SYMBOL_KEY);
+    }
+}
+
+bool Filter::hasSubjectFilter() const
+{
+    return !subjectFilter.value.empty();
+}
+
+std::string Filter::getSubjectFilter() const
+{
+    return subjectFilter.value;
+}
+
 
 bool Filter::hasSelectorFilter() const
 {
@@ -101,22 +205,106 @@ std::string Filter::getSelectorFilter() 
     return selectorFilter.value;
 }
 
+void Filter::setFilter(StringFilter& lhs, const StringFilter& rhs)
+{
+    if (!lhs.value.empty()) {
+        QPID_LOG(notice, "Skipping filter with key " << rhs.key << "; value provided
for " << lhs.key << " already");
+    } else {
+        lhs = rhs;
+        lhs.requested = true;
+    }
+}
 
-void Filter::setSelectorFilter(const StringFilter& filter)
+void Filter::apply(boost::shared_ptr<Outgoing> queue)
 {
+    if (hasSubjectFilter()) {
+        queue->setSubjectFilter(getSubjectFilter());
+        active.push_back(&subjectFilter);
+    }
     if (hasSelectorFilter()) {
-        QPID_LOG(notice, "Skipping filter with key " << filter.key << "; selector
filter already set");
-    } else {
-        selectorFilter = filter;
+        queue->setSelectorFilter(getSelectorFilter());
+        active.push_back(&selectorFilter);
+    }
+}
+
+void Filter::configure(QueueSettings& settings)
+{
+    if (hasSelectorFilter()) {
+        settings.filter = getSelectorFilter();
+        active.push_back(&selectorFilter);
     }
 }
 
 void Filter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue>
queue)
 {
-    subjectFilter.bind(exchange, queue);
+    qpid::framing::FieldTable bindingArgs;
+    if (exchange->getType() == TopicExchange::typeName) {
+        setDefaultSubjectFilter(true);
+        active.push_back(&subjectFilter);
+    } else if (exchange->getType() == DirectExchange::typeName) {
+        if (!setDefaultSubjectFilter() && adjustDirectFilter()) {
+            QPID_LOG(info, "Using legacy topic filter as a direct matching filter for " <<
exchange->getName());
+        }
+        active.push_back(&subjectFilter);
+    } else if (exchange->getType() == HeadersExchange::typeName) {
+        setDefaultHeadersFilter();
+        qpid::amqp_0_10::translate(headersFilter.value, bindingArgs);
+        active.push_back(&headersFilter);
+    } else if (exchange->getType() == XML) {
+        setDefaultXQueryFilter();
+        setDefaultSubjectFilter();
+        bindingArgs.setString(XQUERY, xqueryFilter.value);
+        active.push_back(&subjectFilter);
+        active.push_back(&xqueryFilter);
+    }
+    exchange->bind(queue, subjectFilter.value, &bindingArgs);
+}
+
+void Filter::setDefaultXQueryFilter()
+{
+    if (!xqueryFilter.requested) {
+        xqueryFilter.key = DEFAULT_XQUERY_FILTER;
+        xqueryFilter.value = DEFAULT_XQUERY_VALUE;
+        xqueryFilter.setDescriptor(qpid::amqp::Descriptor(qpid::amqp::filters::XQUERY_FILTER_CODE));
+    }
+}
+void Filter::setDefaultHeadersFilter()
+{
+    if (!headersFilter.requested) {
+        headersFilter.key = DEFAULT_HEADERS_FILTER;
+        headersFilter.value[XMATCH] = ALL;
+        headersFilter.setDescriptor(qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_HEADERS_FILTER_CODE));
+    }
 }
 
-Filter::StringFilter::StringFilter() : described(false), descriptor(0) {}
+bool Filter::setDefaultSubjectFilter(const qpid::amqp::Descriptor& d, const std::string&
value)
+{
+    if (!subjectFilter.requested) {
+        subjectFilter.key = DEFAULT_SUBJECT_FILTER;
+        subjectFilter.value = value;
+        subjectFilter.setDescriptor(d);
+        return true;
+    } else {
+        return false;
+    }
+}
+
+bool Filter::setDefaultSubjectFilter(bool wildcards)
+{
+    if (wildcards) {
+        return setDefaultSubjectFilter(qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE),
WILDCARD);
+    } else {
+        return setDefaultSubjectFilter(qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE));
+    }
+}
+
+Filter::FilterBase::FilterBase() : described(false), requested(false), descriptor(0) {}
+Filter::FilterBase::~FilterBase() {}
+void Filter::FilterBase::setDescriptor(const qpid::amqp::Descriptor& d)
+{
+    described = true;
+    descriptor = d;
+}
 namespace {
 pn_bytes_t convert(const std::string& in)
 {
@@ -132,8 +320,30 @@ pn_bytes_t convert(const qpid::amqp::Cha
     out.size = in.size;
     return out;
 }
+qpid::amqp::Descriptor symbolicDescriptor(const std::string& symbol)
+{
+    qpid::amqp::CharSequence cs;
+    cs.data = symbol.data();
+    cs.size = symbol.size();
+    return qpid::amqp::Descriptor(cs);
+}
 }
-void Filter::StringFilter::write(pn_data_t* data)
+
+bool Filter::adjustDirectFilter()
+{
+    if (subjectFilter.descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE))
{
+        if (subjectFilter.descriptor.type == qpid::amqp::Descriptor::NUMERIC) {
+            subjectFilter.descriptor = qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE);
+        } else {
+            subjectFilter.descriptor = symbolicDescriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL);
+        }
+        return true;
+    } else {
+        return false;
+    }
+}
+
+void Filter::FilterBase::write(pn_data_t* data)
 {
     pn_data_put_symbol(data, convert(key));
     if (described) {
@@ -147,26 +357,27 @@ void Filter::StringFilter::write(pn_data
             pn_data_put_symbol(data, convert(descriptor.value.symbol));
             break;
         }
+        writeValue(data);
+        pn_data_exit(data);
+    } else {
+        writeValue(data);
     }
+}
+void Filter::StringFilter::writeValue(pn_data_t* data)
+{
     pn_data_put_string(data, convert(value));
-    if (described) pn_data_exit(data);
 }
 
-void Filter::StringFilter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue>
queue)
+void Filter::MapFilter::writeValue(pn_data_t* data)
 {
-    if (described && exchange->getType() == DirectExchange::typeName
-        && descriptor.match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE))
{
-        QPID_LOG(info, "Using legacy topic filter as a direct matching filter for " <<
exchange->getName());
-        if (descriptor.type == qpid::amqp::Descriptor::NUMERIC) {
-            descriptor = qpid::amqp::Descriptor(qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE);
-        } else {
-            qpid::amqp::CharSequence symbol;
-            symbol.data = qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL.data();
-            symbol.size = qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL.size();
-            descriptor = qpid::amqp::Descriptor(symbol);
-        }
+    pn_data_put_map(data);
+    pn_data_enter(data);
+    for (ValueType::const_iterator i = value.begin(); i != value.end(); ++i) {
+        pn_data_put_string(data, convert(i->first));
+        pn_data_put_string(data, convert(i->second));//TODO: other types?
     }
-    exchange->bind(queue, value, 0);
+    pn_data_exit(data);
 }
 
+
 }}} // namespace qpid::broker::amqp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h?rev=1492310&r1=1492309&r2=1492310&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Filter.h Wed Jun 12 17:56:12 2013
@@ -23,6 +23,9 @@
  */
 #include "qpid/amqp/MapReader.h"
 #include "qpid/amqp/Descriptor.h"
+#include "qpid/types/Variant.h"
+#include <map>
+#include <vector>
 #include <boost/shared_ptr.hpp>
 
 struct pn_data_t;
@@ -30,37 +33,87 @@ namespace qpid {
 namespace broker {
 class Exchange;
 class Queue;
+class QueueSettings;
 namespace amqp {
-
+class Outgoing;
 
 class Filter : qpid::amqp::MapReader
 {
   public:
+    Filter();
     void read(pn_data_t*);
     void write(pn_data_t*);
-    bool hasSubjectFilter() const;
-    std::string getSubjectFilter() const;
-    bool hasSelectorFilter() const;
-    std::string getSelectorFilter() const;
+
+    /**
+     * Apply filters where source is a queue
+     */
+    void apply(boost::shared_ptr<Outgoing> queue);
+
+    /**
+     * Configure subscription queue for case where source is an exchange
+     */
+    void configure(QueueSettings&);
+    /**
+     * Bind subscription queue for case where source is an exchange
+     */
     void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue>
queue);
   private:
-    struct StringFilter
+    struct FilterBase
     {
         bool described;
+        bool requested;
         qpid::amqp::Descriptor descriptor;
         std::string key;
-        std::string value;
-        StringFilter();
+        FilterBase();
+        virtual ~FilterBase();
         void write(pn_data_t*);
-        void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue>
queue);
+        virtual void writeValue(pn_data_t*) = 0;
+        void setDescriptor(const qpid::amqp::Descriptor&);
+    };
+    struct StringFilter : FilterBase
+    {
+        std::string value;
+        void writeValue(pn_data_t*);
+    };
+    struct MapFilter : FilterBase
+    {
+        typedef std::map<std::string, qpid::types::Variant> ValueType;
+        ValueType value;
+        void writeValue(pn_data_t*);
     };
 
     void onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence&
value, const qpid::amqp::Descriptor* descriptor);
-    void setSubjectFilter(const StringFilter&);
-    void setSelectorFilter(const StringFilter&);
+    void onNullValue(const qpid::amqp::CharSequence&, const qpid::amqp::Descriptor*);
+    void onBooleanValue(const qpid::amqp::CharSequence&, bool, const qpid::amqp::Descriptor*);
+    void onUByteValue(const qpid::amqp::CharSequence&, uint8_t, const qpid::amqp::Descriptor*);
+    void onUShortValue(const qpid::amqp::CharSequence&, uint16_t, const qpid::amqp::Descriptor*);
+    void onUIntValue(const qpid::amqp::CharSequence&, uint32_t, const qpid::amqp::Descriptor*);
+    void onULongValue(const qpid::amqp::CharSequence&, uint64_t, const qpid::amqp::Descriptor*);
+    void onByteValue(const qpid::amqp::CharSequence&, int8_t, const qpid::amqp::Descriptor*);
+    void onShortValue(const qpid::amqp::CharSequence&, int16_t, const qpid::amqp::Descriptor*);
+    void onIntValue(const qpid::amqp::CharSequence&, int32_t, const qpid::amqp::Descriptor*);
+    void onLongValue(const qpid::amqp::CharSequence&, int64_t, const qpid::amqp::Descriptor*);
+    void onFloatValue(const qpid::amqp::CharSequence&, float, const qpid::amqp::Descriptor*);
+    void onDoubleValue(const qpid::amqp::CharSequence&, double, const qpid::amqp::Descriptor*);
+    bool onStartMapValue(const qpid::amqp::CharSequence& key, uint32_t count, const qpid::amqp::Descriptor*
descriptor);
+    void onEndMapValue(const qpid::amqp::CharSequence& key, uint32_t count, const qpid::amqp::Descriptor*
descriptor);
+    void setFilter(StringFilter&, const StringFilter&);
+    bool hasSubjectFilter() const;
+    std::string getSubjectFilter() const;
+    bool hasSelectorFilter() const;
+    std::string getSelectorFilter() const;
+    bool setDefaultSubjectFilter(bool wildcards=false);
+    bool setDefaultSubjectFilter(const qpid::amqp::Descriptor& descriptor, const std::string&
value=std::string());
+    bool adjustDirectFilter();
+    void setDefaultHeadersFilter();
+    void setDefaultXQueryFilter();
 
     StringFilter subjectFilter;
     StringFilter selectorFilter;
+    StringFilter xqueryFilter;
+    MapFilter headersFilter;
+    std::vector<FilterBase*> active;
+    bool inHeadersMap;
 };
 }}} // namespace qpid::broker::amqp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1492310&r1=1492309&r2=1492310&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Wed Jun 12 17:56:12 2013
@@ -278,34 +278,20 @@ void Session::setupOutgoing(pn_link_t* l
     if (node.queue) {
         boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, node.queue,
link, *this, out, false));
         q->init();
-        if (filter.hasSubjectFilter()) {
-            q->setSubjectFilter(filter.getSubjectFilter());
-        }
-        if (filter.hasSelectorFilter()) {
-            q->setSelectorFilter(filter.getSelectorFilter());
-        }
+        filter.apply(q);
         outgoing[link] = q;
     } else if (node.exchange) {
         bool durable = pn_terminus_get_durability(source);
         QueueSettings settings(durable, !durable);
-        if (filter.hasSelectorFilter()) {
-            settings.filter = filter.getSelectorFilter();
-            QPID_LOG(debug, "Selector specified for outgoing link from exchange " <<
node.exchange->getName() << ": " << settings.filter);
-        }
+        filter.configure(settings);
         //TODO: populate settings from source details when available from engine
         std::stringstream queueName;//combination of container id and link name is unique
         queueName << connection.getContainerId() << "_" << pn_link_name(link);
         boost::shared_ptr<qpid::broker::Queue> queue
             = broker.createQueue(queueName.str(), settings, this, "", connection.getUserid(),
connection.getId()).first;
         queue->setExclusiveOwner(this);
-        if (filter.hasSubjectFilter()) {
-            filter.bind(node.exchange, queue);
-            filter.write(pn_terminus_filter(pn_link_source(link)));
-        } else if (node.exchange->getType() == TopicExchange::typeName) {
-            node.exchange->bind(queue, "#", 0);
-        } else {
-            node.exchange->bind(queue, std::string(), 0);
-        }
+
+        filter.bind(node.exchange, queue);
         boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue,
link, *this, out, true));
         outgoing[link] = q;
         q->init();
@@ -317,6 +303,7 @@ void Session::setupOutgoing(pn_link_t* l
         pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED);
         throw qpid::Exception("Node not found: " + name);/*not-found*/
     }
+    filter.write(pn_terminus_filter(pn_link_source(link)));
     QPID_LOG(debug, "Outgoing link attached");
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp?rev=1492310&r1=1492309&r2=1492310&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp Wed Jun 12 17:56:12 2013
@@ -21,6 +21,7 @@
 #include "qpid/messaging/amqp/AddressHelper.h"
 #include "qpid/messaging/Address.h"
 #include "qpid/messaging/AddressImpl.h"
+#include "qpid/amqp/descriptors.h"
 #include "qpid/log/Statement.h"
 #include <vector>
 #include <set>
@@ -64,6 +65,10 @@ const std::string DURABLE("durable");
 const std::string NAME("name");
 const std::string RELIABILITY("reliability");
 const std::string SELECTOR("selector");
+const std::string FILTER("filter");
+const std::string DESCRIPTOR("descriptor");
+const std::string VALUE("value");
+const std::string SUBJECT_FILTER("subject-filter");
 
 //distribution modes:
 const std::string MOVE("move");
@@ -100,7 +105,15 @@ pn_bytes_t convert(const std::string& s)
     result.size = s.size();
     return result;
 }
+bool hasWildcards(const std::string& key)
+{
+    return key.find('*') != std::string::npos || key.find('#') != std::string::npos;
+}
 
+uint64_t getFilterDescriptor(const std::string& key)
+{
+    return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE;
+}
 bool contains(const Variant::List& list, const std::string& item)
 {
     for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
@@ -183,6 +196,58 @@ void flatten(Variant::Map& base, const s
         base.erase(i);
     }
 }
+void write(pn_data_t* data, const Variant& value);
+
+void write(pn_data_t* data, const Variant::Map& map)
+{
+    pn_data_put_map(data);
+    pn_data_enter(data);
+    for (Variant::Map::const_iterator i = map.begin(); i != map.end(); ++i) {
+        pn_data_put_string(data, convert(i->first));
+        write(data, i->second);
+    }
+    pn_data_exit(data);
+}
+void write(pn_data_t* data, const Variant::List& list)
+{
+    pn_data_put_list(data);
+    pn_data_enter(data);
+    for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+        write(data, *i);
+    }
+    pn_data_exit(data);
+}
+void write(pn_data_t* data, const Variant& value)
+{
+    switch (value.getType()) {
+      case qpid::types::VAR_VOID:
+        pn_data_put_null(data);
+        break;
+      case qpid::types::VAR_BOOL:
+        pn_data_put_bool(data, value.asBool());
+        break;
+      case qpid::types::VAR_UINT64:
+        pn_data_put_ulong(data, value.asUint64());
+        break;
+      case qpid::types::VAR_INT64:
+        pn_data_put_long(data, value.asInt64());
+        break;
+      case qpid::types::VAR_DOUBLE:
+        pn_data_put_double(data, value.asDouble());
+        break;
+      case qpid::types::VAR_STRING:
+        pn_data_put_string(data, convert(value.asString()));
+        break;
+      case qpid::types::VAR_MAP:
+        write(data, value.asMap());
+        break;
+      case qpid::types::VAR_LIST:
+        write(data, value.asList());
+        break;
+      default:
+        break;
+    }
+}
 }
 
 AddressHelper::AddressHelper(const Address& address) :
@@ -241,6 +306,67 @@ AddressHelper::AddressHelper(const Addre
     if (properties.size() && !(isTemporary || createPolicy.size())) {
         QPID_LOG(warning, "Properties will be ignored! " << address);
     }
+
+    qpid::types::Variant::Map::const_iterator selector = link.find(SELECTOR);
+    if (selector != link.end()) {
+        addFilter(SELECTOR, qpid::amqp::filters::SELECTOR_FILTER_CODE, selector->second);
+    }
+    if (!address.getSubject().empty()) {
+        addFilter(SUBJECT_FILTER, getFilterDescriptor(address.getSubject()), address.getSubject());
+    }
+    qpid::types::Variant::Map::const_iterator filter = link.find(FILTER);
+    if (filter != link.end()) {
+        if (filter->second.getType() == qpid::types::VAR_MAP) {
+            addFilter(filter->second.asMap());
+        } else if (filter->second.getType() == qpid::types::VAR_LIST) {
+            addFilters(filter->second.asList());
+        } else {
+            throw qpid::messaging::AddressError("Filter must be a map or a list of maps,
each containing name, descriptor and value.");
+        }
+    }
+}
+
+void AddressHelper::addFilters(const qpid::types::Variant::List& f)
+{
+    for (qpid::types::Variant::List::const_iterator i = f.begin(); i != f.end(); ++i) {
+        addFilter(i->asMap());
+    }
+}
+
+void AddressHelper::addFilter(const qpid::types::Variant::Map& f)
+{
+    qpid::types::Variant::Map::const_iterator name = f.find(NAME);
+    qpid::types::Variant::Map::const_iterator descriptor = f.find(DESCRIPTOR);
+    qpid::types::Variant::Map::const_iterator value = f.find(VALUE);
+    //all fields are required at present (may relax this at a later stage):
+    if (name == f.end()) {
+        throw qpid::messaging::AddressError("Filter entry must specify name");
+    }
+    if (descriptor == f.end()) {
+        throw qpid::messaging::AddressError("Filter entry must specify descriptor");
+    }
+    if (value == f.end()) {
+        throw qpid::messaging::AddressError("Filter entry must specify value");
+    }
+    try {
+        addFilter(name->second.asString(), descriptor->second.asUint64(), value->second);
+    } catch (const qpid::types::InvalidConversion&) {
+        addFilter(name->second.asString(), descriptor->second.asString(), value->second);
+    }
+
+}
+
+AddressHelper::Filter::Filter() : descriptorCode(0){}
+AddressHelper::Filter::Filter(const std::string& n, uint64_t d, const qpid::types::Variant&
v) : name(n), descriptorCode(d), value(v) {}
+AddressHelper::Filter::Filter(const std::string& n, const std::string& d, const qpid::types::Variant&
v) : name(n), descriptorSymbol(d), descriptorCode(0), value(v) {}
+
+void AddressHelper::addFilter(const std::string& name, uint64_t descriptor, const qpid::types::Variant&
value)
+{
+    filters.push_back(Filter(name, descriptor, value));
+}
+void AddressHelper::addFilter(const std::string& name, const std::string& descriptor,
const qpid::types::Variant& value)
+{
+    filters.push_back(Filter(name, descriptor, value));
 }
 
 void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode)
@@ -331,6 +457,26 @@ void AddressHelper::configure(pn_terminu
     if (mode == FOR_RECEIVER && browse) {
         //when PROTON-139 is resolved, set the required delivery-mode
     }
+    //set filter(s):
+    if (mode == FOR_RECEIVER && !filters.empty()) {
+        pn_data_t* filter = pn_terminus_filter(terminus);
+        pn_data_put_map(filter);
+        pn_data_enter(filter);
+        for (std::vector<Filter>::const_iterator i = filters.begin(); i != filters.end();
++i) {
+            pn_data_put_symbol(filter, convert(i->name));
+            pn_data_put_described(filter);
+            pn_data_enter(filter);
+            if (i->descriptorSymbol.size()) {
+                pn_data_put_symbol(filter, convert(i->descriptorSymbol));
+            } else {
+                pn_data_put_ulong(filter, i->descriptorCode);
+            }
+            write(filter, i->value);
+            pn_data_exit(filter);
+        }
+        pn_data_exit(filter);
+    }
+
 }
 
 void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create)
@@ -401,6 +547,7 @@ Verifier::Verifier()
     link[X_DECLARE] = true;
     link[X_BINDINGS] = true;
     link[SELECTOR] = true;
+    link[FILTER] = true;
     defined[LINK] = link;
 }
 void Verifier::verify(const Address& address) const

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h?rev=1492310&r1=1492309&r2=1492310&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h Wed Jun 12 17:56:12 2013
@@ -22,6 +22,7 @@
  *
  */
 #include "qpid/types/Variant.h"
+#include <vector>
 
 struct pn_terminus_t;
 
@@ -29,7 +30,6 @@ namespace qpid {
 namespace messaging {
 class Address;
 namespace amqp {
-
 class AddressHelper
 {
   public:
@@ -43,6 +43,18 @@ class AddressHelper
     const qpid::types::Variant::Map& getLinkProperties() const;
     static std::string getLinkName(const Address& address);
   private:
+    struct Filter
+    {
+        std::string name;
+        std::string descriptorSymbol;
+        uint64_t descriptorCode;
+        qpid::types::Variant value;
+
+        Filter();
+        Filter(const std::string& name, uint64_t descriptor, const qpid::types::Variant&
value);
+        Filter(const std::string& name, const std::string& descriptor, const qpid::types::Variant&
value);
+    };
+
     bool isTemporary;
     std::string createPolicy;
     std::string assertPolicy;
@@ -56,12 +68,17 @@ class AddressHelper
     bool durableNode;
     bool durableLink;
     bool browse;
+    std::vector<Filter> filters;
 
     bool enabled(const std::string& policy, CheckMode mode) const;
     bool createEnabled(CheckMode mode) const;
     bool assertEnabled(CheckMode mode) const;
     void setCapabilities(pn_terminus_t* terminus, bool create);
     void setNodeProperties(pn_terminus_t* terminus);
+    void addFilter(const qpid::types::Variant::Map&);
+    void addFilter(const std::string& name, uint64_t descriptor, const qpid::types::Variant&
value);
+    void addFilter(const std::string& name, const std::string& descriptor, const
qpid::types::Variant& value);
+    void addFilters(const qpid::types::Variant::List&);
 };
 }}} // namespace qpid::messaging::amqp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1492310&r1=1492309&r2=1492310&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Wed Jun 12 17:56:12 2013
@@ -22,7 +22,6 @@
 #include "qpid/messaging/AddressImpl.h"
 #include "qpid/messaging/Duration.h"
 #include "qpid/messaging/Message.h"
-#include "qpid/amqp/descriptors.h"
 #include "qpid/log/Statement.h"
 extern "C" {
 #include <proton/engine.h>
@@ -90,24 +89,6 @@ const std::string& ReceiverContext::getS
 {
     return address.getName();
 }
-namespace {
-pn_bytes_t convert(const std::string& s)
-{
-    pn_bytes_t result;
-    result.start = const_cast<char*>(s.data());
-    result.size = s.size();
-    return result;
-}
-bool hasWildcards(const std::string& key)
-{
-    return key.find('*') != std::string::npos || key.find('#') != std::string::npos;
-}
-
-uint64_t getFilterDescriptor(const std::string& key)
-{
-    return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE;
-}
-}
 void ReceiverContext::verify(pn_terminus_t* source)
 {
     helper.checkAssertion(source, AddressHelper::FOR_RECEIVER);
@@ -119,34 +100,6 @@ void ReceiverContext::configure()
 void ReceiverContext::configure(pn_terminus_t* source)
 {
     helper.configure(source, AddressHelper::FOR_RECEIVER);
-
-    // Look specifically for qpid.selector link property and add a filter for it
-    qpid::types::Variant::Map::const_iterator i = helper.getLinkProperties().find("selector");
-    if (i!=helper.getLinkProperties().end()) {
-        pn_data_t* filter = pn_terminus_filter(source);
-        pn_data_put_map(filter);
-        pn_data_enter(filter);
-        pn_data_put_symbol(filter, convert("selector"));
-        pn_data_put_described(filter);
-        pn_data_enter(filter);
-        pn_data_put_ulong(filter, qpid::amqp::filters::SELECTOR_FILTER_CODE);
-        pn_data_put_string(filter, convert(i->second));
-        pn_data_exit(filter);
-        pn_data_exit(filter);
-    }
-    if (!address.getSubject().empty()) {
-        //filter:
-        pn_data_t* filter = pn_terminus_filter(source);
-        pn_data_put_map(filter);
-        pn_data_enter(filter);
-        pn_data_put_symbol(filter, convert("subject"));
-        pn_data_put_described(filter);
-        pn_data_enter(filter);
-        pn_data_put_ulong(filter, getFilterDescriptor(address.getSubject()));
-        pn_data_put_string(filter, convert(address.getSubject()));
-        pn_data_exit(filter);
-        pn_data_exit(filter);
-    }
 }
 
 Address ReceiverContext::getAddress() const

Modified: qpid/trunk/qpid/specs/apache-filters.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/apache-filters.xml?rev=1492310&r1=1492309&r2=1492310&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/apache-filters.xml (original)
+++ qpid/trunk/qpid/specs/apache-filters.xml Wed Jun 12 17:56:12 2013
@@ -224,4 +224,22 @@ symbol            | String
         </doc>
 	</type>
   </section>
+  <section name="xquery" title="An xquery based filter">
+      <type class="restricted" name="xquery" source="string" provides="filter">
+          <descriptor name="apache.org:xquery-filter:string" code="0x0000468C:0x00000005"/>
+          <doc>
+            <p>
+            The xquery filter consists of a described string value
+            containing a valid xquery string against which messages
+            are matched.
+            </p>
+            <p>
+            Containers which support the filter defined in this
+            section should advertise the capability
+            <term>APACHE.ORG:XQUERY</term> in their connection
+            capabilities when sending the open performative.
+            </p>
+          </doc>
+     </type>
+ </section>
 </amqp>



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message