qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cliffjan...@apache.org
Subject [2/2] qpid-proton git commit: PROTON-865: sync_request_response
Date Thu, 30 Jul 2015 00:05:58 GMT
PROTON-865: sync_request_response


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/8e1757eb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/8e1757eb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/8e1757eb

Branch: refs/heads/cjansen-cpp-client
Commit: 8e1757ebcf200d1475ec927a0d8e7eb7e2a9e06e
Parents: 4339b1c
Author: Clifford Jansen <cliffjansen@apache.org>
Authored: Wed Jul 29 16:54:42 2015 -0700
Committer: Clifford Jansen <cliffjansen@apache.org>
Committed: Wed Jul 29 16:54:42 2015 -0700

----------------------------------------------------------------------
 examples/cpp/CMakeLists.txt                     |  2 +
 examples/cpp/example_test.py                    | 40 +++++++++
 examples/cpp/options.hpp                        |  2 +-
 examples/cpp/server.cpp                         | 95 ++++++++++++++++++++
 examples/cpp/simple_recv.cpp                    |  2 +-
 examples/cpp/sync_client.cpp                    | 65 ++++++++++++++
 proton-c/bindings/cpp/CMakeLists.txt            |  1 +
 .../cpp/include/proton/blocking_connection.hpp  |  5 +-
 .../cpp/include/proton/blocking_receiver.hpp    | 16 +++-
 proton-c/bindings/cpp/include/proton/link.hpp   |  2 +-
 .../include/proton/sync_request_response.hpp    | 58 ++++++++++++
 .../bindings/cpp/src/blocking_connection.cpp    | 23 +++--
 .../cpp/src/blocking_connection_impl.cpp        |  4 +
 .../cpp/src/blocking_connection_impl.hpp        |  1 +
 proton-c/bindings/cpp/src/blocking_receiver.cpp | 54 +++++++++--
 .../bindings/cpp/src/sync_request_response.cpp  | 77 ++++++++++++++++
 16 files changed, 423 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index fde4eee..f32c38a 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -30,6 +30,8 @@ foreach(example
     simple_send
     direct_recv
     direct_send
+    sync_client
+    server
     encode_decode)
   add_executable(${example} ${example}.cpp)
   target_link_libraries(${example} qpid-proton-cpp)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index 8dd4341..38e3c2c 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -102,6 +102,31 @@ class Broker(object):
         except Exception as e:
             raise Exception("Error running %s: %s", cmd, e)
 
+class Server(object):
+    """Run the test server"""
+
+    @classmethod
+    def get(cls, addr):
+        if not hasattr(cls, "_server"):
+            cls._server = Server(addr)
+        return cls._server
+
+    @classmethod
+    def stop(cls):
+        if cls.get(None) and cls._server.process:
+            cls._server.process.kill()
+            cls._server = None
+
+    def __init__(self, addr):
+        self.addr = addr
+        cmd = [exe_name("server"), "-a", self.addr]
+        try:
+            self.process = Popen(cmd, stdout=NULL, stderr=NULL)
+            time.sleep(0.3)
+
+        except Exception as e:
+            raise Exception("Error running %s: %s", cmd, e)
+
 class ExampleTest(unittest.TestCase):
     """Run the examples, verify they behave as expected."""
 
@@ -161,6 +186,21 @@ class ExampleTest(unittest.TestCase):
         recv_expect += "".join(['{"sequence"=%s}\n' % (i+1) for i in range(100)])
         self.assertEqual(recv_expect, verify(recv))
 
