qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1513537 - in /qpid/trunk/qpid: cpp/bindings/qpid/python/ cpp/src/qpid/broker/amqp/ python/qpid/tests/messaging/ tests/src/py/qpid_tests/broker_1_0/
Date Tue, 13 Aug 2013 15:06:55 GMT
Author: gsim
Date: Tue Aug 13 15:06:54 2013
New Revision: 1513537

URL: http://svn.apache.org/r1513537
Log:
QPID-4711: translate between structured content in AMQP 0-10 and 1.0

Added:
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/translation.py
Modified:
    qpid/trunk/qpid/cpp/bindings/qpid/python/python.i
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
    qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py

Modified: qpid/trunk/qpid/cpp/bindings/qpid/python/python.i
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/bindings/qpid/python/python.i?rev=1513537&r1=1513536&r2=1513537&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/bindings/qpid/python/python.i (original)
+++ qpid/trunk/qpid/cpp/bindings/qpid/python/python.i Tue Aug 13 15:06:54 2013
@@ -351,6 +351,9 @@ QPID_EXCEPTION(UnauthorizedAccess, Sessi
                      self.setProperty(k, v)
 
          def _get_content(self) :
+             obj = self.getContentObject()
+             if obj:
+                 return obj
              if self.content_type == "amqp/list" :
                  return decodeList(self)
              if self.content_type == "amqp/map" :
@@ -365,9 +368,7 @@ QPID_EXCEPTION(UnauthorizedAccess, Sessi
              elif isinstance(content, list) or isinstance(content, dict) :
                  encode(content, self)
              else :
-                 # Not a type we can handle.  Try setting it anyway,
-                 # although this will probably lead to a swig error
-                 self.setContent(str(content))
+                 self.setContentObject(content)
          __swig_getmethods__["content"] = _get_content
          __swig_setmethods__["content"] = _set_content
          if _newclass: content = property(_get_content, _set_content)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp?rev=1513537&r1=1513536&r2=1513537&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp Tue Aug 13 15:06:54 2013
@@ -27,6 +27,7 @@
 #include "qpid/amqp/MessageEncoder.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/types/Variant.h"
+#include "qpid/types/encodings.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Statement.h"
 #include <boost/lexical_cast.hpp>
@@ -38,6 +39,8 @@ namespace {
 
 const std::string EMPTY;
 const std::string FORWARD_SLASH("/");
+const std::string TEXT_PLAIN("text/plain");
+const std::string SUBJECT_KEY("qpid.subject");
 
 qpid::framing::ReplyTo translate(const std::string address, Broker* broker)
 {
@@ -98,8 +101,25 @@ class Properties_0_10 : public qpid::amq
     std::string getUserId() const { return messageProperties ? messageProperties->getUserId() : EMPTY; }
     bool hasTo() const { return getDestination().size() || hasSubject(); }
     std::string getTo() const { return  getDestination().size() ? getDestination() : getSubject(); }
-    bool hasSubject() const { return deliveryProperties && getDestination().size() && deliveryProperties->hasRoutingKey(); }
-    std::string getSubject() const { return deliveryProperties && getDestination().size() ? deliveryProperties->getRoutingKey() : EMPTY; }
+    bool hasSubject() const
+    {
+        if (getDestination().empty()) {
+            return getApplicationProperties().isSet(SUBJECT_KEY);
+        } else {
+            return deliveryProperties && deliveryProperties->hasRoutingKey();
+        }
+    }
+    std::string getSubject() const
+    {
+        if (getDestination().empty()) {
+            //message was sent to default exchange, routing key is the queue name
+            return getApplicationProperties().getAsString(SUBJECT_KEY);
+        } else if (deliveryProperties) {
+            return deliveryProperties->getRoutingKey();
+        } else {
+            return EMPTY;
+        }
+    }
     bool hasReplyTo() const { return messageProperties && messageProperties->hasReplyTo(); }
     std::string getReplyTo() const { return messageProperties ? translate(messageProperties->getReplyTo()) : EMPTY; }
     bool hasCorrelationId() const { return messageProperties && messageProperties->hasCorrelationId(); }
@@ -119,7 +139,7 @@ class Properties_0_10 : public qpid::amq
     bool hasReplyToGroupId() const { return false; }
     std::string getReplyToGroupId() const { return EMPTY; }
 
-    const qpid::framing::FieldTable& getApplicationProperties() { return messageProperties->getApplicationHeaders(); }
+    const qpid::framing::FieldTable& getApplicationProperties() const { return messageProperties->getApplicationHeaders(); }
     Properties_0_10(const qpid::broker::amqp_0_10::MessageTransfer& t) : transfer(t),
                                                                          messageProperties(transfer.getProperties<qpid::framing::MessageProperties>()),
                                                                          deliveryProperties(transfer.getProperties<qpid::framing::DeliveryProperties>())
@@ -138,7 +158,6 @@ class Properties_0_10 : public qpid::amq
 
 Translation::Translation(const qpid::broker::Message& m, Broker* b) : original(m), broker(b) {}
 
-
 boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer()
 {
     boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> t =
@@ -161,13 +180,38 @@ boost::intrusive_ptr<const qpid::broker:
             transfer->getFrames().append(method);
             transfer->getFrames().append(header);
 
-            qpid::amqp::CharSequence body = message->getBody();
-            content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size);
-            transfer->getFrames().append(content);
-
             qpid::framing::MessageProperties* props =
                 transfer->getFrames().getHeaders()->get<qpid::framing::MessageProperties>(true);
-            props->setContentLength(body.size);
+
+            if (message->isTypedBody()) {
+                qpid::types::Variant body = message->getTypedBody();
+                std::string& data = content.castBody<qpid::framing::AMQContentBody>()->getData();
+                if (body.getType() == qpid::types::VAR_MAP) {
+                    qpid::amqp_0_10::MapCodec::encode(body.asMap(), data);
+                    props->setContentType(qpid::amqp_0_10::MapCodec::contentType);
+                } else if (body.getType() == qpid::types::VAR_LIST) {
+                    qpid::amqp_0_10::ListCodec::encode(body.asList(), data);
+                    props->setContentType(qpid::amqp_0_10::ListCodec::contentType);
+                } else if (body.getType() == qpid::types::VAR_STRING) {
+                    data = body.getString();
+                    if (body.getEncoding() == qpid::types::encodings::UTF8 || body.getEncoding() == qpid::types::encodings::ASCII) {
+                        props->setContentType(TEXT_PLAIN);
+                    }
+                } else {
+                    qpid::types::Variant::List container;
+                    container.push_back(body);
+                    qpid::amqp_0_10::ListCodec::encode(container, data);
+                    props->setContentType(qpid::amqp_0_10::ListCodec::contentType);
+                }
+                transfer->getFrames().append(content);
+                props->setContentLength(data.size());
+            } else {
+                qpid::amqp::CharSequence body = message->getBody();
+                content.castBody<qpid::framing::AMQContentBody>()->getData().assign(body.data, body.size);
+                transfer->getFrames().append(content);
+
+                props->setContentLength(body.size);
+            }
 
             qpid::amqp::MessageId mid = message->getMessageId();
             qpid::framing::Uuid uuid;
@@ -215,7 +259,10 @@ boost::intrusive_ptr<const qpid::broker:
                 transfer->getFrames().getHeaders()->get<qpid::framing::DeliveryProperties>(true);
             dp->setPriority(message->getPriority());
             if (message->isPersistent()) dp->setDeliveryMode(2);
-            if (message->getRoutingKey().size()) dp->setRoutingKey(message->getRoutingKey());
+            if (message->getRoutingKey().size()) {
+                dp->setRoutingKey(message->getRoutingKey());
+                props->getApplicationHeaders().setString(SUBJECT_KEY, message->getRoutingKey());
+            }
 
             return transfer.get();
         } else {
@@ -246,14 +293,40 @@ void Translation::write(OutgoingFromQueu
             Properties_0_10 properties(*transfer);
             qpid::types::Variant::Map applicationProperties;
             qpid::amqp_0_10::translate(properties.getApplicationProperties(), applicationProperties);
-            std::string content = transfer->getContent();
-            size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content);
-            std::vector<char> buffer(size);
-            qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
-            encoder.writeProperties(properties);
-            encoder.writeApplicationProperties(applicationProperties);
-            if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA);
-            out.write(&buffer[0], encoder.getPosition());
+            if (properties.getContentType() == qpid::amqp_0_10::MapCodec::contentType) {
+                qpid::types::Variant::Map content;
+                qpid::amqp_0_10::MapCodec::decode(transfer->getContent(), content);
+                size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties);
+                size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/
+                size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/;
+                std::vector<char> buffer(size);
+                qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+                encoder.writeProperties(properties);
+                encoder.writeApplicationProperties(applicationProperties);
+                encoder.writeMap(content, &qpid::amqp::message::AMQP_VALUE);
+                out.write(&buffer[0], encoder.getPosition());
+            } else if (properties.getContentType() == qpid::amqp_0_10::ListCodec::contentType) {
+                qpid::types::Variant::List content;
+                qpid::amqp_0_10::ListCodec::decode(transfer->getContent(), content);
+                size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties);
+                size += qpid::amqp::MessageEncoder::getEncodedSize(applicationProperties, true) + 3;/*descriptor*/
+                size += qpid::amqp::MessageEncoder::getEncodedSize(content, true) + 3/*descriptor*/;
+                std::vector<char> buffer(size);
+                qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+                encoder.writeProperties(properties);
+                encoder.writeApplicationProperties(applicationProperties);
+                encoder.writeList(content, &qpid::amqp::message::AMQP_VALUE);
+                out.write(&buffer[0], encoder.getPosition());
+            } else {
+                std::string content = transfer->getContent();
+                size_t size = qpid::amqp::MessageEncoder::getEncodedSize(properties, applicationProperties, content);
+                std::vector<char> buffer(size);
+                qpid::amqp::MessageEncoder encoder(&buffer[0], buffer.size());
+                encoder.writeProperties(properties);
+                encoder.writeApplicationProperties(applicationProperties);
+                if (content.size()) encoder.writeBinary(content, &qpid::amqp::message::DATA);
+                out.write(&buffer[0], encoder.getPosition());
+            }
         } else {
             QPID_LOG(error, "Could not write message data in AMQP 1.0 format");
         }

