qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r1486989 - in /qpid/trunk/qpid/cpp/src: qpid/broker/amqp/ProtocolPlugin.cpp qpid/broker/amqp/Translation.cpp qpid/broker/amqp/Translation.h qpid/messaging/amqp/SenderContext.cpp tests/interlink_tests.py tests/qpid-receive.cpp
Date Tue, 28 May 2013 16:26:54 GMT
Author: gsim
Date: Tue May 28 16:26:54 2013
New Revision: 1486989

URL: http://svn.apache.org/r1486989
Log:
QPID-4713: fix handling of reply to when converting from 1.0 to 0-10 format

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.h
    qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
    qpid/trunk/qpid/cpp/src/tests/interlink_tests.py
    qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp?rev=1486989&r1=1486988&r2=1486989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp Tue May 28 16:26:54 2013
@@ -114,7 +114,7 @@ qpid::sys::ConnectionCodec* ProtocolImpl
 
 boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolImpl::translate(const
qpid::broker::Message& m)
 {
-    qpid::broker::amqp::Translation t(m);
+    qpid::broker::amqp::Translation t(m, &broker);
     return t.getTransfer();
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp?rev=1486989&r1=1486988&r2=1486989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.cpp Tue May 28 16:26:54 2013
@@ -21,6 +21,7 @@
 #include "qpid/broker/amqp/Translation.h"
 #include "qpid/broker/amqp/Outgoing.h"
 #include "qpid/broker/amqp_0_10/MessageTransfer.h"
+#include "qpid/broker/Broker.h"
 #include "qpid/amqp/Decoder.h"
 #include "qpid/amqp/descriptors.h"
 #include "qpid/amqp/MessageEncoder.h"
@@ -38,10 +39,30 @@ namespace {
 const std::string EMPTY;
 const std::string FORWARD_SLASH("/");
 
+qpid::framing::ReplyTo translate(const std::string address, Broker* broker)
+{
+    size_t i = address.find(FORWARD_SLASH);
+    if (i == std::string::npos) {
+        //is it a queue or an exchange?
+        if (broker && broker->getQueues().find(address)) {
+            return qpid::framing::ReplyTo(EMPTY, address);
+        } else if (broker && broker->getExchanges().find(address)) {
+            return qpid::framing::ReplyTo(address, EMPTY);
+        } else {
+            return qpid::framing::ReplyTo();
+        }
+    } else {
+        return qpid::framing::ReplyTo(i > 0 ? address.substr(0, i) : EMPTY, (i+1) <
address.size() ? address.substr(i+1) : EMPTY);
+    }
+}
+qpid::framing::ReplyTo translate(const qpid::amqp::CharSequence& address, Broker* broker)
+{
+    return translate(std::string(address.data, address.size), broker);
+}
 std::string translate(const qpid::framing::ReplyTo r)
 {
-    if (r.hasExchange()) {
-        if (r.hasRoutingKey()) return r.getExchange() + FORWARD_SLASH + r.getRoutingKey();
+    if (r.getExchange().size()) {
+        if (r.getRoutingKey().size()) return r.getExchange() + FORWARD_SLASH + r.getRoutingKey();
         else return r.getExchange();
     } else return r.getRoutingKey();
 }
@@ -115,7 +136,7 @@ class Properties_0_10 : public qpid::amq
 };
 }
 
-Translation::Translation(const qpid::broker::Message& m) : original(m) {}
+Translation::Translation(const qpid::broker::Message& m, Broker* b) : original(m), broker(b)
{}
 
 
 boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation::getTransfer()
@@ -175,10 +196,7 @@ boost::intrusive_ptr<const qpid::broker:
                 props->setCorrelationId(boost::lexical_cast<std::string>(cid.value.ulong));
                 break;
             }
