Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 21900100B7 for ; Tue, 13 Aug 2013 15:07:22 +0000 (UTC) Received: (qmail 73919 invoked by uid 500); 13 Aug 2013 15:07:21 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 73899 invoked by uid 500); 13 Aug 2013 15:07:21 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 73892 invoked by uid 99); 13 Aug 2013 15:07:21 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Aug 2013 15:07:21 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Aug 2013 15:07:16 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 4F49723889FA; Tue, 13 Aug 2013 15:06:55 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1513537 - in /qpid/trunk/qpid: cpp/bindings/qpid/python/ cpp/src/qpid/broker/amqp/ python/qpid/tests/messaging/ tests/src/py/qpid_tests/broker_1_0/ Date: Tue, 13 Aug 2013 15:06:55 -0000 To: commits@qpid.apache.org From: gsim@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20130813150655.4F49723889FA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gsim Date: Tue Aug 13 15:06:54 2013 New Revision: 1513537 URL: http://svn.apache.org/r1513537 Log: QPID-4711: translate between structured content in AMQP 0-10 and 1.0 Added: 
qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/translation.py Modified: qpid/trunk/qpid/cpp/bindings/qpid/python/python.i qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py Modified: qpid/trunk/qpid/cpp/bindings/qpid/python/python.i URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qpid/python/python.i?rev=1513537&r1=1513536&r2=1513537&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/bindings/qpid/python/python.i (original) +++ qpid/trunk/qpid/cpp/bindings/qpid/python/python.i Tue Aug 13 15:06:54 2013 @@ -351,6 +351,9 @@ QPID_EXCEPTION(UnauthorizedAccess, Sessi self.setProperty(k, v) def _get_content(self) : + obj = self.getContentObject() + if obj: + return obj if self.content_type == "amqp/list" : return decodeList(self) if self.content_type == "amqp/map" : @@ -365,9 +368,7 @@ QPID_EXCEPTION(UnauthorizedAccess, Sessi elif isinstance(content, list) or isinstance(content, dict) : encode(content, self) else : - # Not a type we can handle. 
Try setting it anyway, - # although this will probably lead to a swig error - self.setContent(str(content)) + self.setContentObject(content) __swig_getmethods__["content"] = _get_content __swig_setmethods__["content"] = _set_content if _newclass: content = property(_get_content, _set_content) Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp?rev=1513537&r1=1513536&r2=1513537&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp Tue Aug 13 15:06:54 2013 @@ -27,6 +27,7 @@ #include "qpid/amqp/MessageEncoder.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/types/Variant.h" +#include "qpid/types/encodings.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include @@ -38,6 +39,8 @@ namespace { const std::string EMPTY; const std::string FORWARD_SLASH("/"); +const std::string TEXT_PLAIN("text/plain"); +const std::string SUBJECT_KEY("qpid.subject"); qpid::framing::ReplyTo translate(const std::string address, Broker* broker) { @@ -98,8 +101,25 @@ class Properties_0_10 : public qpid::amq std::string getUserId() const { return messageProperties ? messageProperties->getUserId() : EMPTY; } bool hasTo() const { return getDestination().size() || hasSubject(); } std::string getTo() const { return getDestination().size() ? getDestination() : getSubject(); } - bool hasSubject() const { return deliveryProperties && getDestination().size() && deliveryProperties->hasRoutingKey(); } - std::string getSubject() const { return deliveryProperties && getDestination().size() ? 
deliveryProperties->getRoutingKey() : EMPTY; } + bool hasSubject() const + { + if (getDestination().empty()) { + return getApplicationProperties().isSet(SUBJECT_KEY); + } else { + return deliveryProperties && deliveryProperties->hasRoutingKey(); + } + } + std::string getSubject() const + { + if (getDestination().empty()) { + //message was sent to default exchange, routing key is the queue name + return getApplicationProperties().getAsString(SUBJECT_KEY); + } else if (deliveryProperties) { + return deliveryProperties->getRoutingKey(); + } else { + return EMPTY; + } + } bool hasReplyTo() const { return messageProperties && messageProperties->hasReplyTo(); } std::string getReplyTo() const { return messageProperties ? translate(messageProperties->getReplyTo()) : EMPTY; } bool hasCorrelationId() const { return messageProperties && messageProperties->hasCorrelationId(); } @@ -119,7 +139,7 @@ class Properties_0_10 : public qpid::amq bool hasReplyToGroupId() const { return false; } std::string getReplyToGroupId() const { return EMPTY; } - const qpid::framing::FieldTable& getApplicationProperties() { return messageProperties->getApplicationHeaders(); } + const qpid::framing::FieldTable& getApplicationProperties() const { return messageProperties->getApplicationHeaders(); } Properties_0_10(const qpid::broker::amqp_0_10::MessageTransfer& t) : transfer(t), messageProperties(transfer.getProperties()), deliveryProperties(transfer.getProperties()) @@ -138,7 +158,6 @@ class Properties_0_10 : public qpid::amq Translation::Translation(const qpid::broker::Message& m, Broker* b) : original(m), broker(b) {} - boost::intrusive_ptr Translation::getTransfer() { boost::intrusive_ptr t = @@ -161,13 +180,38 @@ boost::intrusive_ptrgetFrames().append(method); transfer->getFrames().append(header); - qpid::amqp::CharSequence body = message->getBody(); - content.castBody()->getData().assign(body.data, body.size); - transfer->getFrames().append(content); - qpid::framing::MessageProperties* props = 
transfer->getFrames().getHeaders()->get(true); - props->setContentLength(body.size); + + if (message->isTypedBody()) { + qpid::types::Variant body = message->getTypedBody(); + std::string& data = content.castBody()->getData(); + if (body.getType() == qpid::types::VAR_MAP) { + qpid::amqp_0_10::MapCodec::encode(body.asMap(), data); + props->setContentType(qpid::amqp_0_10::MapCodec::contentType); + } else if (body.getType() == qpid::types::VAR_LIST) { + qpid::amqp_0_10::ListCodec::encode(body.asList(), data); + props->setContentType(qpid::amqp_0_10::ListCodec::contentType); + } else if (body.getType() == qpid::types::VAR_STRING) { + data = body.getString(); + if (body.getEncoding() == qpid::types::encodings::UTF8 || body.getEncoding() == qpid::types::encodings::ASCII) { + props->setContentType(TEXT_PLAIN); + } + } else { + qpid::types::Variant::List container; + container.push_back(body); + qpid::amqp_0_10::ListCodec::encode(container, data); + props->setContentType(qpid::amqp_0_10::ListCodec::contentType); + } + transfer->getFrames().append(content); + props->setContentLength(data.size()); + } else { + qpid::amqp::CharSequence body = message->getBody(); + content.castBody()->getData().assign(body.data, body.size); + transfer->getFrames().append(content); + + props->setContentLength(body.size); + } qpid::amqp::MessageId mid = message->getMessageId(); qpid::framing::Uuid uuid; @@ -215,7 +259,10 @@ boost::intrusive_ptrgetFrames().getHeaders()->get(true); dp->setPriority(message->getPriority()); if (message->isPersistent()) dp->setDeliveryMode(2); - if (message->getRoutingKey().size()) dp->setRoutingKey(message->getRoutingKey()); + if (message->getRoutingKey().size()) { + dp->setRoutingKey(message->getRoutingKey()); + props->getApplicationHeaders().setString(SUBJECT_KEY, message->getRoutingKey()); + } return transfer.get(); } else { @@ -246,14 +293,40 @@ void Translation::write(OutgoingFromQueu Properties_0_10 properties(*transfer); qpid::types::Variant::Map 
applicationProperties; qpid::amqp_0_10::translate(properties.getApplicationProperties(), applicationProperties); - std::string content = transfer->getContent(); - size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content); - std::vector<char> buffer(size); - qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); - encoder.writeProperties(properties); - encoder.writeApplicationProperties(applicationProperties); - if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA); - out.write(&buffer[0], encoder.getPosition()); + if (properties.getContentType() == qpid::amqp_0_10::MapCodec::contentType) { + qpid::types::Variant::Map content; + qpid::amqp_0_10::MapCodec::decode(transfer->getContent(), content); + size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties); + size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/ + size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/; + std::vector<char> buffer(size); + qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); + encoder.writeProperties(properties); + encoder.writeApplicationProperties(applicationProperties); + encoder.writeMap(content, &qpid::amqp::message::AMQP_VALUE); + out.write(&buffer[0], encoder.getPosition()); + } else if (properties.getContentType() == qpid::amqp_0_10::ListCodec::contentType) { + qpid::types::Variant::List content; + qpid::amqp_0_10::ListCodec::decode(transfer->getContent(), content); + size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties); + size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/ + size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/; + std::vector<char> buffer(size); + qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); + encoder.writeProperties(properties); + encoder.writeApplicationProperties(applicationProperties); + 
encoder.writeList(content, &qpid::amqp::message::AMQP_VALUE); + out.write(&buffer[0], encoder.getPosition()); + } else { + std::string content = transfer->getContent(); + size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content); + std::vector<char> buffer(size); + qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size()); + encoder.writeProperties(properties); + encoder.writeApplicationProperties(applicationProperties); + if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA); + out.write(&buffer[0], encoder.getPosition()); + } } else { QPID_LOG(error, "Could not write message data in AMQP 1.0 format"); } Modified: qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py?rev=1513537&r1=1513536&r2=1513537&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py (original) +++ qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py Tue Aug 13 15:06:54 2013 @@ -188,4 +188,17 @@ class Base(Test): return {"reconnect": self.reconnect(), "transport": self.transport()} +class VersionTest (Base): + def create_connection(self, version="amqp1.0", force=False): + opts = self.connection_options() + if force or not 'protocol' in opts: + opts['protocol'] = version; + return Connection.establish(self.broker, **opts) + + def setup_connection(self): + return self.create_connection() + + def setup_session(self): + return self.conn.session() + import address, endpoints, message Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py?rev=1513537&r1=1513536&r2=1513537&view=diff ============================================================================== --- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py (original) 
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py Tue Aug 13 15:06:54 2013 @@ -19,6 +19,7 @@ # under the License. # +from general import * from legacy_exchanges import * from selector import * -from general import * +from translation import * Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py?rev=1513537&r1=1513536&r2=1513537&view=diff ============================================================================== --- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py (original) +++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py Tue Aug 13 15:06:54 2013 @@ -18,18 +18,12 @@ # from qpid.tests.messaging.implementation import * -from qpid.tests.messaging import Base +from qpid.tests.messaging import VersionTest -class GeneralTests (Base): +class GeneralTests (VersionTest): """ Miscellaneous tests for core AMQP 1.0 messaging behaviour. 
""" - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - def test_request_response(self): snd_request = self.ssn.sender("#") rcv_response = self.ssn.receiver("#") Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py?rev=1513537&r1=1513536&r2=1513537&view=diff ============================================================================== --- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py (original) +++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py Tue Aug 13 15:06:54 2013 @@ -18,19 +18,13 @@ # from qpid.tests.messaging.implementation import * -from qpid.tests.messaging import Base +from qpid.tests.messaging import VersionTest -class LegacyExchangeTests (Base): +class LegacyExchangeTests (VersionTest): """ Tests for the legacy (i.e. pre 1.0) AMQP exchanges and the filters defined for them and registered for AMQP 1.0. 
""" - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - def test_fanout(self): msgs = [Message(content=s, subject = s) for s in ['a','b','c','d']] Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py?rev=1513537&r1=1513536&r2=1513537&view=diff ============================================================================== --- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py (original) +++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py Tue Aug 13 15:06:54 2013 @@ -18,19 +18,13 @@ # from qpid.tests.messaging.implementation import * -from qpid.tests.messaging import Base +from qpid.tests.messaging import VersionTest -class SelectorTests (Base): +class SelectorTests (VersionTest): """ Tests for the selector filter registered for AMQP 1.0 under the apache namespace. 
""" - def setup_connection(self): - return Connection.establish(self.broker, **self.connection_options()) - - def setup_session(self): - return self.conn.session() - def basic_selection_test(self, node): properties = [(1, 'red','dog'), (2, 'black', 'cat'), (3, 'red', 'squirrel'), (4, 'grey', 'squirrel')] msgs = [Message(content="%s.%s" % (colour, creature), properties={'sequence':sequence,'colour':colour}) for sequence, colour, creature in properties] Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/translation.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/translation.py?rev=1513537&view=auto ============================================================================== --- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/translation.py (added) +++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/translation.py Tue Aug 13 15:06:54 2013 @@ -0,0 +1,87 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from qpid.tests.messaging.implementation import * +from qpid.tests.messaging import VersionTest + +class TranslationTests (VersionTest): + """ + Testing translation of messages between 1.0 and 0-10 + """ + def send_receive_messages(self, msgs, send_version, receive_version, address): + rcon = self.create_connection(receive_version, True) + rcv = rcon.session().receiver(address) + + scon = self.create_connection(send_version, True) + snd = scon.session().sender(rcv.source) + + for m in msgs: snd.send(m) + + for expected in msgs: + msg = rcv.fetch() + assert msg.content == expected.content, (msg.content, expected.content) + assert msg.subject == expected.subject, (msg.subject, expected.subject) + self.ssn.acknowledge(msg) + scon.close() + rcon.close() + + def send_receive(self, send_version, receive_version, address): + self.send_receive_messages([Message(content=s, subject = s) for s in ['a','b','c','d']], send_version, receive_version, address) + + def send_receive_map(self, send_version, receive_version, address): + self.send_receive_messages([Message(content={'s':'abc','i':10})], send_version, receive_version, address) + + def send_receive_list(self, send_version, receive_version, address): + self.send_receive_messages([Message(content=['a', 1, 'c'])], send_version, receive_version, address) + + def test_translation_queue_1(self): + self.send_receive("amqp0-10", "amqp1.0", '#') + + def test_translation_queue_2(self): + self.send_receive("amqp1.0", "amqp0-10", '#') + + def test_translation_exchange_1(self): + self.send_receive("amqp0-10", "amqp1.0", 'amq.fanout') + + def test_translation_exchange_2(self): + self.send_receive("amqp1.0", "amqp0-10", 'amq.fanout') + + def test_send_receive_queue_1(self): + self.send_receive("amqp1.0", "amqp1.0", '#') + + def test_send_receive_queue_2(self): + self.send_receive("amqp0-10", "amqp0-10", '#') + + def test_send_receive_exchange_1(self): + self.send_receive("amqp1.0", "amqp1.0", 'amq.fanout') + + def 
test_send_receive_exchange_2(self): + self.send_receive("amqp0-10", "amqp0-10", 'amq.fanout') + + def test_translate_map_1(self): + self.send_receive_map("amqp0-10", "amqp1.0", '#') + + def test_translate_map_2(self): + self.send_receive_map("amqp1.0", "amqp0-10", '#') + + def test_translate_list_1(self): + self.send_receive_list("amqp0-10", "amqp1.0", '#') + + def test_translate_list_2(self): + self.send_receive_list("amqp1.0", "amqp0-10", '#') --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org