Modified: qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py?rev=1513537&r1=1513536&r2=1513537&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/messaging/__init__.py Tue Aug 13 15:06:54 2013
@@ -188,4 +188,17 @@ class Base(Test):
       return {"reconnect": self.reconnect(),
               "transport": self.transport()}
 
+class VersionTest (Base):
+  def create_connection(self, version="amqp1.0", force=False):
+    opts = self.connection_options()
+    if force or not 'protocol' in opts:
+      opts['protocol'] = version;
+    return Connection.establish(self.broker, **opts)
+
+  def setup_connection(self):
+    return self.create_connection()
+
+  def setup_session(self):
+    return self.conn.session()
+
 import address, endpoints, message

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py?rev=1513537&r1=1513536&r2=1513537&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/__init__.py Tue Aug 13 15:06:54 2013
@@ -19,6 +19,7 @@
 # under the License.
 #
 
+from general import *
 from legacy_exchanges import *
 from selector import *
-from general import *
+from translation import *

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py?rev=1513537&r1=1513536&r2=1513537&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/general.py Tue Aug 13 15:06:54 2013
@@ -18,18 +18,12 @@
 #
 
 from qpid.tests.messaging.implementation import *
-from qpid.tests.messaging import Base
+from qpid.tests.messaging import VersionTest
 
