Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2F82ED6F2 for ; Thu, 15 Nov 2012 14:42:50 +0000 (UTC) Received: (qmail 21577 invoked by uid 500); 15 Nov 2012 14:42:50 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 21551 invoked by uid 500); 15 Nov 2012 14:42:50 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 21540 invoked by uid 99); 15 Nov 2012 14:42:49 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Nov 2012 14:42:49 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,FILL_THIS_FORM X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Nov 2012 14:42:45 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 3B3D323889FD for ; Thu, 15 Nov 2012 14:42:24 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1409813 - in /qpid/trunk/qpid/cpp/src/qpid: broker/amqp/ messaging/amqp/ Date: Thu, 15 Nov 2012 14:42:23 -0000 To: commits@qpid.apache.org From: gsim@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121115144224.3B3D323889FD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gsim Date: Thu Nov 15 14:42:22 2012 New Revision: 1409813 URL: http://svn.apache.org/viewvc?rev=1409813&view=rev Log: QPID-4368: Add support for subject, translated to a filter (i.e. 
at present a binding key) by receivers and used as default value for senders Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp?rev=1409813&r1=1409812&r2=1409813&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Session.cpp Thu Nov 15 14:42:22 2012 @@ -22,6 +22,8 @@ #include "Outgoing.h" #include "Message.h" #include "ManagedConnection.h" +#include "qpid/amqp/CharSequence.h" +#include "qpid/amqp/Descriptor.h" #include "qpid/broker/AsyncCompletion.h" #include "qpid/broker/Broker.h" #include "qpid/broker/DeliverableMessage.h" @@ -104,8 +106,76 @@ void Session::attach(pn_link_t* link) QueueSettings settings(false, true); //TODO: populate settings from source details when available from engine queue = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first; - //TODO: bind based on filter when that is exposed by engine - if (exchange->getType() == FanOutExchange::typeName) { + pn_data_t* filter = pn_terminus_filter(source); + pn_data_next(filter); + if (filter && !pn_data_is_null(filter)) { + if (pn_data_type(filter) == PN_MAP) { + pn_data_t* echo = pn_terminus_filter(pn_link_source(link)); + pn_data_put_map(echo); + pn_data_enter(echo); + size_t count = pn_data_get_map(filter)/2; + QPID_LOG(debug, "Got filter map with " 
<< count << " entries"); + pn_data_enter(filter); + for (size_t i = 0; i < count; i++) { + pn_bytes_t fname = pn_data_get_symbol(filter); + pn_data_next(filter); + bool isDescribed = pn_data_is_described(filter); + qpid::amqp::Descriptor descriptor(0); + if (isDescribed) { + pn_data_enter(filter); + pn_data_next(filter); + //TODO: use or at least verify descriptor + if (pn_data_type(filter) == PN_ULONG) { + descriptor = qpid::amqp::Descriptor(pn_data_get_ulong(filter)); + } else if (pn_data_type(filter) == PN_SYMBOL) { + pn_bytes_t d = pn_data_get_symbol(filter); + qpid::amqp::CharSequence c; + c.data = d.start; + c.size = d.size; + descriptor = qpid::amqp::Descriptor(c); + } else { + QPID_LOG(notice, "Ignoring filter with descriptor with key " << std::string(fname.start, fname.size) << " and type " << pn_data_type(filter)); + continue; + } + QPID_LOG(debug, "Got filter with descriptor " << descriptor); + pn_data_next(filter); + } else { + QPID_LOG(debug, "Got undescribed filter of type " << pn_data_type(filter)); + } + if (pn_data_type(filter) == PN_STRING) { + pn_bytes_t value = pn_data_get_string(filter); + pn_data_next(filter); + exchange->bind(queue, std::string(value.start, value.size), 0); + pn_data_put_symbol(echo, fname); + if (isDescribed) { + pn_data_put_described(echo); + pn_data_enter(echo); + pn_bytes_t symbol; + switch (descriptor.type) { + case qpid::amqp::Descriptor::NUMERIC: + pn_data_put_ulong(echo, descriptor.value.code); + break; + case qpid::amqp::Descriptor::SYMBOLIC: + symbol.start = const_cast(descriptor.value.symbol.data); + symbol.size = descriptor.value.symbol.size; + pn_data_put_symbol(echo, symbol); + break; + } + } + pn_data_put_string(echo, value); + if (isDescribed) pn_data_exit(echo); + + QPID_LOG(debug, "Binding using filter " << std::string(fname.start, fname.size) << ":" << std::string(value.start, value.size)); + } else { + //TODO: handle headers exchange filters + QPID_LOG(warning, "Ignoring unsupported filter type with key " 
<< std::string(fname.start, fname.size) << " and type " << pn_data_type(filter)); + } + } + pn_data_exit(echo); + } else { + QPID_LOG(warning, "Filter should be map, got type: " << pn_data_type(filter)); + } + } else if (exchange->getType() == FanOutExchange::typeName) { exchange->bind(queue, std::string(), 0); } else if (exchange->getType() == TopicExchange::typeName) { exchange->bind(queue, "#", 0); Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1409813&r1=1409812&r2=1409813&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Thu Nov 15 14:42:22 2012 @@ -262,10 +262,9 @@ void ConnectionContext::attach(boost::sh void ConnectionContext::attach(boost::shared_ptr ssn, boost::shared_ptr lnk) { - pn_terminus_t* source = pn_link_source((pn_link_t*) lnk->receiver); - pn_terminus_set_address(source, lnk->getSource().c_str()); - attach(ssn->session, (pn_link_t*) lnk->receiver, lnk->capacity); - if (!pn_link_remote_source((pn_link_t*) lnk->receiver)) { + lnk->configure(); + attach(ssn->session, lnk->receiver, lnk->capacity); + if (!pn_link_remote_source(lnk->receiver)) { std::string msg("No such source : "); msg += lnk->getSource(); throw qpid::messaging::NotFound(msg); Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp?rev=1409813&r1=1409812&r2=1409813&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp Thu Nov 15 14:42:22 2012 @@ -29,10 +29,10 @@ namespace 
qpid { namespace messaging { namespace amqp { //TODO: proper conversion to wide string for address -ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const std::string& s) +ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a) : name(n), - source(s), - receiver(pn_receiver(session, source.c_str())), + address(a), + receiver(pn_receiver(session, name.c_str())), capacity(0) {} ReceiverContext::~ReceiverContext() { @@ -84,7 +84,36 @@ const std::string& ReceiverContext::getN const std::string& ReceiverContext::getSource() const { - return source; + return address.getName(); +} +namespace { +pn_bytes_t convert(const std::string& s) +{ + pn_bytes_t result; + result.start = const_cast(s.data()); + result.size = s.size(); + return result; +} +} + +void ReceiverContext::configure() const +{ + configure(pn_link_source(receiver)); +} +void ReceiverContext::configure(pn_terminus_t* source) const +{ + pn_terminus_set_address(source, address.getName().c_str()); + + pn_data_t* filter = pn_terminus_filter(source); + pn_data_put_map(filter); + pn_data_enter(filter); + pn_data_put_symbol(filter, convert("subject")); + pn_data_put_described(filter); + pn_data_enter(filter); + pn_data_put_ulong(filter, 0x0000468C00000001/*LEGACY_TOPIC_FILTER*/); + pn_data_put_string(filter, convert(address.getSubject())); + pn_data_exit(filter); + pn_data_exit(filter); } bool ReceiverContext::isClosed() const Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h?rev=1409813&r1=1409812&r2=1409813&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h Thu Nov 15 14:42:22 2012 @@ -21,11 +21,13 @@ * under the License. 
* */ +#include "qpid/messaging/Address.h" #include #include "qpid/sys/IntegerTypes.h" struct pn_link_t; struct pn_session_t; +struct pn_terminus_t; namespace qpid { namespace messaging { @@ -41,22 +43,25 @@ namespace amqp { class ReceiverContext { public: - ReceiverContext(pn_session_t* session, const std::string& name, const std::string& source); + ReceiverContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& source); ~ReceiverContext(); void setCapacity(uint32_t); uint32_t getCapacity(); uint32_t getAvailable(); uint32_t getUnsettled(); + void attach(); void close(); const std::string& getName() const; const std::string& getSource() const; bool isClosed() const; + void configure() const; private: friend class ConnectionContext; const std::string name; - const std::string source; + const Address address; pn_link_t* receiver; uint32_t capacity; + void configure(pn_terminus_t*) const; }; }}} // namespace qpid::messaging::amqp Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1409813&r1=1409812&r2=1409813&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Thu Nov 15 14:42:22 2012 @@ -36,10 +36,10 @@ namespace qpid { namespace messaging { namespace amqp { //TODO: proper conversion to wide string for address -SenderContext::SenderContext(pn_session_t* session, const std::string& n, const std::string& t) +SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a) : name(n), - target(t), - sender(pn_sender(session, target.c_str())), capacity(1000) {} + address(a), + sender(pn_sender(session, n.c_str())), capacity(1000) {} SenderContext::~SenderContext() { @@ -74,7 +74,7 @@ const std::string& 
SenderContext::getNam const std::string& SenderContext::getTarget() const { - return target; + return address.getName(); } SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& message) @@ -82,7 +82,7 @@ SenderContext::Delivery* SenderContext:: if (processUnsettled() < capacity) { deliveries.push_back(Delivery(nextId++)); Delivery& delivery = deliveries.back(); - delivery.encode(MessageImplAccess::get(message)); + delivery.encode(MessageImplAccess::get(message), address); delivery.send(sender); return &delivery; } else { @@ -135,7 +135,7 @@ const std::string EMPTY; class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties { public: - PropertiesAdapter(const qpid::messaging::MessageImpl& impl) : msg(impl) {} + PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s) : msg(impl), subject(s) {} bool hasMessageId() const { return getMessageId().size(); @@ -167,12 +167,12 @@ class PropertiesAdapter : public qpid::a bool hasSubject() const { - return getSubject().size(); + return subject.size() || getSubject().size(); } std::string getSubject() const { - return msg.getSubject(); + return subject.size() ? 
subject : msg.getSubject(); } bool hasReplyTo() const @@ -266,16 +266,23 @@ class PropertiesAdapter : public qpid::a } private: const qpid::messaging::MessageImpl& msg; + const std::string subject; }; + +bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address) +{ + return address.getSubject().size() && address.getSubject() != msg.getSubject(); +} + } SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0) {} -void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg) +void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address) { boost::shared_ptr original = msg.getEncoded(); - if (original) { //still have the content as received, send at least the bare message unaltered + if (original && !changedSubject(msg, address)) { //still have the content as received, send at least the bare message unaltered //do we need to alter the header? are durable, priority, ttl, first-acquirer, delivery-count different from what was received? 
if (original->hasHeaderChanged(msg)) { //since as yet have no annotations, just write the revised header then the rest of the message as received @@ -293,7 +300,7 @@ void SenderContext::Delivery::encode(con } } else { HeaderAdapter header(msg); - PropertiesAdapter properties(msg); + PropertiesAdapter properties(msg, address.getSubject()); //compute size: encoded.resize(qpid::amqp::MessageEncoder::getEncodedSize(header, properties, msg.getHeaders(), msg.getBytes())); QPID_LOG(debug, "Sending message, buffer is " << encoded.getSize() << " bytes") Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h?rev=1409813&r1=1409812&r2=1409813&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h Thu Nov 15 14:42:22 2012 @@ -25,6 +25,7 @@ #include #include #include "qpid/sys/IntegerTypes.h" +#include "qpid/messaging/Address.h" #include "qpid/messaging/amqp/EncodedMessage.h" struct pn_delivery_t; @@ -48,7 +49,7 @@ class SenderContext { public: Delivery(int32_t id); - void encode(const qpid::messaging::MessageImpl& message); + void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&); void send(pn_link_t*); bool accepted(); private: @@ -57,7 +58,7 @@ class SenderContext EncodedMessage encoded; }; - SenderContext(pn_session_t* session, const std::string& name, const std::string& target); + SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target); ~SenderContext(); void close(); void setCapacity(uint32_t); @@ -71,7 +72,7 @@ class SenderContext typedef std::deque Deliveries; const std::string name; - const std::string target; + const qpid::messaging::Address address; pn_link_t* sender; int32_t nextId; Deliveries deliveries; 
Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp?rev=1409813&r1=1409812&r2=1409813&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp Thu Nov 15 14:42:22 2012 @@ -49,7 +49,7 @@ boost::shared_ptr<SenderContext> Session for (SenderMap::const_iterator i = senders.find(name); i != senders.end(); i = senders.find(name)) { name = (boost::format("%1%_%2%") % address.getName() % ++count).str(); } - boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address.str())); + boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address)); senders[name] = s; return s; } @@ -62,7 +62,7 @@ boost::shared_ptr<ReceiverContext> Sessi for (ReceiverMap::const_iterator i = receivers.find(name); i != receivers.end(); i = receivers.find(name)) { name = (boost::format("%1%_%2%") % address.getName() % ++count).str(); } - boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address.str())); + boost::shared_ptr<ReceiverContext> r(new ReceiverContext(session, name, address)); receivers[name] = r; return r; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org