+    def test_sync_request_response(self):
+        """Start server first, then run sync_client"""
+        b = Broker.get()
+        s = Server.get(b.addr)
+        expect = """
+"Twas brillig, and the slithy toves" => "TWAS BRILLIG, AND THE SLITHY TOVES"
+"Did gire and gymble in the wabe." => "DID GIRE AND GYMBLE IN THE WABE."
+"All mimsy were the borogroves," => "ALL MIMSY WERE THE BOROGROVES,"
+"And the mome raths outgrabe." => "AND THE MOME RATHS OUTGRABE."
+"""
+        sc = "\n"
+        sc += execute("sync_client", "-a", b.addr)
+        self.assertEqual(expect, sc)
+        Server.stop()
+
     def test_encode_decode(self):
         expect="""
 == Simple values: int, string, bool

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/examples/cpp/options.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/options.hpp b/examples/cpp/options.hpp
index 5378507..bd477b5 100644
--- a/examples/cpp/options.hpp
+++ b/examples/cpp/options.hpp
@@ -125,7 +125,7 @@ class options {
             if (arg.compare(0, long_.size(), long_) == 0 && arg[long_.size()] ==
'=' ) {
                 set_value(long_, arg.substr(long_.size()+1));
                 return true;
-            } 
+            }
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/examples/cpp/server.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/server.cpp b/examples/cpp/server.cpp
new file mode 100644
index 0000000..78b78d3
--- /dev/null
+++ b/examples/cpp/server.cpp
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include "proton/container.hpp"
+#include "proton/messaging_handler.hpp"
+#include "proton/url.hpp"
+
+#include <iostream>
+#include <map>
+#include <string>
+
+class server : public proton::messaging_handler {
+  private:
+    typedef std::map<std::string, proton::sender> sender_map;
+    proton::url url;
+    proton::connection connection;
+    sender_map senders;
+
+  public:
+
+    server(const proton::url &u) : url(u) {}
+
+    void on_start(proton::event &e) {
+        connection = e.container().connect(url);
+        e.container().create_receiver(connection, url.path());
+        std::cout << "server listening on " << url << std::endl;
+    }
+
+    std::string to_upper(const std::string &s) {
+        std::string uc(s);
+        size_t l = uc.size();
+        for (size_t i=0; i<l; i++) uc[i] = std::toupper(uc[i]);
+        return uc;
+    }
+
+    // TODO: on_connection_opened() and ANONYMOUS-RELAY
+
+    void on_message(proton::event &e) {
+        proton::sender sender;
+        proton::message msg = e.message();
+        std::cout << "Received " << msg.body() << std::endl;
+        std::string sender_id = msg.reply_to();
+        sender_map::iterator it = senders.find(sender_id);
+        if (it == senders.end()) {
+            sender = e.container().create_sender(connection, sender_id);
+            senders[sender_id] = sender;
+        }
+        else {
+            sender = it->second;
+        }
+        proton::message reply;
+        reply.body(to_upper(msg.body().get<std::string>()));
+        reply.correlation_id(msg.correlation_id());
+        reply.address(sender_id);
+        sender.send(reply);
+    }
+};
+
+int main(int argc, char **argv) {
+    // Command line options
+    proton::url url("amqp://127.0.0.1:5672/examples");
+    options opts(argc, argv);
+    opts.add_value(url, 'a', "address", "listen on URL", "URL");
+    try {
+        opts.parse();
+        server server(url);
+        proton::container(server).run();
+        return 0;
+    } catch (const bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/examples/cpp/simple_recv.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/simple_recv.cpp b/examples/cpp/simple_recv.cpp
index b2cec63..94cd398 100644
--- a/examples/cpp/simple_recv.cpp
+++ b/examples/cpp/simple_recv.cpp
@@ -77,7 +77,7 @@ int main(int argc, char **argv) {
         opts.parse();
         simple_recv recv(address, message_count);
         proton::container(recv).run();
-        return 0; 
+        return 0;
     } catch (const bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/examples/cpp/sync_client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/sync_client.cpp b/examples/cpp/sync_client.cpp
new file mode 100644
index 0000000..e141a3e
--- /dev/null
+++ b/examples/cpp/sync_client.cpp
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include "proton/container.hpp"
+#include "proton/sync_request_response.hpp"
+#include "proton/url.hpp"
+#include "proton/value.hpp"
+#include "proton/types.hpp"
+
+#include <iostream>
+#include <string>
+
+int main(int argc, char **argv) {
+    // Command line options
+    proton::url url("127.0.0.1:5672/examples");
+    uint64_t timeout(5000);
+    options opts(argc, argv);
+    opts.add_value(url, 'a', "address", "connect to URL", "URL");
+    opts.add_value(timeout, 't', "timeout", "give up after this TIMEOUT (milliseconds)",
"TIMEOUT");
+
+    std::string requests[] = { "Twas brillig, and the slithy toves",
+                               "Did gire and gymble in the wabe.",
+                               "All mimsy were the borogroves,",
+                               "And the mome raths outgrabe." };
+    int requests_size=4;
+
+    try {
+        opts.parse();
+        proton::duration d(timeout);
+        proton::blocking_connection conn(url, d);
+        proton::sync_request_response client(conn, url.path());
+        for (int i=0; i<requests_size; i++) {
+            proton::message request;
+            request.body(requests[i]);
+            proton::message response = client.call(request);
+            std::cout << request.body() << " => " << response.body()
<< std::endl;
+        }
+        return 0;
+    } catch (const bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index fcb0fb1..d1b1ebc 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -70,6 +70,7 @@ set(qpid-proton-cpp-source
   src/blocking_link.cpp
   src/blocking_sender.cpp
   src/blocking_receiver.cpp
+  src/sync_request_response.cpp
   src/contexts.cpp
   src/types.cpp
   )

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/include/proton/blocking_connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/blocking_connection.hpp b/proton-c/bindings/cpp/include/proton/blocking_connection.hpp
index b6f0499..cba5f60 100644
--- a/proton-c/bindings/cpp/include/proton/blocking_connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/blocking_connection.hpp
@@ -56,9 +56,10 @@ class blocking_connection : public handle<blocking_connection_impl>
 
     PN_CPP_EXTERN blocking_sender create_sender(const std::string &address, handler *h=0);
     PN_CPP_EXTERN blocking_receiver create_receiver(const std::string &address, int credit
= 0,
-                                                    bool dynamic = false, std::string name
= std::string());
+                                                    bool dynamic = false, handler *h=0, std::string
name = std::string());
     PN_CPP_EXTERN void wait(wait_condition &condition);
-    PN_CPP_EXTERN void wait(wait_condition &condition, std::string &msg, duration
timeout=duration::FOREVER);
+    PN_CPP_EXTERN void wait(wait_condition &condition, std::string &msg);
+    PN_CPP_EXTERN void wait(wait_condition &condition, std::string &msg, duration
timeout);
     PN_CPP_EXTERN duration timeout();
   private:
     friend class private_impl_ref<blocking_connection>;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp b/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp
index 319fd93..dbcc9d0 100644
--- a/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp
+++ b/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp
@@ -49,9 +49,21 @@ class blocking_receiver : public blocking_link
     PN_CPP_EXTERN void release(bool delivered = true);
     PN_CPP_EXTERN void settle();
     PN_CPP_EXTERN void settle(delivery::state state);
+    PN_CPP_EXTERN void flow(int count);
+    /** Credit available on the receiver link */
+    PN_CPP_EXTERN int credit();
+    /** Local source of the receiver link */
+    PN_CPP_EXTERN terminus source();
+    /** Local target of the receiver link */
+    PN_CPP_EXTERN terminus target();
+    /** Remote source of the receiver link */
+    PN_CPP_EXTERN terminus remote_source();
+    /** Remote target of the receiver link */
+    PN_CPP_EXTERN terminus remote_target();
+
   private:
-    blocking_receiver(blocking_connection &c, receiver &l, fetcher &f, int credit);
-    fetcher &fetcher_;
+    blocking_receiver(blocking_connection &c, receiver &l, fetcher *f, int credit);
+    fetcher *fetcher_;
     friend class blocking_connection;
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/include/proton/link.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/link.hpp b/proton-c/bindings/cpp/include/proton/link.hpp
index feabfec..46b8894 100644
--- a/proton-c/bindings/cpp/include/proton/link.hpp
+++ b/proton-c/bindings/cpp/include/proton/link.hpp
@@ -64,7 +64,7 @@ class link : public endpoint, public proton_handle<pn_link_t>
     PN_CPP_EXTERN terminus target();
     /** Remote source of the link */
     PN_CPP_EXTERN terminus remote_source();
-    /** Remote source of the link */
+    /** Remote target of the link */
     PN_CPP_EXTERN terminus remote_target();
     /** Link name */
     PN_CPP_EXTERN std::string name();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/include/proton/sync_request_response.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/sync_request_response.hpp b/proton-c/bindings/cpp/include/proton/sync_request_response.hpp
new file mode 100644
index 0000000..604af4b
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/sync_request_response.hpp
@@ -0,0 +1,58 @@
+#ifndef PROTON_CPP_SYNCREQUESTRESPONSE_H
+#define PROTON_CPP_SYNCREQUESTRESPONSE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/export.hpp"
+#include "proton/messaging_handler.hpp"
+#include "proton/blocking_receiver.hpp"
+#include "proton/blocking_sender.hpp"
+#include "proton/wait_condition.hpp"
+#include <string>
+
+struct pn_message_t;
+struct pn_data_t;
+
+namespace proton {
+
+/// An implementation of the synchronous request-response pattern (aka RPC).
+class sync_request_response : public messaging_handler
+{
+  public:
+    PN_CPP_EXTERN sync_request_response(blocking_connection &, const std::string address=std::string());
+    /** Send a request message, wait for and return the response message. */
+    PN_CPP_EXTERN message call(message &);
+    /** Return the dynamic address of our receiver. */
+    PN_CPP_EXTERN std::string reply_to();
+    /** Called when we receive a message for our receiver. */
+    void on_message(event &e);
+  private:
+    blocking_connection connection_;
+    std::string address_;
+    blocking_sender sender_;
+    blocking_receiver receiver_;
+    message response_;
+    amqp_ulong correlation_id_;
+};
+
+}
+
+#endif  /*!PROTON_CPP_SYNCREQUESTRESPONSE_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/src/blocking_connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_connection.cpp b/proton-c/bindings/cpp/src/blocking_connection.cpp
index dd1aebb..c0c1477 100644
--- a/proton-c/bindings/cpp/src/blocking_connection.cpp
+++ b/proton-c/bindings/cpp/src/blocking_connection.cpp
@@ -50,6 +50,9 @@ blocking_connection::blocking_connection(const proton::url &url, duration
d, ssl
 void blocking_connection::close() { impl_->close(); }
 
 void blocking_connection::wait(wait_condition &cond) { return impl_->wait(cond); }
+void blocking_connection::wait(wait_condition &cond, std::string &msg) {
+    return impl_->wait(cond, msg);
+}
 void blocking_connection::wait(wait_condition &cond, std::string &msg, duration timeout)
{
     return impl_->wait(cond, msg, timeout);
 }
@@ -61,18 +64,22 @@ blocking_sender blocking_connection::create_sender(const std::string &address,
h
 
 namespace {
 struct fetcher_guard{
-    fetcher_guard(fetcher &f) : fetcher_(f) { fetcher_.incref(); }
-    ~fetcher_guard() { fetcher_.decref(); }
-    fetcher& fetcher_;
+    fetcher_guard(fetcher *f) : fetcher_(f) { if (fetcher_) fetcher_->incref(); }
+    ~fetcher_guard() { if (fetcher_) fetcher_->decref(); }
+    fetcher* fetcher_;
 };
 }
 
 blocking_receiver blocking_connection::create_receiver(const std::string &address, int
credit,
-                                                       bool dynamic, std::string name) {
-    fetcher *f = new fetcher(*this, credit);
-    fetcher_guard fg(*f);
-    receiver receiver = impl_->container_.create_receiver(impl_->connection_, address,
dynamic, f);
-    blocking_receiver brcv(*this, receiver, *f, credit);
+                                                       bool dynamic, handler *handler, std::string
name) {
+    fetcher *f = NULL;
+    if (!handler) {
+        f = new fetcher(*this, credit);
+        handler = f;
+    }
+    fetcher_guard fg(f);
+    receiver receiver = impl_->container_.create_receiver(impl_->connection_, address,
dynamic, handler);
+    blocking_receiver brcv(*this, receiver, f, credit);
     return brcv;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/src/blocking_connection_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_connection_impl.cpp b/proton-c/bindings/cpp/src/blocking_connection_impl.cpp
index 26d0e78..0e78541 100644
--- a/proton-c/bindings/cpp/src/blocking_connection_impl.cpp
+++ b/proton-c/bindings/cpp/src/blocking_connection_impl.cpp
@@ -88,6 +88,10 @@ void blocking_connection_impl::wait(wait_condition &condition) {
     wait(condition, empty, timeout_);
 }
 
+void blocking_connection_impl::wait(wait_condition &condition, std::string &msg)
{
+    wait(condition, msg, timeout_);
+}
+
 void blocking_connection_impl::wait(wait_condition &condition, std::string &msg,
duration wait_timeout) {
     if (wait_timeout == duration::FOREVER) {
         while (!condition.achieved()) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/src/blocking_connection_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_connection_impl.hpp b/proton-c/bindings/cpp/src/blocking_connection_impl.hpp
index 7e8c031..11ff4fd 100644
--- a/proton-c/bindings/cpp/src/blocking_connection_impl.hpp
+++ b/proton-c/bindings/cpp/src/blocking_connection_impl.hpp
@@ -42,6 +42,7 @@ class ssl_domain;
     PN_CPP_EXTERN ~blocking_connection_impl();
     PN_CPP_EXTERN void close();
     PN_CPP_EXTERN void wait(wait_condition &condition);
+    PN_CPP_EXTERN void wait(wait_condition &condition, std::string &msg);
     PN_CPP_EXTERN void wait(wait_condition &condition, std::string &msg, duration
timeout);
     PN_CPP_EXTERN pn_connection_t *pn_blocking_connection();
     duration timeout() { return timeout_; }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/src/blocking_receiver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_receiver.cpp b/proton-c/bindings/cpp/src/blocking_receiver.cpp
index 042eb29..3ca53c0 100644
--- a/proton-c/bindings/cpp/src/blocking_receiver.cpp
+++ b/proton-c/bindings/cpp/src/blocking_receiver.cpp
@@ -39,11 +39,11 @@ struct fetcher_has_message : public wait_condition {
 } // namespace
 
 
-blocking_receiver::blocking_receiver(blocking_connection &c, receiver &l, fetcher
&f, int credit)
+blocking_receiver::blocking_receiver(blocking_connection &c, receiver &l, fetcher
*f, int credit)
     : blocking_link(&c, l.pn_link()), fetcher_(f) {
     std::string sa = link_.source().address();
     std::string rsa = link_.remote_source().address();
-    if (sa.empty() || sa.compare(rsa) != 0) {
+    if (!sa.empty() && sa.compare(rsa) != 0) {
         wait_for_closed();
         link_.close();
         std::string txt = "Failed to open receiver " + link_.name() + ", source does not
match";
@@ -51,30 +51,38 @@ blocking_receiver::blocking_receiver(blocking_connection &c, receiver
&l, fetche
     }
     if (credit)
         pn_link_flow(link_.pn_link(), credit);
-    fetcher_.incref();
+    if (fetcher_)
+        fetcher_->incref();
 }
 
 blocking_receiver::blocking_receiver(const blocking_receiver& r) : blocking_link(r),
fetcher_(r.fetcher_) {
-    fetcher_.incref();
+    if (fetcher_)
+        fetcher_->incref();
 }
 blocking_receiver& blocking_receiver::operator=(const blocking_receiver& r) {
     if (this == &r) return *this;
     fetcher_ = r.fetcher_;
-    fetcher_.incref();
+    if (fetcher_)
+        fetcher_->incref();
     return *this;
 }
-blocking_receiver::~blocking_receiver() { fetcher_.decref(); }
+blocking_receiver::~blocking_receiver() {
+    if (fetcher_)
+        fetcher_->decref();
+}
 
 
 
 message blocking_receiver::receive(duration timeout) {
+    if (!fetcher_)
+        throw error(MSG("Can't call receive on this receiver as a handler was provided"));
     receiver rcv = link_;
     if (!rcv.credit())
         rcv.flow(1);
     std::string txt = "Receiving on receiver " + link_.name();
-    fetcher_has_message cond(fetcher_);
+    fetcher_has_message cond(*fetcher_);
     connection_.wait(cond, txt, timeout);
-    return fetcher_.pop();
+    return fetcher_->pop();
 }
 
 message blocking_receiver::receive() {
@@ -98,7 +106,35 @@ void blocking_receiver::release(bool delivered) {
 }
 
 void blocking_receiver::settle(delivery::state state = delivery::NONE) {
-    fetcher_.settle(state);
+    if (!fetcher_)
+        throw error(MSG("Can't call accept/reject etc on this receiver as a handler was provided"));
+    fetcher_->settle(state);
+}
+
+void blocking_receiver::flow(int count) {
+    receiver rcv(link_);
+    rcv.flow(count);
+}
+
+int blocking_receiver::credit() {
+    return link_.credit();
+}
+
+terminus blocking_receiver::source() {
+    return link_.source();
 }
 
+terminus blocking_receiver::target() {
+    return link_.target();
+}
+
+terminus blocking_receiver::remote_source() {
+    return link_.remote_source();
+}
+
+terminus blocking_receiver::remote_target() {
+    return link_.remote_target();
+}
+
+
 } // namespace

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/src/sync_request_response.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/sync_request_response.cpp b/proton-c/bindings/cpp/src/sync_request_response.cpp
new file mode 100644
index 0000000..eebd62d
--- /dev/null
+++ b/proton-c/bindings/cpp/src/sync_request_response.cpp
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/sync_request_response.hpp"
+#include "proton/event.hpp"
+#include "proton/error.hpp"
+#include "msg.hpp"
+
+namespace proton {
+
+namespace {
+amqp_ulong global_correlation_id = 0;
+message null_message;
+
+struct response_received : public wait_condition {
+    response_received(message &m, amqp_ulong id) : message_(m), id_(id) {}
+    bool achieved() { return message_ && message_.correlation_id() == id_; }
+    message &message_;
+    value id_;
+};
+
+}
+
+sync_request_response::sync_request_response(blocking_connection &conn, const std::string
addr):
+    connection_(conn), address_(addr),
+    sender_(connection_.create_sender(addr)),
+    receiver_(connection_.create_receiver("", 1, true, this)), // credit=1, dynamic=true
+    response_(null_message)
+{
+}
+
+message sync_request_response::call(message &request) {
+    if (address_.empty() && request.address().empty())
+        throw error(MSG("Request message has no address: " << request));
+    // TODO: thread safe increment.
+    correlation_id_ = global_correlation_id++;
+    request.correlation_id(value(correlation_id_));
+    request.reply_to(this->reply_to());
+    sender_.send(request);
+    std::string txt("Waiting for response");
+    response_received cond(response_, correlation_id_);
+    connection_.wait(cond, txt);
+    message resp = response_;
+    response_ = null_message;
+    receiver_.flow(1);
+    return resp;
+}
+
+std::string sync_request_response::reply_to() {
+    return receiver_.remote_source().address();
+}
+
+void sync_request_response::on_message(event &e) {
+    response_ = e.message();
+    // Wake up enclosing blocking_connection.wait() to handle the message
+    e.container().yield();
+}
+
+
+} // namespace


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message