-class GeneralTests (Base):
+class GeneralTests (VersionTest):
     """
     Miscellaneous tests for core AMQP 1.0 messaging behaviour.
     """
-    def setup_connection(self):
-        return Connection.establish(self.broker, **self.connection_options())
-
-    def setup_session(self):
-        return self.conn.session()
-
     def test_request_response(self):
         snd_request = self.ssn.sender("#")
         rcv_response = self.ssn.receiver("#")

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py?rev=1513537&r1=1513536&r2=1513537&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/legacy_exchanges.py Tue Aug 13 15:06:54 2013
@@ -18,19 +18,13 @@
 #
 
 from qpid.tests.messaging.implementation import *
-from qpid.tests.messaging import Base
+from qpid.tests.messaging import VersionTest
 
-class LegacyExchangeTests (Base):
+class LegacyExchangeTests (VersionTest):
     """
     Tests for the legacy (i.e. pre 1.0) AMQP exchanges and the filters
     defined for them and registered for AMQP 1.0.
     """
-    def setup_connection(self):
-        return Connection.establish(self.broker, **self.connection_options())
-
-    def setup_session(self):
-        return self.conn.session()
-
     def test_fanout(self):
         msgs = [Message(content=s, subject = s) for s in ['a','b','c','d']]
 

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py?rev=1513537&r1=1513536&r2=1513537&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py Tue Aug 13 15:06:54 2013
@@ -18,19 +18,13 @@
 #
 
 from qpid.tests.messaging.implementation import *