-            // TODO: ReplyTo - there is no way to reliably determine
-            // the type of the node from just its name, unless we
-            // query the brokers registries
-
+            if (message->getReplyTo()) props->setReplyTo(translate(message->getReplyTo(),
broker));
             if (message->getContentType()) props->setContentType(translate(message->getContentType()));
             if (message->getContentEncoding()) props->setContentEncoding(translate(message->getContentEncoding()));
             props->setUserId(message->getUserId());

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.h?rev=1486989&r1=1486988&r2=1486989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp/Translation.h Tue May 28 16:26:54 2013
@@ -25,6 +25,7 @@
 
 namespace qpid {
 namespace broker {
+class Broker;
 class Message;
 namespace amqp_0_10 {
 class MessageTransfer;
@@ -38,7 +39,7 @@ class OutgoingFromQueue;
 class Translation
 {
   public:
-    Translation(const qpid::broker::Message& message);
+    Translation(const qpid::broker::Message& message, Broker* broker = 0);
 
     /**
      * @returns a pointer to an AMQP 0-10 message transfer suitable
@@ -52,6 +53,7 @@ class Translation
     void write(OutgoingFromQueue&);
   private:
     const qpid::broker::Message& original;
+    Broker* broker;
 };
 }}} // namespace qpid::broker::amqp
 

Modified: qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp?rev=1486989&r1=1486988&r2=1486989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp Tue May 28 16:26:54 2013
@@ -134,6 +134,7 @@ class HeaderAdapter : public qpid::amqp:
     const qpid::messaging::MessageImpl& msg;
 };
 const std::string EMPTY;
+const std::string FORWARD_SLASH("/");
 
 class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties
 {
@@ -185,7 +186,12 @@ class PropertiesAdapter : public qpid::a
 
     std::string getReplyTo() const
     {
-        return msg.getReplyTo().str();
+        Address a = msg.getReplyTo();
+        if (a.getSubject().size()) {
+            return a.getName() + FORWARD_SLASH + a.getSubject();
+        } else {
+            return a.getName();
+        }
     }
 
     bool hasCorrelationId() const

Modified: qpid/trunk/qpid/cpp/src/tests/interlink_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/interlink_tests.py?rev=1486989&r1=1486988&r2=1486989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/interlink_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/interlink_tests.py Tue May 28 16:26:54 2013
@@ -50,13 +50,15 @@ class AmqpBrokerTest(BrokerTest):
         self.default_config = Config(self.broker)
         self.agent = BrokerAgent(self.broker.connect())
 
-    def sender(self, config):
+    def sender(self, config, reply_to=None):
         cmd = ["qpid-send",
                "--broker", config.url,
                "--address", config.address,
                "--connection-options", "{protocol:%s}" % config.version,
                "--content-stdin", "--send-eos=1"
                ]
+        if reply_to:
+            cmd.append( "--reply-to=%s" % reply_to)
         return self.popen(cmd, stdin=PIPE)
 
     def receiver(self, config):
@@ -68,13 +70,31 @@ class AmqpBrokerTest(BrokerTest):
                ]
         return self.popen(cmd, stdout=PIPE)
 
-    def send_and_receive(self, send_config=None, recv_config=None, count=1000, debug=False):
+    def ready_receiver(self, config):
+        s = self.broker.connect().session()
+        r = s.receiver("readyq; {create:always}")
+        cmd = ["qpid-receive",
+               "--broker", config.url,
+               "--address", config.address,
+               "--connection-options", "{protocol:%r}" % config.version,
+               "--timeout=10", "--ready-address=readyq;{create:always}"
+               ]
+        result = self.popen(cmd, stdout=PIPE)
+        r.fetch(timeout=1) # wait until receiver is actually ready
+        s.acknowledge()
+        s.close()
+        return result
+
+    def send_and_receive(self, send_config=None, recv_config=None, count=1000, reply_to=None,
wait_for_receiver=False, debug=False):
         if debug:
             print "sender config is %s" % (send_config or self.default_config)
             print "receiver config is %s" % (recv_config or self.default_config)
-        sender = self.sender(send_config or self.default_config)
+        sender = self.sender(send_config or self.default_config, reply_to)
         sender._set_cloexec_flag(sender.stdin) #required for older python, see http://bugs.python.org/issue4112
-        receiver = self.receiver(recv_config or self.default_config)
+        if wait_for_receiver:
+            receiver = self.ready_receiver(recv_config or self.default_config)
+        else:
+            receiver = self.receiver(recv_config or self.default_config)
 
         messages = ["message-%s" % (i+1) for i in range(count)]
         for m in messages:
@@ -137,6 +157,60 @@ class AmqpBrokerTest(BrokerTest):
     def test_translate_empty_2(self):
         self.send_and_receive_empty(send_config=Config(self.broker, version="amqp0-10"))
 
+    def request_response(self, reply_to, send_config=None, request_config=None, response_config=None,
count=1000, wait_for_receiver=False):
+        rconfig = request_config or self.default_config
+        echo_cmd = ["qpid-receive",
+               "--broker", rconfig.url,
+               "--address=%s" % rconfig.address,
+               "--connection-options={protocol:%s}" % rconfig.version,
+               "--timeout=10", "--print-content=false", "--print-headers=false"
+               ]
+        requests = self.popen(echo_cmd)
+        self.send_and_receive(send_config, response_config, count, reply_to=reply_to, wait_for_receiver=wait_for_receiver)
+        requests.wait()
+
+    def request_response_local(self, request_address, response_address, wait_for_receiver=False,
request_version="amqp1.0", echo_version="amqp1.0"):
+        self.request_response(response_address, send_config=Config(self.broker, address=request_address,
version=request_version), request_config=Config(self.broker, address=request_address, version=echo_version),
response_config=Config(self.broker, address=response_address, version=request_version), wait_for_receiver=wait_for_receiver)
+
+    def test_request_reponse_queue(self):
+        self.agent.create("queue", "q1")
+        self.agent.create("queue", "q2")
+        self.request_response_local("q1", "q2")
+
+    def test_request_reponse_queue_translated1(self):
+        self.agent.create("queue", "q1")
+        self.agent.create("queue", "q2")
+        self.request_response_local("q1", "q2", request_version="amqp0-10", echo_version="amqp1.0")
+
+    def test_request_reponse_queue_translated2(self):
+        self.agent.create("queue", "q1")
+        self.agent.create("queue", "q2")
+        self.request_response_local("q1", "q2", request_version="amqp1.0", echo_version="amqp0-10")
+
+    def test_request_reponse_exchange(self):
+        self.agent.create("queue", "q1")
+        self.request_response_local("q1", "amq.fanout", wait_for_receiver=True)
+
+    def test_request_reponse_exchange_translated1(self):
+        self.agent.create("queue", "q1")
+        self.request_response_local("q1", "amq.fanout", wait_for_receiver=True, request_version="amqp0-10",
echo_version="amqp1.0")
+
+    def test_request_reponse_exchange_translated2(self):
+        self.agent.create("queue", "q1")
+        self.request_response_local("q1", "amq.fanout", wait_for_receiver=True, request_version="amqp1.0",
echo_version="amqp0-10")
+
+    def test_request_reponse_exchange_with_subject(self):
+        self.agent.create("queue", "q1")
+        self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True)
+
+    def test_request_reponse_exchange_with_subject_translated1(self):
+        self.agent.create("queue", "q1")
+        self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True,
request_version="amqp0-10", echo_version="amqp1.0")
+
+    def test_request_reponse_exchange_with_subject_translated2(self):
+        self.agent.create("queue", "q1")
+        self.request_response_local("q1", "amq.topic/abc; {node:{type:topic}}", wait_for_receiver=True,
request_version="amqp1.0", echo_version="amqp0-10")
+
     def test_domain(self):
         brokerB = self.amqp_broker()
         self.agent.create("domain", "BrokerB", {"url":brokerB.host_port()})

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp?rev=1486989&r1=1486988&r2=1486989&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-receive.cpp Tue May 28 16:26:54 2013
@@ -250,10 +250,9 @@ int main(int argc, char ** argv)
                     if (s.isNull()) {
                         s = session.createSender(msg.getReplyTo());
                         s.setCapacity(opts.capacity);
+                        replyTo[msg.getReplyTo().str()] = s;
                     }
-                    if (!opts.replyto.empty()) {
-                        msg.setReplyTo(Address(opts.replyto));
-                    }
+                    msg.setReplyTo(Address(opts.replyto));
                     s.send(msg);
                 }
                 if (opts.receiveRate) {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message