Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D06CF18EDC for ; Fri, 28 Aug 2015 22:16:27 +0000 (UTC) Received: (qmail 54565 invoked by uid 500); 28 Aug 2015 22:16:27 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 54536 invoked by uid 500); 28 Aug 2015 22:16:27 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 54527 invoked by uid 99); 28 Aug 2015 22:16:27 -0000 Received: from eris.apache.org (HELO hades.apache.org) (140.211.11.105) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Aug 2015 22:16:27 +0000 Received: from hades.apache.org (localhost [127.0.0.1]) by hades.apache.org (ASF Mail Server at hades.apache.org) with ESMTP id 875BFAC0113 for ; Fri, 28 Aug 2015 22:16:27 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1698426 - in /qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/broker/amqp/ cpp/src/qpid/broker/amqp_0_10/ tests/src/py/qpid_tests/broker_1_0/ Date: Fri, 28 Aug 2015 22:16:27 -0000 To: commits@qpid.apache.org From: gsim@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20150828221627.875BFAC0113@hades.apache.org> Author: gsim Date: Fri Aug 28 22:16:27 2015 New Revision: 1698426 URL: http://svn.apache.org/r1698426 Log: QPID-6714: support for JMS header names in selectors, plus support for to, replyto and subject Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Message.h qpid/trunk/qpid/cpp/src/qpid/broker/Selector.cpp qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h 
qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=1698426&r1=1698425&r2=1698426&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Aug 28 22:16:27 2015 @@ -64,6 +64,19 @@ std::string Message::getRoutingKey() con return getEncoding().getRoutingKey(); } +std::string Message::getTo() const +{ + return getEncoding().getTo(); +} +std::string Message::getSubject() const +{ + return getEncoding().getSubject(); +} +std::string Message::getReplyTo() const +{ + return getEncoding().getReplyTo(); +} + bool Message::isPersistent() const { return getEncoding().isPersistent(); Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Message.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=1698426&r1=1698425&r2=1698426&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Aug 28 22:16:27 2015 @@ -78,6 +78,9 @@ public: virtual void processProperties(qpid::amqp::MapHandler&) const = 0; virtual std::string getUserId() const = 0; virtual uint64_t getTimestamp() const = 0; + virtual std::string getTo() const = 0; + virtual std::string getSubject() const = 0; + virtual std::string getReplyTo() const = 0; }; class SharedState : public Encoding @@ -137,6 +140,11 @@ public: QPID_BROKER_EXTERN uint64_t getTimestamp() const; + //required for selectors: + QPID_BROKER_EXTERN std::string getTo() const; + QPID_BROKER_EXTERN 
std::string getSubject() const; + QPID_BROKER_EXTERN std::string getReplyTo() const; + QPID_BROKER_EXTERN void addAnnotation(const std::string& key, const qpid::types::Variant& value); QPID_BROKER_EXTERN bool isExcluded(const std::vector& excludes) const; QPID_BROKER_EXTERN void addTraceId(const std::string& id); Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Selector.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Selector.cpp?rev=1698426&r1=1698425&r2=1698426&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Selector.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Selector.cpp Fri Aug 28 22:16:27 2015 @@ -30,6 +30,7 @@ #include "qpid/log/Statement.h" #include "qpid/types/Variant.h" +#include #include #include #include @@ -54,6 +55,7 @@ using qpid::amqp::MessageId; * priority | Priority | priority header section * delivery_count | | delivery-count header section * redelivered |[Redelivered] | (delivery_count>0) (computed value) + * subject | Type | subject properties section * correlation_id | CorrelationID| correlation-id properties section * to |[Destination] | to properties section * absolute_expiry_time |[Expiration] | absolute-expiry-time properties section @@ -66,6 +68,26 @@ const string EMPTY; const string PERSISTENT("PERSISTENT"); const string NON_PERSISTENT("NON_PERSISTENT"); +namespace { + typedef std::map Aliases; + Aliases define_aliases() + { + Aliases aliases; + aliases["JMSType"] = "subject"; + aliases["JMSCorrelationID"] = "correlation_id"; + aliases["JMSMessageID"] = "message_id"; + aliases["JMSDeliveryMode"] = "delivery_mode"; + aliases["JMSRedelivered"] = "redelivered"; + aliases["JMSPriority"] = "priority"; + aliases["JMSDestination"] = "to"; + aliases["JMSReplyTo"] = "reply_to"; + aliases["JMSTimestamp"] = "creation_time"; + aliases["JMSExpiration"] = "absolute_expiry_time"; + return aliases; + } + const Aliases aliases = 
define_aliases(); +} + class MessageSelectorEnv : public SelectorEnv { const Message& msg; mutable boost::ptr_vector<string> returnedStrings; @@ -82,8 +104,7 @@ public: MessageSelectorEnv::MessageSelectorEnv(const Message& m) : msg(m), valuesLookedup(false) -{ -} +{} const Value MessageSelectorEnv::specialValue(const string& id) const { @@ -91,6 +112,12 @@ const Value MessageSelectorEnv::specialV // TODO: Just use a simple if chain for now - improve this later if ( id=="delivery_mode" ) { v = msg.getEncoding().isPersistent() ? PERSISTENT : NON_PERSISTENT; + } else if ( id=="subject" ) { + std::string s = msg.getSubject(); + if (!s.empty()) { + returnedStrings.push_back(new string(s)); + v = returnedStrings[returnedStrings.size()-1]; + } } else if ( id=="redelivered" ) { // Although redelivered is defined to be true delivery-count>0 if it is 0 now // it will be 1 by the time the message is delivered @@ -110,9 +137,17 @@ const Value MessageSelectorEnv::specialV v = returnedStrings[returnedStrings.size()-1]; } } else if ( id=="to" ) { - v = EMPTY; // Hard to get this correct for both 1.0 and 0-10 + std::string s = msg.getTo(); + if (!s.empty()) { + returnedStrings.push_back(new string(s)); + v = returnedStrings[returnedStrings.size()-1]; + } } else if ( id=="reply_to" ) { - v = EMPTY; // Hard to get this correct for both 1.0 and 0-10 + std::string s = msg.getReplyTo(); + if (!s.empty()) { + returnedStrings.push_back(new string(s)); + v = returnedStrings[returnedStrings.size()-1]; + } } else if ( id=="absolute_expiry_time" ) { qpid::sys::AbsTime expiry = msg.getExpiration(); // Java property has value of 0 for no expiry @@ -183,6 +218,14 @@ const Value& MessageSelectorEnv::value(c QPID_LOG(debug, "Selector lookup special identifier: " << identifier); returnedValues[identifier] = specialValue(identifier.substr(5)); } + } else if (identifier.substr(0, 3) == "JMS") { + Aliases::const_iterator equivalent = aliases.find(identifier); + if (equivalent != aliases.end()) { + 
QPID_LOG(debug, "Selector lookup JMS identifier: " << identifier << " treated as alias for " << equivalent->second); + returnedValues[identifier] = specialValue(equivalent->second); + } else { + QPID_LOG(info, "Unrecognised JMS identifier in selector: " << identifier); + } } else if (!valuesLookedup) { QPID_LOG(debug, "Selector lookup triggered by: " << identifier); // Iterate over all the message properties Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp?rev=1698426&r1=1698425&r2=1698426&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.cpp Fri Aug 28 22:16:27 2015 @@ -62,9 +62,8 @@ std::string Message::getUserId() const uint64_t Message::getTimestamp() const { - //AMQP 1.0 message doesn't have the equivalent of the 0-10 timestamp field - //TODO: define an annotation for that - return 0; + //creation time is in milliseconds, timestamp (from the 0-10 spec) is in seconds + return !creationTime ? 
0 : creationTime.get()/1000; } bool Message::isPersistent() const @@ -87,6 +86,25 @@ uint8_t Message::getPriority() const else return priority.get(); } +std::string Message::getTo() const +{ + std::string v; + if (to.data) v.assign(to.data, to.size); + return v; +} +std::string Message::getSubject() const +{ + std::string v; + if (subject.data) v.assign(subject.data, subject.size); + return v; +} +std::string Message::getReplyTo() const +{ + std::string v; + if (replyTo.data) v.assign(replyTo.data, replyTo.size); + return v; +} + namespace { class StringRetriever : public MapHandler { @@ -242,7 +260,7 @@ qpid::amqp::MessageId Message::getMessag { return messageId; } -qpid::amqp::CharSequence Message::getReplyTo() const +qpid::amqp::CharSequence Message::getReplyToAsCharSequence() const { return replyTo; } @@ -318,7 +336,7 @@ void Message::onCorrelationId(const qpid void Message::onContentType(const qpid::amqp::CharSequence& v) { contentType = v; } void Message::onContentEncoding(const qpid::amqp::CharSequence& v) { contentEncoding = v; } void Message::onAbsoluteExpiryTime(int64_t) {} -void Message::onCreationTime(int64_t) {} +void Message::onCreationTime(int64_t v) { creationTime = v; } void Message::onGroupId(const qpid::amqp::CharSequence&) {} void Message::onGroupSequence(uint32_t) {} void Message::onReplyToGroupId(const qpid::amqp::CharSequence&) {} Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h?rev=1698426&r1=1698425&r2=1698426&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Message.h Fri Aug 28 22:16:27 2015 @@ -53,10 +53,13 @@ class Message : public qpid::broker::Mes void processProperties(qpid::amqp::MapHandler&) const; std::string getUserId() const; uint64_t getTimestamp() const; + std::string getTo() const; + 
std::string getSubject() const; + std::string getReplyTo() const; qpid::amqp::MessageId getMessageId() const; qpid::amqp::MessageId getCorrelationId() const; - qpid::amqp::CharSequence getReplyTo() const; + qpid::amqp::CharSequence getReplyToAsCharSequence() const; qpid::amqp::CharSequence getContentType() const; qpid::amqp::CharSequence getContentEncoding() const; @@ -108,6 +111,7 @@ class Message : public qpid::broker::Mes qpid::amqp::MessageId correlationId; qpid::amqp::CharSequence contentType; qpid::amqp::CharSequence contentEncoding; + boost::optional creationTime; //application-properties: qpid::amqp::CharSequence applicationProperties; Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp?rev=1698426&r1=1698425&r2=1698426&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp Fri Aug 28 22:16:27 2015 @@ -245,7 +245,7 @@ boost::intrusive_ptrsetCorrelationId(boost::lexical_cast(cid.value.ulong)); break; } - if (message->getReplyTo()) props->setReplyTo(translate(message->getReplyTo(), broker)); + if (message->getReplyToAsCharSequence()) props->setReplyTo(translate(message->getReplyTo(), broker)); if (message->getContentType()) props->setContentType(translate(message->getContentType())); if (message->getContentEncoding()) props->setContentEncoding(translate(message->getContentEncoding())); props->setUserId(message->getUserId()); Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp?rev=1698426&r1=1698425&r2=1698426&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp 
(original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp Fri Aug 28 22:16:27 2015 @@ -41,8 +41,11 @@ namespace qpid { namespace broker { namespace amqp_0_10 { namespace { +const std::string DELIMITER("/"); +const std::string EMPTY; const std::string QMF2("qmf2"); const std::string PARTIAL("partial"); +const std::string SUBJECT_KEY("qpid.subject"); } MessageTransfer::MessageTransfer() : frames(framing::SequenceNumber()), requiredCredit(0), cachedRequiredCredit(false) {} MessageTransfer::MessageTransfer(const framing::SequenceNumber& id) : frames(id), requiredCredit(0), cachedRequiredCredit(false) {} @@ -143,6 +146,41 @@ uint64_t MessageTransfer::getTimestamp() return props ? props->getTimestamp() : 0; } +std::string MessageTransfer::getTo() const +{ + const DeliveryProperties* props = getProperties<DeliveryProperties>(); + if (props) { + //if message was sent to 'nameless exchange' then the routing key is the queue + return props->getExchange().empty() ? props->getRoutingKey() : props->getExchange(); + } else { + return EMPTY; + } +} +std::string MessageTransfer::getSubject() const +{ + const DeliveryProperties* props = getProperties<DeliveryProperties>(); + if (props) { + //if message was sent to 'nameless exchange' then the routing key is the queue name, not the subject + return props->getExchange().empty() ? 
getPropertyAsString(SUBJECT_KEY) : props->getRoutingKey(); + } else { + return EMPTY; + } +} +std::string MessageTransfer::getReplyTo() const +{ + const MessageProperties* props = getProperties<MessageProperties>(); + if (props && props->hasReplyTo()) { + const qpid::framing::ReplyTo& replyto = props->getReplyTo(); + if (replyto.hasExchange() && replyto.hasRoutingKey()) + return replyto.getExchange() + DELIMITER + replyto.getRoutingKey(); + else if (replyto.hasExchange()) return replyto.getExchange(); + else if (replyto.hasRoutingKey()) return replyto.getRoutingKey(); + else return EMPTY; + } else { + return EMPTY; + } +} + bool MessageTransfer::requiresAccept() const { const framing::MessageTransferBody* b = getFrames().as<framing::MessageTransferBody>(); Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h?rev=1698426&r1=1698425&r2=1698426&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h Fri Aug 28 22:16:27 2015 @@ -57,6 +57,10 @@ class MessageTransfer : public qpid::bro std::string getUserId() const; void setTimestamp(); uint64_t getTimestamp() const; + std::string getTo() const; + std::string getSubject() const; + std::string getReplyTo() const; + bool requiresAccept() const; const qpid::framing::SequenceNumber& getCommandId() const; Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py?rev=1698426&r1=1698425&r2=1698426&view=diff ============================================================================== --- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py (original) +++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py Fri Aug 28 22:16:27 2015 @@ -71,3 +71,25 @@ 
class SelectorTests (VersionTest): msg = rcv_4.fetch(0) assert msg.content == 'd' self.ssn.acknowledge(msg) + + def check_selected(self,node, selector, expected_content): + rcv = self.ssn.receiver("%s; {mode:browse, link:{selector:\"%s\"}}" % (node, selector)) + msg = rcv.fetch(0) + assert msg.content == expected_content, msg + rcv.close() + + def test_jms_header_names(self): + """ + The new AMQP 1.0 based JMS client uses these rather than the special names above + """ + msgs = [Message(content=i, id=i, correlation_id=i, subject=i, priority=p+1, reply_to=i, properties={'x-amqp-to':i}) for p, i in enumerate(['a', 'b', 'c', 'd'])] + + snd = self.ssn.sender("#") + for m in msgs: snd.send(m) + + self.check_selected(snd.target, "JMSMessageID = 'a'", 'a') + self.check_selected(snd.target, "JMSCorrelationID = 'b'", 'b') + self.check_selected(snd.target, "JMSPriority = 3", 'c') + self.check_selected(snd.target, "JMSDestination = 'a'", 'a') + self.check_selected(snd.target, "JMSReplyTo = 'b'", 'b') + self.check_selected(snd.target, "JMSType = 'c'", 'c') --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org