-from qpid.tests.messaging import Base
+from qpid.tests.messaging import VersionTest
 
-class SelectorTests (Base):
+class SelectorTests (VersionTest):
     """
     Tests for the selector filter registered for AMQP 1.0 under the
     apache namespace.
     """
-    def setup_connection(self):
-        return Connection.establish(self.broker, **self.connection_options())
-
-    def setup_session(self):
-        return self.conn.session()
-
     def basic_selection_test(self, node):
         properties = [(1, 'red','dog'), (2, 'black', 'cat'), (3, 'red', 'squirrel'), (4, 'grey', 'squirrel')]
         msgs = [Message(content="%s.%s" % (colour, creature), properties={'sequence':sequence,'colour':colour}) for sequence, colour, creature in properties]

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/translation.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/translation.py?rev=1513537&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/translation.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_1_0/translation.py Tue Aug 13 15:06:54 2013
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.tests.messaging.implementation import *
+from qpid.tests.messaging import VersionTest
+
+class TranslationTests (VersionTest):
+    """
+    Testing translation of messages between 1.0 and 0-10
+    """
+    def send_receive_messages(self, msgs, send_version, receive_version, address):
+        rcon = self.create_connection(receive_version, True)
+        rcv = rcon.session().receiver(address)
+
+        scon = self.create_connection(send_version, True)
+        snd = scon.session().sender(rcv.source)
+
+        for m in msgs: snd.send(m)
+
+        for expected in msgs:
+            msg = rcv.fetch()
+            assert msg.content == expected.content, (msg.content, expected.content)
+            assert msg.subject == expected.subject, (msg.subject, expected.subject)
+            self.ssn.acknowledge(msg)
+        scon.close()
+        rcon.close()
+
+    def send_receive(self, send_version, receive_version, address):
+        self.send_receive_messages([Message(content=s, subject = s) for s in ['a','b','c','d']], send_version, receive_version, address)
+
+    def send_receive_map(self, send_version, receive_version, address):
+        self.send_receive_messages([Message(content={'s':'abc','i':10})], send_version, receive_version, address)
+
+    def send_receive_list(self, send_version, receive_version, address):
+        self.send_receive_messages([Message(content=['a', 1, 'c'])], send_version, receive_version, address)
+
+    def test_translation_queue_1(self):
+        self.send_receive("amqp0-10", "amqp1.0", '#')
+
+    def test_translation_queue_2(self):
+        self.send_receive("amqp1.0", "amqp0-10", '#')
+
+    def test_translation_exchange_1(self):
+        self.send_receive("amqp0-10", "amqp1.0", 'amq.fanout')
+
+    def test_translation_exchange_2(self):
+        self.send_receive("amqp1.0", "amqp0-10", 'amq.fanout')
+
+    def test_send_receive_queue_1(self):
+        self.send_receive("amqp1.0", "amqp1.0", '#')
+
+    def test_send_receive_queue_2(self):
+        self.send_receive("amqp0-10", "amqp0-10", '#')
+
+    def test_send_receive_exchange_1(self):
+        self.send_receive("amqp1.0", "amqp1.0", 'amq.fanout')
+
+    def test_send_receive_exchange_2(self):
+        self.send_receive("amqp0-10", "amqp0-10", 'amq.fanout')
+
+    def test_translate_map_1(self):
+        self.send_receive_map("amqp0-10", "amqp1.0", '#')
+
+    def test_translate_map_2(self):
+        self.send_receive_map("amqp1.0", "amqp0-10", '#')
+
+    def test_translate_list_1(self):
+        self.send_receive_list("amqp0-10", "amqp1.0", '#')
+
+    def test_translate_list_2(self):
+        self.send_receive_list("amqp1.0", "amqp0-10", '#')



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message