qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [2/2] qpid-proton git commit: PROTON-865: Updated and simplified blocking connection code.
Date Wed, 02 Sep 2015 15:19:25 GMT
PROTON-865: Updated and simplified blocking connection code.

Simple ownership for all blocking_ classes, no refcounting.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9c25d4c2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9c25d4c2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9c25d4c2

Branch: refs/heads/cjansen-cpp-client
Commit: 9c25d4c21845357eb22bd66b2363f4da0c50ee1a
Parents: 2b6240b
Author: Alan Conway <aconway@redhat.com>
Authored: Wed Sep 2 10:56:16 2015 -0400
Committer: Alan Conway <aconway@redhat.com>
Committed: Wed Sep 2 11:16:04 2015 -0400

----------------------------------------------------------------------
 examples/cpp/CMakeLists.txt                     |  4 +-
 examples/cpp/example_test.py                    | 26 +++---
 examples/cpp/helloworld_blocking.cpp            | 11 ++-
 examples/cpp/sync_client.cpp                    |  6 +-
 proton-c/bindings/cpp/CMakeLists.txt            | 19 ++--
 proton-c/bindings/cpp/README.md                 | 14 +--
 .../cpp/include/proton/blocking_connection.hpp  | 68 +++++---------
 .../cpp/include/proton/blocking_link.hpp        | 28 +++---
 .../cpp/include/proton/blocking_receiver.hpp    | 40 +++-----
 .../cpp/include/proton/blocking_sender.hpp      | 30 +++---
 .../bindings/cpp/include/proton/comparable.hpp  | 16 +---
 proton-c/bindings/cpp/include/proton/config.hpp |  9 --
 .../bindings/cpp/include/proton/connection.hpp  |  3 +
 .../bindings/cpp/include/proton/container.hpp   | 12 +--
 proton-c/bindings/cpp/include/proton/data.hpp   |  3 +-
 .../bindings/cpp/include/proton/delivery.hpp    |  2 +-
 .../bindings/cpp/include/proton/duration.hpp    |  2 +-
 proton-c/bindings/cpp/include/proton/facade.hpp |  7 ++
 .../bindings/cpp/include/proton/handler.hpp     | 12 ++-
 proton-c/bindings/cpp/include/proton/link.hpp   | 10 +-
 proton-c/bindings/cpp/include/proton/memory.hpp | 33 +++++++
 .../bindings/cpp/include/proton/message.hpp     | 16 ++--
 .../cpp/include/proton/messaging_handler.hpp    |  4 +-
 .../bindings/cpp/include/proton/reactor.hpp     |  1 +
 .../bindings/cpp/include/proton/receiver.hpp    |  2 +
 proton-c/bindings/cpp/include/proton/sender.hpp |  4 +-
 .../bindings/cpp/include/proton/session.hpp     |  2 +
 .../include/proton/sync_request_response.hpp    | 34 ++++---
 .../bindings/cpp/src/blocking_connection.cpp    | 58 +-----------
 .../cpp/src/blocking_connection_impl.cpp        | 97 +++++++++-----------
 .../cpp/src/blocking_connection_impl.hpp        | 43 ++++-----
 proton-c/bindings/cpp/src/blocking_fetcher.cpp  | 66 +++++++++++++
 proton-c/bindings/cpp/src/blocking_fetcher.hpp  | 48 ++++++++++
 proton-c/bindings/cpp/src/blocking_link.cpp     | 47 +++++-----
 proton-c/bindings/cpp/src/blocking_receiver.cpp | 92 ++++++-------------
 proton-c/bindings/cpp/src/blocking_sender.cpp   | 38 ++++----
 proton-c/bindings/cpp/src/connection.cpp        |  2 +
 proton-c/bindings/cpp/src/container.cpp         |  4 -
 proton-c/bindings/cpp/src/fetcher.cpp           | 92 -------------------
 proton-c/bindings/cpp/src/fetcher.hpp           | 58 ------------
 proton-c/bindings/cpp/src/link.cpp              |  8 +-
 proton-c/bindings/cpp/src/message.cpp           |  7 ++
 proton-c/bindings/cpp/src/messaging_handler.cpp | 17 ++--
 proton-c/bindings/cpp/src/receiver.cpp          |  2 +
 proton-c/bindings/cpp/src/sender.cpp            |  4 +-
 proton-c/bindings/cpp/src/session.cpp           |  2 +-
 .../bindings/cpp/src/sync_request_response.cpp  | 55 ++++-------
 proton-c/bindings/cpp/src/uuid.hpp              |  2 +
 48 files changed, 516 insertions(+), 644 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index 40925f9..77ab816 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -24,13 +24,13 @@ include_directories(
 foreach(example
     broker
     helloworld
-    # helloworld_blocking
+    helloworld_blocking
     helloworld_direct
     simple_recv
     simple_send
     direct_recv
     direct_send
-    # sync_client
+    sync_client
     client
     server
     server_direct

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index 5059678..7922deb 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -32,7 +32,7 @@ def cmdline(*args):
         args[0] += ".exe"
     else:
         args[0] = "./" + args[0]
-    if "VALGRIND" in os.environ:
+    if "VALGRIND" in os.environ and os.environ["VALGRIND"]:
         args = [os.environ["VALGRIND"], "-q"] + args
     return args
 
@@ -119,11 +119,10 @@ class ExampleTest(unittest.TestCase):
         hw = execute("helloworld", b.addr)
         self.assertEqual('"Hello World!"\n', hw)
 
-        # FIXME aconway 2015-08-28: Restore blocking code and examples
-    # def test_helloworld_blocking(self):
-    #     b = Broker.get()
-    #     hw = execute("helloworld_blocking", b.addr, b.addr)
-    #     self.assertEqual('"Hello World!"\n', hw)
+    def test_helloworld_blocking(self):
+        b = Broker.get()
+        hw = execute("helloworld_blocking", b.addr, b.addr)
+        self.assertEqual('"Hello World!"\n', hw)
 
     def test_helloworld_direct(self):
         addr = pick_addr()
@@ -180,14 +179,13 @@ class ExampleTest(unittest.TestCase):
         finally:
             server.kill()
 
-        # FIXME aconway 2015-08-28: Restore blocking code and examples
-    # def test_sync_request_response(self):
-    #     b = Broker.get()
-    #     server = background("server", "-a", b.addr)
-    #     try:
-    #         self.assertEqual(execute("sync_client", "-a", b.addr), self.CLIENT_EXPECT)
-    #     finally:
-    #         server.kill()
+    def test_sync_request_response(self):
+        b = Broker.get()
+        server = background("server", "-a", b.addr)
+        try:
+            self.assertEqual(execute("sync_client", "-a", b.addr), self.CLIENT_EXPECT)
+        finally:
+            server.kill()
 
     def test_request_response_direct(self):
         addr = pick_addr()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/examples/cpp/helloworld_blocking.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/helloworld_blocking.cpp b/examples/cpp/helloworld_blocking.cpp
index 39cc10d..49094a3 100644
--- a/examples/cpp/helloworld_blocking.cpp
+++ b/examples/cpp/helloworld_blocking.cpp
@@ -20,6 +20,7 @@
  */
 
 #include "proton/messaging_handler.hpp"
+#include "proton/blocking_connection.hpp"
 #include "proton/blocking_sender.hpp"
 #include "proton/blocking_receiver.hpp"
 #include "proton/duration.hpp"
@@ -30,16 +31,16 @@ int main(int argc, char **argv) {
     try {
         proton::url url(argc > 1 ? argv[1] : "127.0.0.1:5672/examples");
         proton::blocking_connection conn(url);
-        proton::blocking_receiver receiver = conn.create_receiver(url.path());
-        proton::blocking_sender sender = conn.create_sender(url.path());
+        proton::blocking_receiver receiver(conn, url.path());
+        proton::blocking_sender sender(conn, url.path());
 
         proton::message_value m;
-        m->body("Hello World!");
-        sender.send(*m);
+        m.body("Hello World!");
+        sender.send(m);
 
         proton::duration timeout(30000);
         proton::message_value m2 = receiver.receive(timeout);
-        std::cout << m2->body() << std::endl;
+        std::cout << m2.body() << std::endl;
         receiver.accept();
 
         conn.close();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/examples/cpp/sync_client.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/sync_client.cpp b/examples/cpp/sync_client.cpp
index 2bcb5c5..7bf08d9 100644
--- a/examples/cpp/sync_client.cpp
+++ b/examples/cpp/sync_client.cpp
@@ -22,9 +22,9 @@
 #include "options.hpp"
 
 #include "proton/container.hpp"
+#include "proton/blocking_connection.hpp"
 #include "proton/sync_request_response.hpp"
 #include "proton/url.hpp"
-#include "proton/value.hpp"
 #include "proton/types.hpp"
 
 #include <iostream>
@@ -53,8 +53,8 @@ int main(int argc, char **argv) {
         proton::sync_request_response client(conn, url.path());
         for (std::vector<std::string>::const_iterator i=requests.begin(); i != requests.end(); i++) {
             proton::message_value request;
-            request->body(*i);
-            proton::message_value response(client.call(*request));
+            request.body(*i);
+            proton::message_value response(client.call(request));
             std::cout << request.body() << " => " << response.body() << std::endl;
         }
         return 0;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index 5df220e..d4d0c18 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -39,12 +39,12 @@ include_directories(
 
 set(qpid-proton-cpp-source
   src/acceptor.cpp
-  # FIXME aconway 2015-08-28: Restore blocking code and examples
-  # src/blocking_connection.cpp
-  # src/blocking_connection_impl.cpp
-  # src/blocking_link.cpp
-  # src/blocking_receiver.cpp
-  # src/blocking_sender.cpp
+  src/blocking_connection.cpp
+  src/blocking_connection_impl.cpp
+  src/blocking_fetcher.cpp
+  src/blocking_link.cpp
+  src/blocking_receiver.cpp
+  src/blocking_sender.cpp
   src/connection.cpp
   src/connector.cpp
   src/container.cpp
@@ -58,8 +58,7 @@ set(qpid-proton-cpp-source
   src/endpoint.cpp
   src/error.cpp
   src/event.cpp
-  # FIXME aconway 2015-08-28: Restore blocking code and examples
-  # src/fetcher.cpp
+  src/facade.cpp
   src/handler.cpp
   src/link.cpp
   src/message.cpp
@@ -73,14 +72,12 @@ set(qpid-proton-cpp-source
   src/receiver.cpp
   src/sender.cpp
   src/session.cpp
-  # FIXME aconway 2015-08-28: Restore blocking code and examples
-  # src/sync_request_response.cpp
+  src/sync_request_response.cpp
   src/terminus.cpp
   src/transport.cpp
   src/types.cpp
   src/url.cpp
   src/uuid.cpp
-  src/facade.cpp
   )
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/README.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/README.md b/proton-c/bindings/cpp/README.md
index 4bb58c2..1b3ad82 100644
--- a/proton-c/bindings/cpp/README.md
+++ b/proton-c/bindings/cpp/README.md
@@ -15,13 +15,14 @@ Doc & website
 
 Tests
 - Interop/type testing: proton/tests/interop, new interop suite
-- Unit testing for reasonable code coverage.
-- Valgrind for automated unit and example tests.
+- More unit testing, measured code coverage.
 - Test examples against AM-Q and qpidd.
 
 Bugs
-- Error handling: examples exit silently on broker exit/not running, core on no-such-queue.
-- TODO notes in code.
+- Error handling:
+  - examples exit silently on broker exit/not running, core on no-such-queue (e.g. with qpidd)
+  - TODO/FIXME notes in code.
+- const correctness: consistent use of const where semantically appropriate in C++ APIs.
 
 Features
 - SASL/SSL support with interop tests.
@@ -32,11 +33,12 @@ Features
 - Durable subscriptions & demos (see python changes)
 - Transactions
 - Heartbeats
-- Tasks
 
 # Nice to have
 
+- C++11 lambda version of handlers.
 - Helpers (or at least doc) for multi-threaded use (container per connection)
 - Usable support for decimal types.
-- Endpoint conditions
+- Expose endpoint conditions as C++ proton::condition error class.
 - Selectables and 3rd party event loop support
+- More efficient shared_ptr (single family per proton object.)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/blocking_connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/blocking_connection.hpp b/proton-c/bindings/cpp/include/proton/blocking_connection.hpp
index c9a5133..87710a0 100644
--- a/proton-c/bindings/cpp/include/proton/blocking_connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/blocking_connection.hpp
@@ -1,5 +1,5 @@
-#ifndef PROTON_CPP_BLOCKINGCONNECTION_H
-#define PROTON_CPP_BLOCKINGCONNECTION_H
+#ifndef PROTON_CPP_BLOCKING_CONNECTION_H
+#define PROTON_CPP_BLOCKING_CONNECTION_H
 
 /*
  *
@@ -21,67 +21,41 @@
  * under the License.
  *
  */
+
+#include "proton/memory.hpp"
 #include "proton/export.hpp"
-#include "proton/facade.hpp"
 #include "proton/duration.hpp"
-#include <string>
+#include "proton/memory.hpp"
 
-struct pn_connection_t;
+#include <string>
 
 namespace proton {
 class url;
-class ssl_domain;
-class blocking_sender;
-class blocking_receiver;
+class connection;
+class blocking_connection_impl;
 
 // TODO documentation
-class blocking_connection : public handle<blocking_connection_impl>
+// Note: must not be deleted while there are proton::blocking_link instances that depend on it.
+class blocking_connection
 {
   public:
-    PN_CPP_EXTERN blocking_connection();
-    PN_CPP_EXTERN blocking_connection(const blocking_connection& c);
-    PN_CPP_EXTERN blocking_connection& operator=(const blocking_connection& c);
-    PN_CPP_EXTERN ~blocking_connection();
-
-    PN_CPP_EXTERN blocking_connection(const proton::url &url, duration = duration::FOREVER,
-                                      ssl_domain *ssld=0, container *c=0);
+    PN_CPP_EXTERN blocking_connection(const proton::url &url, duration timeout = duration::FOREVER);
     PN_CPP_EXTERN void close();
-
-    PN_CPP_EXTERN blocking_sender create_sender(const std::string &address, handler *h=0);
-    PN_CPP_EXTERN blocking_receiver create_receiver(const std::string &address, int credit = 0,
-                                                    bool dynamic = false, handler *h=0, std::string name = std::string());
-
-    /// Abstract condition class for wait.
-    struct condition {
-        virtual ~condition() {}
-        virtual bool operator()() = 0;
-    };
-
-    /** Wait till cond returns true. 
-     * C must be copyable and callable with no arguments and bool return value.
-     * Wait up to timeout if specified or blocking_connection::timeout() if not.
-     * @throws timeout_error with message msg if timeout is exceeded.
-     */
-    template <class C> void wait(C cond, const std::string &msg="", duration timeout=duration(-1)) {
-        condition_impl<C> c(cond);
-        wait(dynamic_cast<condition&>(c), msg, timeout);
-    }
-
     PN_CPP_EXTERN duration timeout();
-  private:
-
-    PN_CPP_EXTERN void wait(condition &, const std::string &msg="", duration timeout=duration(-1));
+    class connection& connection();
 
+  private:
+    blocking_connection(const blocking_connection&);
+    blocking_connection& operator=(const blocking_connection&);
 
-    template <class C> struct condition_impl : public condition {
-        C cond_;
-        condition_impl(C c) : cond_(c) {}
-        bool operator()() { return cond_(); }
-    };
+    PN_UNIQUE_PTR<blocking_connection_impl> impl_;
 
-  friend class private_impl_ref<blocking_connection>;
+  friend class blocking_link;
+  friend class blocking_sender;
+  friend class blocking_receiver;
+  friend class sync_request_response;
 };
 
 }
 
-#endif  /*!PROTON_CPP_BLOCKINGCONNECTION_H*/
+#endif  /*!PROTON_CPP_BLOCKING_CONNECTION_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/blocking_link.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/blocking_link.hpp b/proton-c/bindings/cpp/include/proton/blocking_link.hpp
index 3ec945a..6913654 100644
--- a/proton-c/bindings/cpp/include/proton/blocking_link.hpp
+++ b/proton-c/bindings/cpp/include/proton/blocking_link.hpp
@@ -22,16 +22,13 @@
  *
  */
 #include "proton/export.hpp"
-#include "proton/endpoint.hpp"
-#include "proton/container.hpp"
+#include "proton/counted_ptr.hpp"
 #include "proton/duration.hpp"
-#include "proton/messaging_handler.hpp"
-#include "proton/blocking_connection.hpp"
-#include "proton/types.h"
+
 #include <string>
 
 namespace proton {
-
+class link;
 class blocking_connection;
 
 // TODO documentation
@@ -40,16 +37,19 @@ class blocking_link
   public:
     PN_CPP_EXTERN void close();
     PN_CPP_EXTERN ~blocking_link();
+
   protected:
-    PN_CPP_EXTERN blocking_link(blocking_connection *c, pn_link_t *l);
-    PN_CPP_EXTERN void wait_for_closed(duration timeout=duration::SECOND);
+    blocking_link(blocking_connection&);
+    void open(link&);
+    void check_closed();
+    void wait_for_closed();
+
+    blocking_connection& connection_;
+    counted_ptr<link> link_;
+
   private:
-    blocking_connection connection_;
-    link link_;
-    PN_CPP_EXTERN void check_closed();
-    friend class blocking_connection;
-    friend class blocking_sender;
-    friend class blocking_receiver;
+    blocking_link(const blocking_link&);
+    blocking_link& operator=(const blocking_link&);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp b/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp
index dbcc9d0..0057cef 100644
--- a/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp
+++ b/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp
@@ -21,50 +21,40 @@
  * under the License.
  *
  */
+#include "proton/memory.hpp"
 #include "proton/export.hpp"
-#include "proton/container.hpp"
+#include "proton/delivery.hpp"
+#include "proton/message.hpp"
 #include "proton/blocking_link.hpp"
-#include "proton/duration.hpp"
-#include "proton/messaging_handler.hpp"
-#include "proton/types.h"
-#include "proton/delivery.h"
+
 #include <string>
 
 namespace proton {
-
+class receiver;
 class blocking_connection;
-class blocking_link;
-class fetcher;
+class blocking_fetcher;
 
+// TODO documentation
 class blocking_receiver : public blocking_link
 {
   public:
-    PN_CPP_EXTERN blocking_receiver(const blocking_receiver&);
-    PN_CPP_EXTERN blocking_receiver& operator=(const blocking_receiver&);
+    PN_CPP_EXTERN blocking_receiver(
+        blocking_connection&, const std::string &address, int credit = 0, bool dynamic = false);
     PN_CPP_EXTERN ~blocking_receiver();
-    PN_CPP_EXTERN message receive();
-    PN_CPP_EXTERN message receive(duration timeout);
+
+    PN_CPP_EXTERN message_value receive();
+    PN_CPP_EXTERN message_value receive(duration timeout);
+
     PN_CPP_EXTERN void accept();
     PN_CPP_EXTERN void reject();
     PN_CPP_EXTERN void release(bool delivered = true);
     PN_CPP_EXTERN void settle();
     PN_CPP_EXTERN void settle(delivery::state state);
     PN_CPP_EXTERN void flow(int count);
-    /** Credit available on the receiver link */
-    PN_CPP_EXTERN int credit();
-    /** Local source of the receiver link */
-    PN_CPP_EXTERN terminus source();
-    /** Local target of the receiver link */
-    PN_CPP_EXTERN terminus target();
-    /** Remote source of the receiver link */
-    PN_CPP_EXTERN terminus remote_source();
-    /** Remote target of the receiver link */
-    PN_CPP_EXTERN terminus remote_target();
 
+    PN_CPP_EXTERN class receiver& receiver();
   private:
-    blocking_receiver(blocking_connection &c, receiver &l, fetcher *f, int credit);
-    fetcher *fetcher_;
-    friend class blocking_connection;
+    PN_UNIQUE_PTR<blocking_fetcher> fetcher_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/blocking_sender.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/blocking_sender.hpp b/proton-c/bindings/cpp/include/proton/blocking_sender.hpp
index 55e340e..334faad 100644
--- a/proton-c/bindings/cpp/include/proton/blocking_sender.hpp
+++ b/proton-c/bindings/cpp/include/proton/blocking_sender.hpp
@@ -1,5 +1,5 @@
-#ifndef PROTON_CPP_BLOCKINGSENDER_H
-#define PROTON_CPP_BLOCKINGSENDER_H
+#ifndef PROTON_CPP_BLOCKING_SENDER_H
+#define PROTON_CPP_BLOCKING_SENDER_H
 
 /*
  *
@@ -22,31 +22,29 @@
  *
  */
 #include "proton/export.hpp"
-#include "proton/endpoint.hpp"
-#include "proton/container.hpp"
-#include "proton/duration.hpp"
-#include "proton/messaging_handler.hpp"
+#include "proton/delivery.hpp"
+#include "proton/message.hpp"
 #include "proton/blocking_link.hpp"
-#include "proton/types.h"
-#include "proton/delivery.h"
+
 #include <string>
 
 namespace proton {
-
+class sender;
 class blocking_connection;
-class blocking_link;
 
 // TODO documentation
 class blocking_sender : public blocking_link
 {
   public:
-    PN_CPP_EXTERN delivery send(message &msg);
-    PN_CPP_EXTERN delivery send(message &msg, duration timeout);
-  private:
-    PN_CPP_EXTERN blocking_sender(blocking_connection &c, sender &l);
-    friend class blocking_connection;
+    PN_CPP_EXTERN blocking_sender(class blocking_connection &c, const std::string &address);
+    PN_CPP_EXTERN ~blocking_sender();
+
+    PN_CPP_EXTERN delivery& send(const message_value &msg);
+    PN_CPP_EXTERN delivery& send(const message_value &msg, duration timeout);
+
+    PN_CPP_EXTERN class sender& sender();
 };
 
 }
 
-#endif  /*!PROTON_CPP_BLOCKINGSENDER_H*/
+#endif  /*!PROTON_CPP_BLOCKING_SENDER_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/comparable.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/comparable.hpp b/proton-c/bindings/cpp/include/proton/comparable.hpp
index b409859..2649a8d 100644
--- a/proton-c/bindings/cpp/include/proton/comparable.hpp
+++ b/proton-c/bindings/cpp/include/proton/comparable.hpp
@@ -23,20 +23,14 @@ namespace proton {
 
 ///@cond INTERNAL
 
-/// Internal base class to provide comparison operators.
+/// Internal base class to provide comparison operators. T must provide < and ==.
 template <class T> struct comparable {
+  friend bool operator>(const T &a, const T &b) { return b < a; }
+  friend bool operator<=(const T &a, const T &b) { return !(a > b); }
+  friend bool operator>=(const T &a, const T &b) { return !(a < b); }
+  friend bool operator!=(const T &a, const T &b) { return !(a == b); }
 };
 
-template<class T> bool operator<(const comparable<T>& a, const comparable<T>& b) {
-    return static_cast<const T&>(a) < static_cast<const T&>(b); // operator < provided by type T
-}
-
-template<class T> bool operator>(const comparable<T>& a, const comparable<T>& b) { return b < a; }
-template<class T> bool operator<=(const comparable<T>& a, const comparable<T>& b) { return !(a > b); }
-template<class T> bool operator>=(const comparable<T>& a, const comparable<T>& b) { return !(a < b); }
-template<class T> bool operator==(const comparable<T>& a, const comparable<T>& b) { return a <= b && b <= a; }
-template<class T> bool operator!=(const comparable<T>& a, const comparable<T>& b) { return !(a == b); }
-
 ///@endcond
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/config.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/config.hpp b/proton-c/bindings/cpp/include/proton/config.hpp
index 6436d1a..7f0a8f4 100644
--- a/proton-c/bindings/cpp/include/proton/config.hpp
+++ b/proton-c/bindings/cpp/include/proton/config.hpp
@@ -39,13 +39,4 @@
 #endif
 #endif
 
-#if PN_USE_CPP11
-// Simple ownership pointer, use std::unique_ptr in C++11 std::auto_ptr otherwise.
-#define PN_UNIQUE_PTR std::unique_ptr
-#else
-// Simple ownership pointer, use std::unique_ptr in C++11 std::auto_ptr otherwise.
-#define PN_UNIQUE_PTR std::auto_ptr
-#endif
-
-
 #endif // CONFIG_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp
index 03e58d5..bd1d9f5 100644
--- a/proton-c/bindings/cpp/include/proton/connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection.hpp
@@ -71,6 +71,9 @@ class connection : public counted_facade<pn_connection_t, connection>, public en
      * @see link::next, endpoint::state
      */
     PN_CPP_EXTERN link* link_head(endpoint::state mask);
+
+    /** Get the endpoint state */
+    PN_CPP_EXTERN endpoint::state state();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp
index b6b7f18..dc87082 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -25,6 +25,7 @@
 #include "proton/export.hpp"
 #include "proton/reactor.hpp"
 #include "proton/url.hpp"
+#include "proton/memory.hpp"
 
 #include <string>
 
@@ -71,20 +72,15 @@ class container
     /// Identifier for the container
     PN_CPP_EXTERN std::string container_id();
 
-    /// Get timeout, process() will return if there is no activity within the timeout.
-    PN_CPP_EXTERN duration timeout();
-
-    /// Set timeout, process() will return if there is no activity within the timeout.
-    PN_CPP_EXTERN void timeout(duration timeout);
-
-    PN_CPP_EXTERN class reactor& reactor();
-
     /// Set the prefix to be used when generating link names. @see proton::session
     PN_CPP_EXTERN void link_prefix(const std::string&);
 
     /// Get the prefix to be used when generating link names. @see proton::session
     PN_CPP_EXTERN std::string link_prefix();
 
+    /// The reactor associated with this container.
+    PN_CPP_EXTERN class reactor& reactor();
+
   private:
     PN_UNIQUE_PTR<container_impl> impl_;
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/data.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/data.hpp b/proton-c/bindings/cpp/include/proton/data.hpp
index 6f6ff58..0448f89 100644
--- a/proton-c/bindings/cpp/include/proton/data.hpp
+++ b/proton-c/bindings/cpp/include/proton/data.hpp
@@ -23,6 +23,7 @@
 #include "proton/facade.hpp"
 #include "proton/decoder.hpp"
 #include "proton/encoder.hpp"
+#include "proton/memory.hpp"
 #include <iosfwd>
 
 struct pn_data_t;
@@ -75,7 +76,7 @@ class data : public facade<pn_data_t, data>, public comparable<data> {
 };
 
 /** data with normal value semantics: copy, assign etc. */
-class data_value : public comparable<data_value> {
+class data_value {
   public:
     data_value() : data_(data::create()) {}
     data_value(const data_value& x) : data_(data::create()) { *data_ = *x.data_; }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/delivery.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/delivery.hpp b/proton-c/bindings/cpp/include/proton/delivery.hpp
index 5528677..ca90831 100644
--- a/proton-c/bindings/cpp/include/proton/delivery.hpp
+++ b/proton-c/bindings/cpp/include/proton/delivery.hpp
@@ -31,7 +31,7 @@ namespace proton {
 
 /** delivery status of a message */
 class delivery : public counted_facade<pn_delivery_t, delivery> {
-
+  public:
     /** Delivery state of a message */
     enum state {
         NONE = 0, ///< Unknown state

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/duration.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/duration.hpp b/proton-c/bindings/cpp/include/proton/duration.hpp
index 8b7d1ad..387cf2c 100644
--- a/proton-c/bindings/cpp/include/proton/duration.hpp
+++ b/proton-c/bindings/cpp/include/proton/duration.hpp
@@ -32,7 +32,7 @@ class duration : public comparable<duration>
 {
   public:
     std::uint64_t milliseconds;
-    explicit duration(std::uint64_t ms) : milliseconds(ms) {}
+    explicit duration(std::uint64_t ms = 0) : milliseconds(ms) {}
 
     bool operator<(duration d) { return milliseconds < d.milliseconds; }
     bool operator==(duration d) { return milliseconds == d.milliseconds; }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/facade.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/facade.hpp b/proton-c/bindings/cpp/include/proton/facade.hpp
index 7455f8d..d9696f6 100644
--- a/proton-c/bindings/cpp/include/proton/facade.hpp
+++ b/proton-c/bindings/cpp/include/proton/facade.hpp
@@ -96,6 +96,13 @@ template <class T> typename T::pn_type* pn_cast(const T* p) {
     return reinterpret_cast<typename T::pn_type*>(const_cast<T*>(p));
 }
 
+/** Cast a counted pointer to a facade type to the C struct type.
+ * Allow casting away const, the underlying pn structs have no constness.
+ */
+template <class T> typename T::pn_type* pn_cast(const counted_ptr<T>& p) {
+    return reinterpret_cast<typename T::pn_type*>(const_cast<T*>(p.get()));
+}
+
 
 ///@cond INTERNAL
 class pn_counted {};

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/handler.hpp b/proton-c/bindings/cpp/include/proton/handler.hpp
index c1d4f49..cfd2689 100644
--- a/proton-c/bindings/cpp/include/proton/handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/handler.hpp
@@ -32,9 +32,15 @@ namespace proton {
  *
  * A handler can have child handlers which are called in order, after the parent handler.
  *
- * Note: Event handlers are not deleted by the proton library, and they must not be
- * deleted while they are still in use. You can allocate a handler with `new` and
- * call `delete this` in the appropriate `on_*_closed` or `on_*_final` event if you wish.
+ * Note: handlers are not deleted automatically. They must not be deleted while
+ * they are still in use.
+ *
+ * There are two simple strategies you can use:
+ *
+ * 1. Destroy handlers only after the container that uses them is closed.
+ *
+ * 2. Allocate handlers with `new` and call `delete this` in the appropriate
+ * `on_*_closed` or `on_*_final` event that indicates the handler is no longer needed.
  *
  */
 class handler : public std::vector<handler*> {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/link.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/link.hpp b/proton-c/bindings/cpp/include/proton/link.hpp
index 1ff8a4d..3ef84c7 100644
--- a/proton-c/bindings/cpp/include/proton/link.hpp
+++ b/proton-c/bindings/cpp/include/proton/link.hpp
@@ -89,8 +89,14 @@ class link : public counted_facade<pn_link_t, link>, public endpoint
     /** Connection that owns this link */
     PN_CPP_EXTERN class connection &connection();
 
-    /** Set a custom handler for this link */
-    PN_CPP_EXTERN void handler(class handler&);
+    /** Set a custom handler for this link. */
+    PN_CPP_EXTERN void handler(class handler &);
+
+    /** Unset any custom handler */
+    PN_CPP_EXTERN void detach_handler();
+
+    /** Get the endpoint state */
+    PN_CPP_EXTERN endpoint::state state();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/memory.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/memory.hpp b/proton-c/bindings/cpp/include/proton/memory.hpp
new file mode 100644
index 0000000..0a3b05d
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/memory.hpp
@@ -0,0 +1,33 @@
+#ifndef UNIQUE_PTR_HPP
+#define UNIQUE_PTR_HPP
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton/config.hpp"
+#include <memory>
+
+#if PN_USE_CPP11
+// Simple ownership pointer: use std::unique_ptr in C++11, std::auto_ptr otherwise.
+#define PN_UNIQUE_PTR std::unique_ptr
+#else
+// Simple ownership pointer: use std::unique_ptr in C++11, std::auto_ptr otherwise.
+#define PN_UNIQUE_PTR std::auto_ptr
+#endif
+
+#endif // UNIQUE_PTR_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/message.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/message.hpp b/proton-c/bindings/cpp/include/proton/message.hpp
index 0e38a2e..f94f9da 100644
--- a/proton-c/bindings/cpp/include/proton/message.hpp
+++ b/proton-c/bindings/cpp/include/proton/message.hpp
@@ -24,6 +24,8 @@
 #include "proton/export.hpp"
 #include "proton/data.hpp"
 #include "proton/facade.hpp"
+#include "proton/memory.hpp"
+
 #include <string>
 
 struct pn_message_t;
@@ -33,9 +35,7 @@ namespace proton {
 class link;
 class delivery;
 
-class message;
-
-/** An AMQP message. Not directly construct-able, use create() or value<message>.*/
+/** An AMQP message. Not directly constructible; use create() or message_value. */
 class message : public facade<pn_message_t, message>
 {
   public:
@@ -44,7 +44,7 @@ class message : public facade<pn_message_t, message>
     /// Copy data from m to this.
     message& operator=(const message& m);
 
-    /** Clear the message content */
+    /** Clear the message content and properties. */
     PN_CPP_EXTERN void clear();
 
     ///@name Message properties
@@ -133,6 +133,8 @@ class message_value {
     message_value& operator=(const message_value& x) { *message_ = *x.message_; return *this; }
     message_value& operator=(const message& x) { *message_ = x; return *this; }
 
+    // TODO aconway 2015-09-02: C++11 move semantics.
+
     operator message&() { return *message_; }
     operator const message&() const { return *message_; }
 
@@ -212,10 +214,12 @@ class message_value {
     /// Decode the message from link corresponding to delivery.
     void decode(proton::link& l, proton::delivery& d) { message_->decode(l, d); }
 
+    void swap(message_value& x);
+
   private:
-    PN_UNIQUE_PTR<message> message_;
+    PN_UNIQUE_PTR<class message> message_;
 };
-    
+
 }
 
 #endif  /*!PROTON_CPP_MESSAGE_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
index 55d1867..389b73f 100644
--- a/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
+++ b/proton-c/bindings/cpp/include/proton/messaging_handler.hpp
@@ -95,8 +95,8 @@ class messaging_handler : public proton_handler
     bool auto_accept_;
     bool auto_settle_;
     bool peer_close_iserror_;
-    messaging_adapter *messaging_adapter_;
-    handler *flow_controller_;
+    PN_UNIQUE_PTR<messaging_adapter> messaging_adapter_;
+    PN_UNIQUE_PTR<handler> flow_controller_;
     PN_CPP_EXTERN messaging_handler(
         bool raw_handler, int prefetch=10, bool auto_accept=true,
         bool auto_settle=true, bool peer_close_is_error=false);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/reactor.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/reactor.hpp b/proton-c/bindings/cpp/include/proton/reactor.hpp
index b07f10a..37dec6e 100644
--- a/proton-c/bindings/cpp/include/proton/reactor.hpp
+++ b/proton-c/bindings/cpp/include/proton/reactor.hpp
@@ -21,6 +21,7 @@
 
 #include "proton/facade.hpp"
 #include "proton/duration.hpp"
+#include "proton/memory.hpp"
 
 struct pn_reactor_t;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/receiver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/receiver.hpp b/proton-c/bindings/cpp/include/proton/receiver.hpp
index 05bd263..4d1d9ab 100644
--- a/proton-c/bindings/cpp/include/proton/receiver.hpp
+++ b/proton-c/bindings/cpp/include/proton/receiver.hpp
@@ -37,6 +37,8 @@ class receiver : public link, public ptr_convertible<receiver>
   public:
     /// Add credit to the link
     PN_CPP_EXTERN void flow(int count);
+
+    PN_CPP_EXTERN receiver* cast(pn_type*);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/sender.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/sender.hpp b/proton-c/bindings/cpp/include/proton/sender.hpp
index 65e38c2..03a52c9 100644
--- a/proton-c/bindings/cpp/include/proton/sender.hpp
+++ b/proton-c/bindings/cpp/include/proton/sender.hpp
@@ -38,7 +38,9 @@ class sender : public link, public ptr_convertible<sender>
 {
   public:
     /// Send a message on the link.
-    PN_CPP_EXTERN delivery& send(message &m);
+    PN_CPP_EXTERN delivery& send(const message &m);
+
+    PN_CPP_EXTERN sender* cast(pn_type*);
 };
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/session.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/session.hpp b/proton-c/bindings/cpp/include/proton/session.hpp
index 4e3131f..bf4c96d 100644
--- a/proton-c/bindings/cpp/include/proton/session.hpp
+++ b/proton-c/bindings/cpp/include/proton/session.hpp
@@ -74,6 +74,8 @@ class session : public counted_facade<pn_session_t, session>, public endpoint
     /** Create and open a receiver with target=addr and optional handler h */
     PN_CPP_EXTERN receiver& create_receiver(const std::string &addr, bool dynamic=false, handler *h=0);
 
+    /** Get the endpoint state */
+    PN_CPP_EXTERN endpoint::state state();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/include/proton/sync_request_response.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/sync_request_response.hpp b/proton-c/bindings/cpp/include/proton/sync_request_response.hpp
index d91929c..8c54d64 100644
--- a/proton-c/bindings/cpp/include/proton/sync_request_response.hpp
+++ b/proton-c/bindings/cpp/include/proton/sync_request_response.hpp
@@ -1,5 +1,5 @@
-#ifndef PROTON_CPP_SYNCREQUESTRESPONSE_H
-#define PROTON_CPP_SYNCREQUESTRESPONSE_H
+#ifndef PROTON_CPP_SYNC_REQUEST_RESPONSE_H
+#define PROTON_CPP_SYNC_REQUEST_RESPONSE_H
 
 /*
  *
@@ -25,6 +25,8 @@
 #include "proton/messaging_handler.hpp"
 #include "proton/blocking_receiver.hpp"
 #include "proton/blocking_sender.hpp"
+#include "proton/memory.hpp"
+
 #include <string>
 
 struct pn_message_t;
@@ -32,26 +34,30 @@ struct pn_data_t;
 
 namespace proton {
 
-/// An implementation of the synchronous request-response pattern (aka RPC).
-class sync_request_response : public messaging_handler
+/**
+ * An implementation of the synchronous request-response pattern (aka RPC).
+ */
+class sync_request_response
 {
   public:
-    PN_CPP_EXTERN sync_request_response(blocking_connection &, const std::string address=std::string());
-    /** Send a request message, wait for and return the response message. */
-    PN_CPP_EXTERN message call(message &);
+    PN_CPP_EXTERN sync_request_response(
+        blocking_connection &, const std::string address=std::string());
+    /**
+     * Send a request message, wait for and return the response message.
+     * Modifies the message to set `address` (if not already set), `reply_to` and `correlation_id`.
+     */
+    PN_CPP_EXTERN message_value call(message &);
     /** Return the dynamic address of our receiver. */
     PN_CPP_EXTERN std::string reply_to();
-    /** Called when we receive a message for our receiver. */
-    void on_message(event &e);
+
   private:
-    blocking_connection connection_;
+    blocking_connection &connection_;
     std::string address_;
-    blocking_sender sender_;
-    blocking_receiver receiver_;
-    PN_UNIQUE_OR_AUTO_PTR<message> response_;
+    PN_UNIQUE_PTR<blocking_sender> sender_;
+    PN_UNIQUE_PTR<blocking_receiver> receiver_;
     amqp_ulong correlation_id_;
 };
 
 }
 
-#endif  /*!PROTON_CPP_SYNCREQUESTRESPONSE_H*/
+#endif  /*!PROTON_CPP_SYNC_REQUEST_RESPONSE_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/blocking_connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_connection.cpp b/proton-c/bindings/cpp/src/blocking_connection.cpp
index 6d1c051..5be59c8 100644
--- a/proton-c/bindings/cpp/src/blocking_connection.cpp
+++ b/proton-c/bindings/cpp/src/blocking_connection.cpp
@@ -20,66 +20,18 @@
  */
 #include "proton/container.hpp"
 #include "proton/blocking_connection.hpp"
-#include "proton/blocking_sender.hpp"
-#include "proton/blocking_receiver.hpp"
-#include "proton/messaging_handler.hpp"
-#include "proton/sender.hpp"
-#include "proton/receiver.hpp"
-#include "proton/url.hpp"
-#include "proton/error.hpp"
-#include "fetcher.hpp"
-#include "msg.hpp"
 #include "blocking_connection_impl.hpp"
 
 namespace proton {
 
-template class handle<blocking_connection_impl>;
-typedef private_impl_ref<blocking_connection> PI;
-
-blocking_connection::blocking_connection() {PI::ctor(*this, 0); }
-
-blocking_connection::blocking_connection(const blocking_connection& c) : handle<blocking_connection_impl>() { PI::copy(*this, c); }
-
-blocking_connection& blocking_connection::operator=(const blocking_connection& c) { return PI::assign(*this, c); }
-blocking_connection::~blocking_connection() { PI::dtor(*this); }
-
-blocking_connection::blocking_connection(const proton::url &url, duration d, ssl_domain *ssld, container *c) {
-    blocking_connection_impl *cimpl = new blocking_connection_impl(url, d,ssld, c);
-    PI::ctor(*this, cimpl);
-}
+blocking_connection::blocking_connection(const proton::url &url, duration timeout) :
+    impl_(new blocking_connection_impl(url, timeout))
+{}
 
 void blocking_connection::close() { impl_->close(); }
 
-void blocking_connection::wait(condition &cond, const std::string &msg, duration timeout) {
-    return impl_->wait(cond, msg, timeout);
-}
-
-blocking_sender blocking_connection::create_sender(const std::string &address, handler *h) {
-    counted_ptr<sender> snd = impl_->container_.create_sender(impl_->connection_, address, h);
-    return blocking_sender(*this, snd);
-}
-
-namespace {
-struct fetcher_guard{
-    fetcher_guard(fetcher *f) : fetcher_(f) { if (fetcher_) fetcher_->incref(); }
-    ~fetcher_guard() { if (fetcher_) fetcher_->decref(); }
-    fetcher* fetcher_;
-};
-}
-
-blocking_receiver blocking_connection::create_receiver(const std::string &address, int credit,
-                                                       bool dynamic, handler *handler, std::string name) {
-    fetcher *f = NULL;
-    if (!handler) {
-        f = new fetcher(*this, credit);
-        handler = f;
-    }
-    fetcher_guard fg(f);
-    counted_ptr<receiver> rcv = impl_->container_.create_receiver(impl_->connection_, address, dynamic, handler);
-    blocking_receiver brcv(*this, rcv, f, credit);
-    return brcv;
-}
+duration blocking_connection::timeout() { return impl_->container_->reactor().timeout(); }
 
-duration blocking_connection::timeout() { return impl_->timeout(); }
+connection& blocking_connection::connection() { return *impl_->connection_; }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/blocking_connection_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_connection_impl.cpp b/proton-c/bindings/cpp/src/blocking_connection_impl.cpp
index e552830..e18e78b 100644
--- a/proton-c/bindings/cpp/src/blocking_connection_impl.cpp
+++ b/proton-c/bindings/cpp/src/blocking_connection_impl.cpp
@@ -21,92 +21,81 @@
 #include "proton/container.hpp"
 #include "proton/messaging_handler.hpp"
 #include "proton/duration.hpp"
+#include "proton/connection.h"
 #include "proton/error.hpp"
+
 #include "blocking_connection_impl.hpp"
 #include "msg.hpp"
 #include "contexts.hpp"
 
-#include "proton/connection.h"
 
 namespace proton {
 
-void blocking_connection_impl::incref(blocking_connection_impl *impl_) {
-    impl_->refcount_++;
-}
-
-void blocking_connection_impl::decref(blocking_connection_impl *impl_) {
-    impl_->refcount_--;
-    if (impl_->refcount_ == 0)
-        delete impl_;
-}
-
 namespace {
-struct connection_opening {
+struct connection_opening : public blocking_connection_impl::condition {
     connection_opening(pn_connection_t *c) : pn_connection(c) {}
-    bool operator()() { return (pn_connection_state(pn_connection) & PN_REMOTE_UNINIT); }
+    bool operator()() const { return (pn_connection_state(pn_connection) & PN_REMOTE_UNINIT); }
     pn_connection_t *pn_connection;
 };
 
-struct connection_closed {
+struct connection_closed : public blocking_connection_impl::condition {
     connection_closed(pn_connection_t *c) : pn_connection(c) {}
-    bool operator()() { return !(pn_connection_state(pn_connection) & PN_REMOTE_ACTIVE); }
+    bool operator()() const { return !(pn_connection_state(pn_connection) & PN_REMOTE_ACTIVE); }
     pn_connection_t *pn_connection;
 };
-
 }
 
-
-blocking_connection_impl::blocking_connection_impl(const url &u, duration timeout0, ssl_domain *ssld, container *c)
-    : url_(u), timeout_(timeout0), refcount_(0)
+blocking_connection_impl::blocking_connection_impl(const url& url, duration timeout) :
+    container_(new container())
 {
-    if (c)
-        container_ = *c;
-    container_.start();
-    container_.timeout(timeout_);
-    // Create connection and send the connection events here
-    connection_ = container_.connect(url_, static_cast<handler *>(this));
-    connection_opening cond(connection_.pn_connection());
-    wait(cond);
+    container_->reactor().start();
+    container_->reactor().timeout(timeout);
+    connection_ = container_->connect(url, this); // Set this as handler.
+    wait(connection_opening(pn_cast(connection_.get())));
 }
 
-blocking_connection_impl::~blocking_connection_impl() {
-    container_ = container();
-}
+blocking_connection_impl::~blocking_connection_impl() {}
 
 void blocking_connection_impl::close() {
-    connection_.close();
-    connection_closed cond(connection_.pn_connection());
-    wait(cond);
+    connection_->close();
+    wait(connection_closed(pn_cast(connection_.get())));
 }
 
-void blocking_connection_impl::wait(blocking_connection::condition &condition, const std::string &msg, duration wait_timeout) {
-    if (wait_timeout == duration(-1)) wait_timeout = timeout_;
+namespace {
+struct save_timeout {
+    reactor& reactor_;
+    duration timeout_;
+    save_timeout(reactor& r) : reactor_(r), timeout_(r.timeout()) {}
+    ~save_timeout() { reactor_.timeout(timeout_); }
+};
+}
+
+void blocking_connection_impl::wait(const condition &condition, const std::string &msg, duration wait_timeout)
+{
+    reactor& reactor = container_->reactor();
+
+    if (wait_timeout == duration(-1))
+        wait_timeout = container_->reactor().timeout();
+
     if (wait_timeout == duration::FOREVER) {
         while (!condition()) {
-            container_.process();
+            reactor.process();
         }
-    }
-
-    pn_reactor_t *reactor = container_.reactor();
-    pn_millis_t orig_timeout = pn_reactor_get_timeout(reactor);
-    pn_reactor_set_timeout(reactor, wait_timeout.milliseconds);
-    try {
-        pn_timestamp_t now = pn_reactor_mark(reactor);
-        pn_timestamp_t deadline = now + wait_timeout.milliseconds;
+    } else {
+        save_timeout st(reactor);
+        reactor.timeout(wait_timeout);
+        pn_timestamp_t deadline = pn_reactor_mark(pn_cast(&reactor)) + wait_timeout.milliseconds;
         while (!condition()) {
-            container_.process();
-            if (deadline < pn_reactor_mark(reactor)) {
-                std::string txt = "connection timed out";
-                if (!msg.empty())
-                    txt += ": " + msg;
-                throw timeout_error(txt);
+            reactor.process();
+            if (!condition()) {
+                pn_timestamp_t now = pn_reactor_mark(pn_cast(&reactor));
+                if (now < deadline)
+                    reactor.timeout(duration(deadline - now));
+                else
+                    throw timeout_error("connection timed out " + msg);
             }
         }
-    } catch (...) {
-        pn_reactor_set_timeout(reactor, orig_timeout);
-        throw;
     }
-    pn_reactor_set_timeout(reactor, orig_timeout);
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/blocking_connection_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_connection_impl.hpp b/proton-c/bindings/cpp/src/blocking_connection_impl.hpp
index e53dd81..d3570b9 100644
--- a/proton-c/bindings/cpp/src/blocking_connection_impl.hpp
+++ b/proton-c/bindings/cpp/src/blocking_connection_impl.hpp
@@ -1,5 +1,5 @@
-#ifndef PROTON_CPP_BLOCKINGCONNECTIONIMPL_H
-#define PROTON_CPP_BLOCKINGCONNECTIONIMPL_H
+#ifndef PROTON_CPP_BLOCKING_CONNECTION_IMPL_H
+#define PROTON_CPP_BLOCKING_CONNECTION_IMPL_H
 
 /*
  *
@@ -22,9 +22,9 @@
  *
  */
 #include "proton/export.hpp"
-#include "proton/endpoint.hpp"
 #include "proton/container.hpp"
-#include "proton/blocking_connection.hpp"
+#include "proton/messaging_handler.hpp"
+#include "proton/connection.hpp"
 #include "proton/types.h"
 #include <string>
 
@@ -34,36 +34,27 @@ namespace proton {
 
 class handler;
 class container;
-class ssl_domain;
 
- class blocking_connection_impl : public messaging_handler
+class blocking_connection_impl : public messaging_handler
 {
   public:
-    PN_CPP_EXTERN blocking_connection_impl(const url &url, duration d, ssl_domain *ssld, container *c);
-    PN_CPP_EXTERN ~blocking_connection_impl();
-    PN_CPP_EXTERN void close();
-    template <class C> void wait(C c, const std::string &msg="", duration timeout=duration(-1)) {
-        blocking_connection::condition_impl<C> cond(c);
-        wait(dynamic_cast<blocking_connection::condition&>(cond), msg, timeout);
-    }
+    blocking_connection_impl(const url &url, duration d);
+    ~blocking_connection_impl();
 
-    PN_CPP_EXTERN pn_connection_t *pn_blocking_connection();
-    duration timeout() { return timeout_; }
-    static void incref(blocking_connection_impl *);
-    static void decref(blocking_connection_impl *);
-  private:
-    friend class blocking_connection;
-    PN_CPP_EXTERN void wait(blocking_connection::condition &, const std::string & ="",
-                            duration=duration(-1));
+    void close();
 
-    container container_;
+    struct condition {
+        virtual ~condition() {}
+        virtual bool operator()() const = 0;
+    };
+
+    void wait(const condition&, const std::string & ="", duration=duration(-1));
+
+    PN_UNIQUE_PTR<container> container_;
     counted_ptr<connection> connection_;
-    url url_;
-    duration timeout_;
-    int refcount_;
 };
 
 
 }
 
-#endif  /*!PROTON_CPP_BLOCKINGCONNECTIONIMPL_H*/
+#endif  /*!PROTON_CPP_BLOCKING_CONNECTION_IMPL_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/blocking_fetcher.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_fetcher.cpp b/proton-c/bindings/cpp/src/blocking_fetcher.cpp
new file mode 100644
index 0000000..e170a88
--- /dev/null
+++ b/proton-c/bindings/cpp/src/blocking_fetcher.cpp
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "blocking_fetcher.hpp"
+#include "msg.hpp"
+
+#include "proton/event.hpp"
+
+namespace proton {
+
+blocking_fetcher::blocking_fetcher(int prefetch) : messaging_handler(prefetch, false /*auto_accept*/) {}
+
+void blocking_fetcher::on_message(event &e) {
+    messages_.push_back(e.message());
+    deliveries_.push_back(e.delivery());
+    // Wake up enclosing connection.wait()
+    e.container().reactor().yield();
+}
+
+void blocking_fetcher::on_link_error(event &e) {
+    link& lnk = e.link();
+    if (pn_link_state(pn_cast(&lnk)) & PN_LOCAL_ACTIVE) {
+        lnk.close();
+        throw error(MSG("Link detached: " << lnk.name()));
+    }
+}
+
+bool blocking_fetcher::has_message() { return !messages_.empty(); }
+
+message_value blocking_fetcher::pop() {
+    if (messages_.empty())
+        throw error(MSG("receiver has no messages"));
+    counted_ptr<delivery> dlv(deliveries_.front());
+    if (!dlv->settled())
+        unsettled_.push_back(dlv);
+    message_value m = messages_.front();
+    messages_.pop_front();
+    deliveries_.pop_front();
+    return m;
+}
+
+void blocking_fetcher::settle(delivery::state state) {
+    counted_ptr<delivery> dlv = unsettled_.front();
+    if (state)
+        dlv->update(state);
+    dlv->settle();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/blocking_fetcher.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_fetcher.hpp b/proton-c/bindings/cpp/src/blocking_fetcher.hpp
new file mode 100644
index 0000000..f0a8d17
--- /dev/null
+++ b/proton-c/bindings/cpp/src/blocking_fetcher.hpp
@@ -0,0 +1,48 @@
+#ifndef PROTON_CPP_FETCHER_H
+#define PROTON_CPP_FETCHER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/messaging_handler.hpp"
+#include "proton/message.hpp"
+#include <string>
+#include <deque>
+
+namespace proton {
+
+class blocking_fetcher : public messaging_handler {
+  public:
+    blocking_fetcher(int prefetch);
+    void on_message(event &e);
+    void on_link_error(event &e);
+    bool has_message();
+    message_value pop();
+    void settle(delivery::state state = delivery::NONE);
+
+  private:
+    std::deque<message_value> messages_;
+    std::deque<counted_ptr<delivery> > deliveries_;
+    std::deque<counted_ptr<delivery> > unsettled_;
+};
+
+}
+
+#endif  /*!PROTON_CPP_FETCHER_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/blocking_link.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_link.cpp b/proton-c/bindings/cpp/src/blocking_link.cpp
index d8110c6..5fdb8ec 100644
--- a/proton-c/bindings/cpp/src/blocking_link.cpp
+++ b/proton-c/bindings/cpp/src/blocking_link.cpp
@@ -20,65 +20,66 @@
  */
 #include "proton/blocking_link.hpp"
 #include "proton/blocking_connection.hpp"
-#include "proton/messaging_handler.hpp"
+#include "proton/connection.hpp"
 #include "proton/error.hpp"
+#include "proton/link.hpp"
+
+#include "blocking_connection_impl.hpp"
 #include "msg.hpp"
 
+#include <proton/link.h>
 
 namespace proton {
 
 namespace {
-struct link_opened {
+struct link_opened : public blocking_connection_impl::condition {
     link_opened(pn_link_t *l) : pn_link(l) {}
-    bool operator()() { return !(pn_link_state(pn_link) & PN_REMOTE_UNINIT); }
+    bool operator()() const { return !(pn_link_state(pn_link) & PN_REMOTE_UNINIT); }
     pn_link_t *pn_link;
 };
 
-struct link_closed {
+struct link_closed : public blocking_connection_impl::condition {
     link_closed(pn_link_t *l) : pn_link(l) {}
-    bool operator()() { return (pn_link_state(pn_link) & PN_REMOTE_CLOSED); }
+    bool operator()() const { return (pn_link_state(pn_link) & PN_REMOTE_CLOSED); }
     pn_link_t *pn_link;
 };
 
-struct link_not_open {
+struct link_not_open : public blocking_connection_impl::condition {
     link_not_open(pn_link_t *l) : pn_link(l) {}
-    bool operator()() { return !(pn_link_state(pn_link) & PN_REMOTE_ACTIVE); }
+    bool operator()() const { return !(pn_link_state(pn_link) & PN_REMOTE_ACTIVE); }
     pn_link_t *pn_link;
 };
 
-
 } // namespace
 
+blocking_link::blocking_link(blocking_connection &c) : connection_(c) {}
 
-blocking_link::blocking_link(blocking_connection *c, pn_link_t *pnl) : connection_(*c), link_(pnl) {
-    std::string msg = "Opening link " + link_.name();
-    link_opened link_opened(pn_cast(link_));
-    connection_.wait(link_opened, msg);
+void blocking_link::open(proton::link& l) {
+    link_ = l;
+    connection_.impl_->wait(link_opened(pn_cast(link_.get())), "opening link " + link_->name());
     check_closed();
 }
 
 blocking_link::~blocking_link() {}
 
-void blocking_link::wait_for_closed(duration timeout) {
-    std::string msg = "Closing link " + link_.name();
-    link_closed link_closed(pn_cast(link_));
-    connection_.wait(link_closed, msg);
+void blocking_link::wait_for_closed() {
+    link_closed link_closed(pn_cast(link_.get()));
+    connection_.impl_->wait(link_closed, "closing link " + link_->name());
     check_closed();
 }
 
 void blocking_link::check_closed() {
-    pn_link_t * pn_link = pn_cast(link_);
+    pn_link_t * pn_link = pn_cast(link_.get());
     if (pn_link_state(pn_link) & PN_REMOTE_CLOSED) {
-        link_.close();
-        throw error(MSG("Link detached: " << link_.name()));
+        link_->close();
+        throw error(MSG("Link detached: " << link_->name()));
     }
 }
 
 void blocking_link::close() {
-    link_.close();
-    std::string msg = "Closing link " + link_.name();
-    link_not_open link_not_open(pn_cast(link_));
-    connection_.wait(link_not_open, msg);
+    link_->close();
+    link_not_open link_not_open(pn_cast(link_.get()));
+    connection_.impl_->wait(link_not_open, "closing link " + link_->name());
 }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/blocking_receiver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_receiver.cpp b/proton-c/bindings/cpp/src/blocking_receiver.cpp
index 2f7b277..10a3585 100644
--- a/proton-c/bindings/cpp/src/blocking_receiver.cpp
+++ b/proton-c/bindings/cpp/src/blocking_receiver.cpp
@@ -18,11 +18,15 @@
  * under the License.
  *
  */
-#include "proton/blocking_receiver.hpp"
 #include "proton/blocking_connection.hpp"
+#include "proton/blocking_receiver.hpp"
+#include "proton/receiver.hpp"
+#include "proton/connection.hpp"
 #include "proton/receiver.hpp"
 #include "proton/error.hpp"
-#include "fetcher.hpp"
+
+#include "blocking_connection_impl.hpp"
+#include "blocking_fetcher.hpp"
 #include "msg.hpp"
 
 
@@ -30,62 +34,42 @@ namespace proton {
 
 namespace {
 
-struct fetcher_has_message {
-    fetcher_has_message(fetcher &f) : fetcher_(f) {}
-    bool operator()() { return fetcher_.has_message(); }
-    fetcher &fetcher_;
+struct fetcher_has_message : public blocking_connection_impl::condition {
+    fetcher_has_message(blocking_fetcher &f) : fetcher_(f) {}
+    bool operator()() const { return fetcher_.has_message(); }
+    blocking_fetcher &fetcher_;
 };
 
 } // namespace
 
-
-blocking_receiver::blocking_receiver(blocking_connection &c, receiver &l, fetcher *f, int credit)
-    : blocking_link(&c, &l), fetcher_(f) {
-    std::string sa = link_.source().address();
-    std::string rsa = link_.remote_source().address();
+blocking_receiver::blocking_receiver(
+    class blocking_connection &c, const std::string& addr, int credit, bool dynamic) :
+    blocking_link(c), fetcher_(new blocking_fetcher(credit))
+{
+    open(c.impl_->connection_->create_receiver(addr, dynamic, fetcher_.get()));
+    std::string sa = link_->source().address();
+    std::string rsa = link_->remote_source().address();
     if (!sa.empty() && sa.compare(rsa) != 0) {
         wait_for_closed();
-        link_.close();
-        std::string txt = "Failed to open receiver " + link_.name() + ", source does not match";
+        link_->close();
+        std::string txt = "Failed to open receiver " + link_->name() + ", source does not match";
         throw error(MSG(txt));
     }
     if (credit)
         pn_link_flow(pn_cast(link_), credit);
-    if (fetcher_)
-        fetcher_->incref();
 }
 
-blocking_receiver::blocking_receiver(const blocking_receiver& r) : blocking_link(r), fetcher_(r.fetcher_) {
-    if (fetcher_)
-        fetcher_->incref();
-}
-blocking_receiver& blocking_receiver::operator=(const blocking_receiver& r) {
-    if (this == &r) return *this;
-    fetcher_ = r.fetcher_;
-    if (fetcher_)
-        fetcher_->incref();
-    return *this;
-}
-blocking_receiver::~blocking_receiver() {
-    if (fetcher_)
-        fetcher_->decref();
-}
-
-
+blocking_receiver::~blocking_receiver() { link_->detach_handler(); }
 
-message blocking_receiver::receive(duration timeout) {
-    if (!fetcher_)
-        throw error(MSG("Can't call receive on this receiver as a handler was provided"));
-    receiver rcv(pn_cast(link_));
-    if (!rcv.credit())
-        rcv.flow(1);
-    std::string txt = "Receiving on receiver " + link_.name();
+message_value blocking_receiver::receive(duration timeout) {
+    if (!receiver().credit())
+        receiver().flow(1);
     fetcher_has_message cond(*fetcher_);
-    connection_.wait(cond, txt, timeout);
+    connection_.impl_->wait(cond, "receiving on receiver " + link_->name(), timeout);
     return fetcher_->pop();
 }
 
-message blocking_receiver::receive() {
+message_value blocking_receiver::receive() {
     // Use default timeout
     return receive(connection_.timeout());
 }
@@ -106,35 +90,13 @@ void blocking_receiver::release(bool delivered) {
 }
 
 void blocking_receiver::settle(delivery::state state = delivery::NONE) {
-    if (!fetcher_)
-        throw error(MSG("Can't call accept/reject etc on this receiver as a handler was provided"));
     fetcher_->settle(state);
 }
 
 void blocking_receiver::flow(int count) {
-    receiver rcv(pn_cast(link_));
-    rcv.flow(count);
+    receiver().flow(count);
 }
 
-int blocking_receiver::credit() {
-    return link_.credit();
-}
-
-terminus blocking_receiver::source() {
-    return link_.source();
-}
-
-terminus blocking_receiver::target() {
-    return link_.target();
-}
+receiver& blocking_receiver::receiver() { return link_->receiver(); }
 
-terminus blocking_receiver::remote_source() {
-    return link_.remote_source();
 }
-
-terminus blocking_receiver::remote_target() {
-    return link_.remote_target();
-}
-
-
-} // namespace

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/blocking_sender.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_sender.cpp b/proton-c/bindings/cpp/src/blocking_sender.cpp
index 5f0ff45..3d12fce 100644
--- a/proton-c/bindings/cpp/src/blocking_sender.cpp
+++ b/proton-c/bindings/cpp/src/blocking_sender.cpp
@@ -23,44 +23,46 @@
 #include "proton/sender.hpp"
 #include "proton/receiver.hpp"
 #include "proton/error.hpp"
+#include "blocking_connection_impl.hpp"
 #include "msg.hpp"
 
-
 namespace proton {
 
 namespace {
-struct delivery_settled {
+struct delivery_settled : public blocking_connection_impl::condition {
     delivery_settled(pn_delivery_t *d) : pn_delivery(d) {}
-    bool operator()() { return pn_delivery_settled(pn_delivery); }
+    bool operator()() const { return pn_delivery_settled(pn_delivery); }
     pn_delivery_t *pn_delivery;
 };
-
 } // namespace
 
-
-blocking_sender::blocking_sender(blocking_connection &c, sender &l) : blocking_link(&c, pn_cast(l)) {
-    std::string ta = link_.target().address();
-    std::string rta = link_.remote_target().address();
-    if (ta.empty() || ta.compare(rta) != 0) {
+blocking_sender::blocking_sender(blocking_connection &c, const std::string &address) :
+    blocking_link(c)
+{
+    open(c.impl_->connection_->create_sender(address));
+    std::string ta = link_->target().address();
+    std::string rta = link_->remote_target().address();
+    if (ta.empty() || ta.compare(rta) != 0) { 
         wait_for_closed();
-        link_.close();
-        std::string txt = "Failed to open sender " + link_.name() + ", target does not match";
+        link_->close();
+        std::string txt = "Failed to open sender " + link_->name() + ", target does not match";
         throw error(MSG(txt));
     }
 }
 
-counted_ptr<delivery> blocking_sender::send(message &msg, duration timeout) {
-    sender snd(pn_cast(link_));
-    counted_ptr<delivery> dlv = snd.send(msg);
-    std::string txt = "Sending on sender " + link_.name();
-    delivery_settled cond(pn_cast(dlv.get()));
-    connection_.wait(cond, txt, timeout);
+blocking_sender::~blocking_sender() {}
+
+delivery& blocking_sender::send(const message_value &msg, duration timeout) {
+    delivery& dlv = sender().send(msg);
+    connection_.impl_->wait(delivery_settled(pn_cast(&dlv)), "sending on sender " + link_->name(), timeout);
     return dlv;
 }
 
-delivery blocking_sender::send(message &msg) {
+delivery& blocking_sender::send(const message_value &msg) {
     // Use default timeout
     return send(msg, connection_.timeout());
 }
 
+sender& blocking_sender::sender() { return link_->sender(); }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection.cpp b/proton-c/bindings/cpp/src/connection.cpp
index fc48b68..8948bd9 100644
--- a/proton-c/bindings/cpp/src/connection.cpp
+++ b/proton-c/bindings/cpp/src/connection.cpp
@@ -77,4 +77,6 @@ receiver& connection::create_receiver(const std::string &addr, bool dynamic, han
     return default_session().create_receiver(addr, dynamic, h);
 }
 
+endpoint::state connection::state() { return pn_connection_state(pn_cast(this)); }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp
index 9d9bc85..574f5a7 100644
--- a/proton-c/bindings/cpp/src/container.cpp
+++ b/proton-c/bindings/cpp/src/container.cpp
@@ -53,12 +53,8 @@ reactor &container::reactor() { return *impl_->reactor_; }
 
 std::string container::container_id() { return impl_->container_id_; }
 
-duration container::timeout() { return impl_->reactor_->timeout(); }
-void container::timeout(duration timeout) { impl_->reactor_->timeout(timeout); }
-
 void container::run() { impl_->reactor_->run(); }
 
-
 sender& container::create_sender(const proton::url &url) {
     return impl_->create_sender(url);
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/fetcher.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/fetcher.cpp b/proton-c/bindings/cpp/src/fetcher.cpp
deleted file mode 100644
index ec716f4..0000000
--- a/proton-c/bindings/cpp/src/fetcher.cpp
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "fetcher.hpp"
-#include "proton/event.hpp"
-
-namespace proton {
-
-fetcher::fetcher(blocking_connection &c, int prefetch) :
-    messaging_handler(prefetch, false), // no auto_accept
-    connection_(c), refcount_(0), pn_link_(0) {
-}
-
-void fetcher::incref() { refcount_++; }
-void fetcher::decref() {
-    refcount_--;
-    if (!refcount_) {
-        // fetcher needs to outlive its blocking_receiver unless disconnected from reactor
-        if (pn_link_) {
-            pn_record_set_handler(pn_link_attachments(pn_link_), 0);
-            pn_decref(pn_link_);
-        }
-        delete this;
-        return;
-    }
-}
-
-void fetcher::on_link_init(event &e) {
-    pn_link_ = pn_cast(&e.link());
-    pn_incref(pn_link_);
-}
-
-void fetcher::on_message(event &e) {
-    messages_.push_back(e.message());
-    deliveries_.push_back(e.delivery());
-    // Wake up enclosing blocking_connection.wait()
-    e.container().yield();
-}
-
-void fetcher::on_link_error(event &e) {
-    link& lnk = e.link();
-    if (pn_link_state(pn_cast(&lnk)) & PN_LOCAL_ACTIVE) {
-        lnk.close();
-        throw error(MSG("Link detached: " << lnk.name()));
-    }
-}
-
-void fetcher::on_connection_error(event &e) {
-    throw error(MSG("Connection closed"));
-}
-
-bool fetcher::has_message() {
-    return !messages_.empty();
-}
-
-message fetcher::pop() {
-    if (messages_.empty())
-        throw error(MSG("blocking_receiver has no messages"));
-    counted_ptr<delivery> dlv(deliveries_.front());
-    if (!dlv->settled())
-        unsettled_.push_back(dlv);
-    message m = messages_.front();
-    messages_.pop_front();
-    deliveries_.pop_front();
-    return m;
-}
-
-void fetcher::settle(delivery::state state) {
-    counted_ptr<delivery> dlv = unsettled_.front();
-    if (state)
-        dlv->update(state);
-    dlv->settle();
-}
-
-} // namespace

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/fetcher.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/fetcher.hpp b/proton-c/bindings/cpp/src/fetcher.hpp
deleted file mode 100644
index bb83a63..0000000
--- a/proton-c/bindings/cpp/src/fetcher.hpp
+++ /dev/null
@@ -1,58 +0,0 @@
-#ifndef PROTON_CPP_FETCHER_H
-#define PROTON_CPP_FETCHER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "proton/container.hpp"
-#include "proton/messaging_handler.hpp"
-#include "proton/blocking_connection.hpp"
-#include "proton/error.hpp"
-#include "msg.hpp"
-#include <string>
-#include <deque>
-
-namespace proton {
-
-class fetcher : public messaging_handler {
-  private:
-    blocking_connection connection_;
-    std::deque<message> messages_;
-    std::deque<counted_ptr<delivery> > deliveries_;
-    std::deque<counted_ptr<delivery> > unsettled_;
-    int refcount_;
-    pn_link_t *pn_link_;
-  public:
-    fetcher(blocking_connection &c, int p);
-    void incref();
-    void decref();
-    void on_message(event &e);
-    void on_link_error(event &e);
-    void on_connection_error(event &e);
-    void on_link_init(event &e);
-    bool has_message();
-    message pop();
-    void settle(delivery::state state = delivery::NONE);
-};
-
-
-}
-
-#endif  /*!PROTON_CPP_FETCHER_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/link.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/link.cpp b/proton-c/bindings/cpp/src/link.cpp
index 2f309e3..12c8790 100644
--- a/proton-c/bindings/cpp/src/link.cpp
+++ b/proton-c/bindings/cpp/src/link.cpp
@@ -76,11 +76,17 @@ link* link::next(endpoint::state mask) {
     return link::cast(pn_link_next(pn_cast(this), (pn_state_t) mask));
 }
 
-void link::handler(class handler& h) {
+void link::handler(class handler &h) {
     pn_record_t *record = pn_link_attachments(pn_cast(this));
     connection_context& cc(connection_context::get(pn_cast(&connection())));
     counted_ptr<pn_handler_t> chandler = cc.container_impl->cpp_handler(&h);
     pn_record_set_handler(record, chandler.get());
 }
 
+void link::detach_handler() {
+    pn_record_t *record = pn_link_attachments(pn_cast(this));
+    pn_record_set_handler(record, 0);
+}
+
+endpoint::state link::state() { return pn_link_state(pn_cast(this)); }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/message.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/message.cpp b/proton-c/bindings/cpp/src/message.cpp
index 8e5c0e4..9b26bac 100644
--- a/proton-c/bindings/cpp/src/message.cpp
+++ b/proton-c/bindings/cpp/src/message.cpp
@@ -204,5 +204,12 @@ void message::decode(proton::link &link, proton::delivery &delivery) {
     pn_link_advance(pn_cast(&link));
 }
 
+void message_value::swap(message_value& x) {
+    // This works with unique_ptr and auto_ptr (which has no swap)
+    message* a = message_.release();
+    message* b = x.message_.release();
+    message_.reset(b);
+    x.message_.reset(a);
+}
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/messaging_handler.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_handler.cpp b/proton-c/bindings/cpp/src/messaging_handler.cpp
index 188d739..01226f7 100644
--- a/proton-c/bindings/cpp/src/messaging_handler.cpp
+++ b/proton-c/bindings/cpp/src/messaging_handler.cpp
@@ -22,17 +22,19 @@
 #include "proton/proton_event.hpp"
 #include "proton/messaging_adapter.hpp"
 #include "proton/handlers.h"
+#include <algorithm>
 
 namespace proton {
 
 namespace {
-class Cflow_controller : public proton_handler
+class c_flow_controller : public proton_handler
 {
   public:
     pn_handler_t *flowcontroller;
 
-    Cflow_controller(int window) : flowcontroller(pn_flowcontroller(window)) {}
-    ~Cflow_controller() {
+    // TODO: pn_flowcontroller requires a window > 1. 
+    c_flow_controller(int window) : flowcontroller(pn_flowcontroller(std::max(window, 2))) {}
+    ~c_flow_controller() {
         pn_decref(flowcontroller);
     }
 
@@ -68,17 +70,14 @@ messaging_handler::messaging_handler(bool raw_handler, int prefetch0, bool auto_
 
 void messaging_handler::create_helpers() {
     if (prefetch_ > 0) {
-        flow_controller_ = new Cflow_controller(prefetch_);
+        flow_controller_.reset(new c_flow_controller(prefetch_));
         add_child_handler(*flow_controller_);
     }
-    messaging_adapter_ = new messaging_adapter(*this);
+    messaging_adapter_.reset(new messaging_adapter(*this));
     add_child_handler(*messaging_adapter_);
 }
 
-messaging_handler::~messaging_handler(){
-    if (flow_controller_) delete flow_controller_;
-    if (messaging_adapter_) delete messaging_adapter_;
-}
+messaging_handler::~messaging_handler(){}
 
 void messaging_handler::on_abort(event &e) { on_unhandled(e); }
 void messaging_handler::on_accepted(event &e) { on_unhandled(e); }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/receiver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/receiver.cpp b/proton-c/bindings/cpp/src/receiver.cpp
index 8d1fdbe..96a116d 100644
--- a/proton-c/bindings/cpp/src/receiver.cpp
+++ b/proton-c/bindings/cpp/src/receiver.cpp
@@ -33,4 +33,6 @@ void receiver::flow(int count) {
     pn_link_flow(pn_cast(this), count);
 }
 
+receiver* receiver::cast(pn_type* p) { return &link::cast(p)->receiver(); }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/sender.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/sender.cpp b/proton-c/bindings/cpp/src/sender.cpp
index 5c1d26f..08e24d7 100644
--- a/proton-c/bindings/cpp/src/sender.cpp
+++ b/proton-c/bindings/cpp/src/sender.cpp
@@ -41,7 +41,7 @@ namespace {
 std::uint64_t tag_counter = 0;
 }
 
-delivery& sender::send(message &message) {
+delivery& sender::send(const message &message) {
     std::uint64_t id = ++tag_counter;
     pn_delivery_t *dlv =
         pn_delivery(pn_cast(this), pn_dtag(reinterpret_cast<const char*>(&id), sizeof(id)));
@@ -54,4 +54,6 @@ delivery& sender::send(message &message) {
     return *delivery::cast(dlv);
 }
 
+sender* sender::cast(pn_type* p) { return &link::cast(p)->sender(); }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/session.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/session.cpp b/proton-c/bindings/cpp/src/session.cpp
index 11d5f30..c9d8fcb 100644
--- a/proton-c/bindings/cpp/src/session.cpp
+++ b/proton-c/bindings/cpp/src/session.cpp
@@ -74,5 +74,5 @@ receiver& session::create_receiver(const std::string &addr, bool dynamic, handle
     return rcv;
 }
 
-
+endpoint::state session::state() { return pn_session_state(pn_cast(this)); }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/sync_request_response.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/sync_request_response.cpp b/proton-c/bindings/cpp/src/sync_request_response.cpp
index 5098a6f..1c9b220 100644
--- a/proton-c/bindings/cpp/src/sync_request_response.cpp
+++ b/proton-c/bindings/cpp/src/sync_request_response.cpp
@@ -18,59 +18,40 @@
  * under the License.
  *
  */
+#include "proton/blocking_connection.hpp"
 #include "proton/sync_request_response.hpp"
 #include "proton/event.hpp"
 #include "proton/error.hpp"
+#include "proton/data.hpp"
+#include "blocking_connection_impl.hpp"
 #include "msg.hpp"
 
 namespace proton {
 
-namespace {
-amqp_ulong global_correlation_id = 0;
-
-struct response_received {
-    response_received(PN_UNIQUE_OR_AUTO_PTR<message>& m, amqp_ulong id) : message_(m), id_(id) {}
-    bool operator()() { return message_.get() && message_->correlation_id() == id_; }
-    PN_UNIQUE_OR_AUTO_PTR<message>& message_;
-    value id_;
-};
-
-}
-
 sync_request_response::sync_request_response(blocking_connection &conn, const std::string addr):
     connection_(conn), address_(addr),
-    sender_(connection_.create_sender(addr)),
-    receiver_(connection_.create_receiver("", 1/*credit*/, true/*dynamic*/, this))
-{
-}
+    sender_(new blocking_sender(connection_, addr)),
+    receiver_(new blocking_receiver(connection_, "", 1/*credit*/, true/*dynamic*/)),
+    correlation_id_(0)
+{}
 
-message sync_request_response::call(message &request) {
+message_value sync_request_response::call(message &request) {
     if (address_.empty() && request.address().empty())
         throw error(MSG("Request message has no address"));
-    // TODO: thread safe increment.
-    correlation_id_ = global_correlation_id++;
-    request.correlation_id(value(correlation_id_));
+    // TODO: atomic increment.
+    data_value cid(++correlation_id_);
+    request.correlation_id(cid);
     request.reply_to(this->reply_to());
-    sender_.send(request);
-    std::string txt("Waiting for response");
-    response_received cond(response_, correlation_id_);
-    connection_.wait(cond, txt);
-    message resp = *response_;
-    response_.reset(0);
-    receiver_.flow(1);
-    return resp;
+    sender_->send(request);
+    message_value response;
+    while (response.correlation_id() != cid) {
+        response = receiver_->receive();
+    }
+    return response;
 }
 
 std::string sync_request_response::reply_to() {
-    return receiver_.remote_source().address();
+    return receiver_->receiver().remote_source().address();
 }
 
-void sync_request_response::on_message(event &e) {
-    response_.reset(new message);
-    std::swap(*response_, e.message());
-    // Wake up enclosing blocking_connection.wait() to handle the message
-    e.container().yield();
 }
-
-
-} // namespace

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c25d4c2/proton-c/bindings/cpp/src/uuid.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/uuid.hpp b/proton-c/bindings/cpp/src/uuid.hpp
index 67ed581..59cf121 100644
--- a/proton-c/bindings/cpp/src/uuid.hpp
+++ b/proton-c/bindings/cpp/src/uuid.hpp
@@ -19,6 +19,8 @@
  * under the License.
  */
 
+#include <proton/types.hpp>
+
 #include <string>
 
 namespace proton {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message