qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cliffjan...@apache.org
Subject qpid-proton git commit: PROTON-865: Blocking sender functionality and handler per connection
Date Wed, 20 May 2015 14:54:26 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/cjansen-cpp-client e0db6ea98 -> ef29b07c3


PROTON-865: Blocking sender functionality and handler per connection


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ef29b07c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ef29b07c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ef29b07c

Branch: refs/heads/cjansen-cpp-client
Commit: ef29b07c3bbc11d30d24e34a8dc9d10a7af2d495
Parents: e0db6ea
Author: Clifford Jansen <cliffjansen@apache.org>
Authored: Wed May 20 07:54:03 2015 -0700
Committer: Clifford Jansen <cliffjansen@apache.org>
Committed: Wed May 20 07:54:03 2015 -0700

----------------------------------------------------------------------
 proton-c/bindings/cpp/CMakeLists.txt            |   7 +
 .../cpp/examples/HelloWorldBlocking.cpp         |  65 +++++++
 .../cpp/include/proton/cpp/BlockingConnection.h |  67 +++++++
 .../cpp/include/proton/cpp/BlockingLink.h       |  59 +++++++
 .../cpp/include/proton/cpp/BlockingSender.h     |  54 ++++++
 .../cpp/include/proton/cpp/Connection.h         |   2 +-
 .../bindings/cpp/include/proton/cpp/Container.h |  15 +-
 .../bindings/cpp/include/proton/cpp/Delivery.h  |   4 +-
 .../bindings/cpp/include/proton/cpp/Duration.h  |  56 ++++++
 proton-c/bindings/cpp/include/proton/cpp/Link.h |   1 +
 .../cpp/include/proton/cpp/MessagingAdapter.h   |   3 -
 .../cpp/include/proton/cpp/MessagingHandler.h   |   6 +
 .../bindings/cpp/include/proton/cpp/Sender.h    |   3 +-
 .../cpp/include/proton/cpp/WaitCondition.h      |  45 +++++
 proton-c/bindings/cpp/src/Connection.cpp        |   4 +-
 proton-c/bindings/cpp/src/ConnectionImpl.cpp    |  28 ++-
 proton-c/bindings/cpp/src/ConnectionImpl.h      |   3 +-
 proton-c/bindings/cpp/src/Container.cpp         |  25 ++-
 proton-c/bindings/cpp/src/ContainerImpl.cpp     | 175 +++++++++++--------
 proton-c/bindings/cpp/src/ContainerImpl.h       |  24 ++-
 proton-c/bindings/cpp/src/Duration.cpp          |  55 ++++++
 proton-c/bindings/cpp/src/Link.cpp              |   4 +
 proton-c/bindings/cpp/src/MessagingAdapter.cpp  |   6 +-
 proton-c/bindings/cpp/src/MessagingHandler.cpp  |  57 +++++-
 proton-c/bindings/cpp/src/Sender.cpp            |   3 +-
 .../cpp/src/blocking/BlockingConnection.cpp     |  62 +++++++
 .../cpp/src/blocking/BlockingConnectionImpl.cpp | 124 +++++++++++++
 .../cpp/src/blocking/BlockingConnectionImpl.h   |  63 +++++++
 .../bindings/cpp/src/blocking/BlockingLink.cpp  |  86 +++++++++
 .../cpp/src/blocking/BlockingSender.cpp         |  66 +++++++
 30 files changed, 1060 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index fad26ea..2506203 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -40,6 +40,7 @@ set (qpid-proton-cpp-core
     src/Terminus.cpp
     src/Acceptor.cpp
     src/Url.cpp
+    src/Duration.cpp
     src/Message.cpp
     src/MessagingAdapter.cpp
     src/MessagingEvent.cpp
@@ -55,6 +56,10 @@ set (qpid-proton-cpp-core
     src/Logger.cpp
     src/contexts.cpp
     src/exceptions.cpp
+    src/blocking/BlockingConnection.cpp
+    src/blocking/BlockingConnectionImpl.cpp
+    src/blocking/BlockingLink.cpp
+    src/blocking/BlockingSender.cpp
   )
 
 #set_source_files_properties (
@@ -102,6 +107,8 @@ add_executable (SimpleSend examples/SimpleSend.cpp)
 target_link_libraries (SimpleSend qpid-proton-cpp)
 add_executable (Broker examples/Broker.cpp)
 target_link_libraries (Broker qpid-proton-cpp)
+add_executable (HelloWorldBlocking examples/HelloWorldBlocking.cpp)
+target_link_libraries (HelloWorldBlocking qpid-proton-cpp)
 
 
 install (TARGETS qpid-proton-cpp

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/examples/HelloWorldBlocking.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/examples/HelloWorldBlocking.cpp b/proton-c/bindings/cpp/examples/HelloWorldBlocking.cpp
new file mode 100644
index 0000000..a3f729c
--- /dev/null
+++ b/proton-c/bindings/cpp/examples/HelloWorldBlocking.cpp
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/cpp/Container.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/cpp/BlockingSender.h"
+
+#include <iostream>
+
+
+using namespace proton::reactor;
+
+class HelloWorldBlocking : public MessagingHandler {
+  private:
+    std::string server;
+    std::string address;
+  public:
+
+    HelloWorldBlocking(const std::string &s, const std::string &addr) : server(s), address(addr) {}
+
+    void onStart(Event &e) {
+        Connection conn = e.getContainer().connect(server);
+        e.getContainer().createReceiver(conn, address);
+    }
+
+    void onMessage(Event &e) {
+        std::string body = e.getMessage().getBody();
+        std::cout << body << std::endl;
+        e.getConnection().close();
+    }
+
+};
+
+int main(int argc, char **argv) {
+    std::string url("localhost:5672");
+    std::string addr("examples");
+    BlockingConnection conn = BlockingConnection(url);
+    BlockingSender sender = conn.createSender(addr);
+    Message m;
+    m.setBody("Hello World!");
+    sender.send(m);
+    conn.close();
+
+    // Temporary hack until blocking receiver available
+    HelloWorldBlocking hw("localhost:5672", "examples");
+    Container(hw).run();
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/BlockingConnection.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/BlockingConnection.h b/proton-c/bindings/cpp/include/proton/cpp/BlockingConnection.h
new file mode 100644
index 0000000..aa268db
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/cpp/BlockingConnection.h
@@ -0,0 +1,67 @@
+#ifndef PROTON_CPP_BLOCKINGCONNECTION_H
+#define PROTON_CPP_BLOCKINGCONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/ImportExport.h"
+#include "proton/cpp/Handle.h"
+#include "proton/cpp/Endpoint.h"
+#include "proton/cpp/Container.h"
+#include "proton/cpp/Duration.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/types.h"
+#include <string>
+
+struct pn_connection_t;
+
+namespace proton {
+namespace reactor {
+
+class Container;
+class BlockingConnectionImpl;
+class SslDomain;
+class BlockingSender;
+class WaitCondition;
+
+class BlockingConnection : public Handle<BlockingConnectionImpl>
+{
+  public:
+    PROTON_CPP_EXTERN BlockingConnection();
+    PROTON_CPP_EXTERN BlockingConnection(const BlockingConnection& c);
+    PROTON_CPP_EXTERN BlockingConnection& operator=(const BlockingConnection& c);
+    PROTON_CPP_EXTERN ~BlockingConnection();
+
+    PROTON_CPP_EXTERN BlockingConnection(std::string &url, Duration = Duration::FOREVER,
+                                         SslDomain *ssld=0, Container *c=0);
+    PROTON_CPP_EXTERN void close();
+
+    PROTON_CPP_EXTERN BlockingSender createSender(std::string &address, Handler *h=0);
+    PROTON_CPP_EXTERN void wait(WaitCondition &condition);
+    PROTON_CPP_EXTERN void wait(WaitCondition &condition, std::string &msg, Duration timeout=Duration::FOREVER);
+    PROTON_CPP_EXTERN Duration getTimeout();
+  private:
+    friend class PrivateImplRef<BlockingConnection>;
+};
+
+
+}} // namespace proton::reactor
+
+#endif  /*!PROTON_CPP_BLOCKINGCONNECTION_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/BlockingLink.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/BlockingLink.h b/proton-c/bindings/cpp/include/proton/cpp/BlockingLink.h
new file mode 100644
index 0000000..7f84ce8
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/cpp/BlockingLink.h
@@ -0,0 +1,59 @@
+#ifndef PROTON_CPP_BLOCKINGLINK_H
+#define PROTON_CPP_BLOCKINGLINK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/ImportExport.h"
+#include "proton/cpp/Handle.h"
+#include "proton/cpp/Endpoint.h"
+#include "proton/cpp/Container.h"
+#include "proton/cpp/Duration.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/cpp/BlockingConnection.h"
+#include "proton/types.h"
+#include <string>
+
+namespace proton {
+namespace reactor {
+
+class BlockingConnection;
+
+class BlockingLink
+{
+  public:
+    PROTON_CPP_EXTERN void close();
+    ~BlockingLink();
+  protected:
+    PROTON_CPP_EXTERN BlockingLink(BlockingConnection *c, pn_link_t *l);
+    PROTON_CPP_EXTERN void waitForClosed(Duration timeout=Duration::SECOND);
+  private:
+    BlockingConnection connection;
+    Link link;
+    void checkClosed();
+    friend class BlockingConnection;
+    friend class BlockingSender;
+    friend class BlockingReceiver;
+};
+
+
+}} // namespace proton::reactor
+
+#endif  /*!PROTON_CPP_BLOCKINGLINK_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/BlockingSender.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/BlockingSender.h b/proton-c/bindings/cpp/include/proton/cpp/BlockingSender.h
new file mode 100644
index 0000000..d4ddeae
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/cpp/BlockingSender.h
@@ -0,0 +1,54 @@
+#ifndef PROTON_CPP_BLOCKINGSENDER_H
+#define PROTON_CPP_BLOCKINGSENDER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/ImportExport.h"
+#include "proton/cpp/Handle.h"
+#include "proton/cpp/Endpoint.h"
+#include "proton/cpp/Container.h"
+#include "proton/cpp/Duration.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/cpp/BlockingLink.h"
+#include "proton/types.h"
+#include "proton/delivery.h"
+#include <string>
+
+namespace proton {
+namespace reactor {
+
+class BlockingConnection;
+class BlockingLink;
+
+class BlockingSender : public BlockingLink
+{
+  public:
+    PROTON_CPP_EXTERN Delivery send(Message &msg);
+    PROTON_CPP_EXTERN Delivery send(Message &msg, Duration timeout);
+  private:
+    PROTON_CPP_EXTERN BlockingSender(BlockingConnection &c, Sender &l);
+    friend class BlockingConnection;
+};
+
+
+}} // namespace proton::reactor
+
+#endif  /*!PROTON_CPP_BLOCKINGSENDER_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/Connection.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Connection.h b/proton-c/bindings/cpp/include/proton/cpp/Connection.h
index 86abbe6..f3397ce 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Connection.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Connection.h
@@ -47,7 +47,7 @@ class Connection : public Endpoint, public Handle<ConnectionImpl>
     PROTON_CPP_EXTERN Connection& operator=(const Connection& c);
     PROTON_CPP_EXTERN ~Connection();
 
-    PROTON_CPP_EXTERN Connection(Container &c);
+    PROTON_CPP_EXTERN Connection(Container &c, Handler *h = 0);
     PROTON_CPP_EXTERN Transport &getTransport();
     PROTON_CPP_EXTERN Handler *getOverride();
     PROTON_CPP_EXTERN void setOverride(Handler *h);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/Container.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Container.h b/proton-c/bindings/cpp/include/proton/cpp/Container.h
index d596ab1..1d7284d 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Container.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Container.h
@@ -24,6 +24,7 @@
 #include "proton/cpp/ImportExport.h"
 #include "proton/cpp/Handle.h"
 #include "proton/cpp/Acceptor.h"
+#include "proton/cpp/Duration.h"
 #include <proton/reactor.h>
 #include <string>
 
@@ -39,6 +40,7 @@ class MessagingHandler;
 class Sender;
 class Receiver;
 class Link;
+ class Handler;
 
 class Container : public Handle<ContainerImpl>
 {
@@ -48,17 +50,24 @@ class Container : public Handle<ContainerImpl>
     PROTON_CPP_EXTERN Container& operator=(const Container& c);
     PROTON_CPP_EXTERN ~Container();
 
+    PROTON_CPP_EXTERN Container();
     PROTON_CPP_EXTERN Container(MessagingHandler &mhandler);
-    PROTON_CPP_EXTERN Connection connect(std::string &host);
+    PROTON_CPP_EXTERN Connection connect(std::string &host, Handler *h=0);
     PROTON_CPP_EXTERN void run();
+    PROTON_CPP_EXTERN void start();
+    PROTON_CPP_EXTERN bool process();
+    PROTON_CPP_EXTERN void stop();
+    PROTON_CPP_EXTERN void wakeup();
+    PROTON_CPP_EXTERN bool isQuiesced();
     PROTON_CPP_EXTERN pn_reactor_t *getReactor();
-    PROTON_CPP_EXTERN pn_handler_t *getGlobalHandler();
-    PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr);
+    PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr, Handler *h=0);
     PROTON_CPP_EXTERN Sender createSender(std::string &url);
     PROTON_CPP_EXTERN Receiver createReceiver(Connection &connection, std::string &addr);
     PROTON_CPP_EXTERN Receiver createReceiver(const std::string &url);
     PROTON_CPP_EXTERN Acceptor listen(const std::string &url);
     PROTON_CPP_EXTERN std::string getContainerId();
+    PROTON_CPP_EXTERN Duration getTimeout();
+    PROTON_CPP_EXTERN void setTimeout(Duration timeout);
   private:
    friend class PrivateImplRef<Container>;
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/Delivery.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Delivery.h b/proton-c/bindings/cpp/include/proton/cpp/Delivery.h
index a1965f6..8171dd5 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Delivery.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Delivery.h
@@ -22,9 +22,11 @@
  *
  */
 #include "proton/cpp/ImportExport.h"
-#include "proton/cpp/Link.h"
+#include "proton/cpp/ProtonHandle.h"
 
 #include "ProtonImplRef.h"
+
+#include "proton/delivery.h"
 #include "proton/disposition.h"
 
 namespace proton {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/Duration.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Duration.h b/proton-c/bindings/cpp/include/proton/cpp/Duration.h
new file mode 100644
index 0000000..4e8f474
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/cpp/Duration.h
@@ -0,0 +1,56 @@
+#ifndef PROTON_CPP_DURATION_H
+#define PROTON_CPP_DURATION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/cpp/ImportExport.h"
+#include "proton/types.h"
+
+namespace proton {
+namespace reactor {
+
+/**   \ingroup C++ 
+ * A duration is a time in milliseconds.
+ */
+class Duration
+{
+  public:
+    PROTON_CPP_EXTERN explicit Duration(uint64_t milliseconds);
+    PROTON_CPP_EXTERN uint64_t getMilliseconds() const;
+    PROTON_CPP_EXTERN static const Duration FOREVER;
+    PROTON_CPP_EXTERN static const Duration IMMEDIATE;
+    PROTON_CPP_EXTERN static const Duration SECOND;
+    PROTON_CPP_EXTERN static const Duration MINUTE;
+  private:
+    uint64_t milliseconds;
+};
+
+PROTON_CPP_EXTERN Duration operator*(const Duration& duration,
+                                         uint64_t multiplier);
+PROTON_CPP_EXTERN Duration operator*(uint64_t multiplier,
+                                         const Duration& duration);
+PROTON_CPP_EXTERN bool operator==(const Duration& a, const Duration& b);
+PROTON_CPP_EXTERN bool operator!=(const Duration& a, const Duration& b);
+
+}} // namespace proton::reactor
+
+#endif  /*!PROTON_CPP_DURATION_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/Link.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Link.h b/proton-c/bindings/cpp/include/proton/cpp/Link.h
index 265d80d..391e5fc 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Link.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Link.h
@@ -50,6 +50,7 @@ class Link : public Endpoint, public ProtonHandle<pn_link_t>
     PROTON_CPP_EXTERN Terminus getTarget();
     PROTON_CPP_EXTERN Terminus getRemoteSource();
     PROTON_CPP_EXTERN Terminus getRemoteTarget();
+    PROTON_CPP_EXTERN std::string getName();
     PROTON_CPP_EXTERN pn_link_t *getPnLink() const;
     virtual PROTON_CPP_EXTERN Connection &getConnection();
     PROTON_CPP_EXTERN Link getNext(Endpoint::State mask);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h
index 36a92e4..280df5b 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h
@@ -71,9 +71,6 @@ class MessagingAdapter : public MessagingHandler
     PROTON_CPP_EXTERN virtual void onTransportTailClosed(Event &e);
   private:
     MessagingHandler &delegate;  // The handler for generated MessagingEvent's
-    bool autoSettle;
-    bool autoAccept;
-    bool peerCloseIsError;
 };
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h
index 89d582c..b157360 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h
@@ -30,6 +30,7 @@ namespace proton {
 namespace reactor {
 
 class Event;
+class MessagingAdapter;
 
 class PROTON_CPP_EXTERN MessagingHandler : public ProtonHandler , public Acking
 {
@@ -80,9 +81,14 @@ class PROTON_CPP_EXTERN MessagingHandler : public ProtonHandler , public Acking
     bool autoSettle;
     bool autoAccept;
     bool peerCloseIsError;
+    MessagingAdapter *messagingAdapter;
+    Handler *flowController;
+    PROTON_CPP_EXTERN MessagingHandler(bool rawHandler, int prefetch=10, bool autoAccept=true, bool autoSettle=true, 
+                                       bool peerCloseIsError=false);
   private:
     friend class ContainerImpl;
     friend class MessagingAdapter;
+    void createHelpers();
 };
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/Sender.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Sender.h b/proton-c/bindings/cpp/include/proton/cpp/Sender.h
index 9b8683d..c63161c 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Sender.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Sender.h
@@ -22,6 +22,7 @@
  *
  */
 #include "proton/cpp/ImportExport.h"
+#include "proton/cpp/Delivery.h"
 #include "proton/cpp/Link.h"
 #include "proton/cpp/Message.h"
 
@@ -40,7 +41,7 @@ class Sender : public Link
     PROTON_CPP_EXTERN Sender(pn_link_t *lnk);
     PROTON_CPP_EXTERN Sender();
     PROTON_CPP_EXTERN Sender(const Link& c);
-    PROTON_CPP_EXTERN void send(Message &m);
+    PROTON_CPP_EXTERN Delivery send(Message &m);
   protected:
     virtual void verifyType(pn_link_t *l);
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/include/proton/cpp/WaitCondition.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/WaitCondition.h b/proton-c/bindings/cpp/include/proton/cpp/WaitCondition.h
new file mode 100644
index 0000000..f4c7cb5
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/cpp/WaitCondition.h
@@ -0,0 +1,45 @@
+#ifndef PROTON_CPP_WAITCONDITION_H
+#define PROTON_CPP_WAITCONDITION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/ImportExport.h"
+
+namespace proton {
+namespace reactor {
+
+// Interface class to indicates that an expected contion has been
+// achieved, i.e. for BlockingConnection.wait()
+
+class WaitCondition
+{
+  public:
+    PROTON_CPP_EXTERN virtual ~WaitCondition();
+
+    // Overide this member function to indicate whether an expected
+    // condition is achieved and requires no further waiting.
+    virtual bool achieved() = 0;
+};
+
+
+}} // namespace proton::reactor
+
+#endif  /*!PROTON_CPP_WAITCONDITION_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Connection.cpp b/proton-c/bindings/cpp/src/Connection.cpp
index 1db8fbc..67e7d0c 100644
--- a/proton-c/bindings/cpp/src/Connection.cpp
+++ b/proton-c/bindings/cpp/src/Connection.cpp
@@ -42,8 +42,8 @@ Connection::Connection(const Connection& c) : Handle<ConnectionImpl>() { PI::cop
 Connection& Connection::operator=(const Connection& c) { return PI::assign(*this, c); }
 Connection::~Connection() { PI::dtor(*this); }
 
-Connection::Connection(Container &c) {
-    ConnectionImpl *cimpl = new ConnectionImpl(c);
+Connection::Connection(Container &c, Handler *h) {
+    ConnectionImpl *cimpl = new ConnectionImpl(c, h);
     PI::ctor(*this, cimpl);
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/ConnectionImpl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ConnectionImpl.cpp b/proton-c/bindings/cpp/src/ConnectionImpl.cpp
index 9cadffe..f7cc5f9 100644
--- a/proton-c/bindings/cpp/src/ConnectionImpl.cpp
+++ b/proton-c/bindings/cpp/src/ConnectionImpl.cpp
@@ -25,6 +25,8 @@
 #include "proton/cpp/Transport.h"
 #include "Msg.h"
 #include "contexts.h"
+#include "PrivateImplRef.h"
+#include "ContainerImpl.h"
 
 #include "proton/connection.h"
 
@@ -41,12 +43,25 @@ void ConnectionImpl::decref(ConnectionImpl *impl) {
         delete impl;
 }
 
-ConnectionImpl::ConnectionImpl(Container &c, pn_connection_t *pnConn) : container(c), refCount(0), override(0), transport(0), defaultSession(0),
-                                               pnConnection(pnConn),
-                                               reactorReference(this)
+ConnectionImpl::ConnectionImpl(Container &c, pn_connection_t &pnConn)
+    : container(c), refCount(0), override(0), transport(0), defaultSession(0),
+      pnConnection(&pnConn), reactorReference(this)
 {
-    if (!pnConnection)
-        pnConnection = pn_reactor_connection(container.getReactor(), NULL);
+    setConnectionContext(pnConnection, this);
+}
+
+ConnectionImpl::ConnectionImpl(Container &c, Handler *handler)
+    : container(c), refCount(0), override(0), transport(0), defaultSession(0),
+      reactorReference(this)
+{
+    pn_handler_t *chandler = 0;
+    if (handler) {
+        ContainerImpl *containerImpl = PrivateImplRef<Container>::get(c);
+        chandler = containerImpl->wrapHandler(handler);
+    }
+    pnConnection = pn_reactor_connection(container.getReactor(), chandler);
+    if (chandler)
+        pn_decref(chandler);
     setConnectionContext(pnConnection, this);
 }
 
@@ -112,7 +127,7 @@ Connection &ConnectionImpl::getReactorReference(pn_connection_t *conn) {
         Container container(getContainerContext(reactor));
         if (!container)  // can't be one created by our container
             throw ProtonException(MSG("Unknown Proton connection specifier"));
-        impl = new ConnectionImpl(container, conn);
+        impl = new ConnectionImpl(container, *conn);
     }
     return impl->reactorReference;
 }
@@ -121,5 +136,4 @@ Link ConnectionImpl::getLinkHead(Endpoint::State mask) {
     return Link(pn_link_head(pnConnection, mask));
 }
 
-
 }} // namespace proton::reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/ConnectionImpl.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ConnectionImpl.h b/proton-c/bindings/cpp/src/ConnectionImpl.h
index 11b5765..48210a3 100644
--- a/proton-c/bindings/cpp/src/ConnectionImpl.h
+++ b/proton-c/bindings/cpp/src/ConnectionImpl.h
@@ -39,7 +39,8 @@ class Container;
 class ConnectionImpl : public Endpoint
 {
   public:
-    PROTON_CPP_EXTERN ConnectionImpl(Container &c, pn_connection_t *pnConn = 0);
+    PROTON_CPP_EXTERN ConnectionImpl(Container &c, pn_connection_t &pnConn);
+    PROTON_CPP_EXTERN ConnectionImpl(Container &c, Handler *h = 0);
     PROTON_CPP_EXTERN ~ConnectionImpl();
     PROTON_CPP_EXTERN Transport &getTransport();
     PROTON_CPP_EXTERN Handler *getOverride();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/Container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Container.cpp b/proton-c/bindings/cpp/src/Container.cpp
index 4eccb15..e72f484 100644
--- a/proton-c/bindings/cpp/src/Container.cpp
+++ b/proton-c/bindings/cpp/src/Container.cpp
@@ -53,17 +53,23 @@ Container::Container(MessagingHandler &mhandler) {
     PI::ctor(*this, cimpl);
 }
 
-Connection Container::connect(std::string &host) { return impl->connect(host); }
+Container::Container() {
+    ContainerImpl *cimpl = new ContainerImpl();
+    PI::ctor(*this, cimpl);
+}
 
-pn_reactor_t *Container::getReactor() { return impl->getReactor(); }
+Connection Container::connect(std::string &host, Handler *h) { return impl->connect(host, h); }
 
-pn_handler_t *Container::getGlobalHandler() { return impl->getGlobalHandler(); }
+pn_reactor_t *Container::getReactor() { return impl->getReactor(); }
 
 std::string Container::getContainerId() { return impl->getContainerId(); }
 
+Duration Container::getTimeout() { return impl->getTimeout(); }
+void Container::setTimeout(Duration timeout) { impl->setTimeout(timeout); }
+
 
-Sender Container::createSender(Connection &connection, std::string &addr) {
-    return impl->createSender(connection, addr);
+Sender Container::createSender(Connection &connection, std::string &addr, Handler *h) {
+    return impl->createSender(connection, addr, h);
 }
 
 Sender Container::createSender(std::string &urlString) {
@@ -83,8 +89,11 @@ Acceptor Container::listen(const std::string &urlString) {
 }
 
 
-void Container::run() {
-    impl->run();
-}
+void Container::run() { impl->run(); }
+void Container::start() { impl->start(); }
+bool Container::process() { return impl->process(); }
+void Container::stop() { impl->stop(); }
+void Container::wakeup() { impl->wakeup(); }
+bool Container::isQuiesced() { return impl->isQuiesced(); }
 
 }} // namespace proton::reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/ContainerImpl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ContainerImpl.cpp b/proton-c/bindings/cpp/src/ContainerImpl.cpp
index 0b75f1c..1424dbb 100644
--- a/proton-c/bindings/cpp/src/ContainerImpl.cpp
+++ b/proton-c/bindings/cpp/src/ContainerImpl.cpp
@@ -116,41 +116,19 @@ class OverrideHandler : public Handler
         pn_handler_dispatch(baseHandler, cevent, (pn_event_type_t) type);
 
         if (conn && type == PN_CONNECTION_FINAL) {
-            //  TODO:  this must be the last acation of the last handler looking at
+            //  TODO:  this must be the last action of the last handler looking at
             //  connection events. Better: generate a custom FINAL event (or task).  Or move to
             //  separate event streams per connection as part of multi threading support.
             ConnectionImpl *cimpl = getConnectionContext(conn);
             if (cimpl)
                 cimpl->reactorDetach();
-            // TODO: remember all connections and do reactorDetach of zombies connections
-            // not pn_connection_release'd at PN_REACTOR_FINAL.
+            // TODO: remember all connections and do reactorDetach of zombie connections
+            // not yet pn_connection_release'd at PN_REACTOR_FINAL.
         }
     }
 };
 
 
-class CFlowController : public ProtonHandler
-{
-  public:
-    pn_handler_t *flowcontroller;
-
-    CFlowController(int window) : flowcontroller(pn_flowcontroller(window)) {}
-    ~CFlowController() {
-        pn_decref(flowcontroller);
-    }
-
-    void redirect(Event &e) {
-        ProtonEvent *pne = dynamic_cast<ProtonEvent *>(&e);
-        pn_handler_dispatch(flowcontroller, pne->getPnEvent(), (pn_event_type_t) pne->getType());
-    }
-
-    virtual void onLinkLocalOpen(Event &e) { redirect(e); }
-    virtual void onLinkRemoteOpen(Event &e) { redirect(e); }
-    virtual void onLinkFlow(Event &e) { redirect(e); }
-    virtual void onDelivery(Event &e) { redirect(e); }
-};
-
-
 namespace {
 
 // TODO: configurable policy.  SessionPerConnection for now.
@@ -165,7 +143,6 @@ Session getDefaultSession(pn_connection_t *conn, pn_session_t **ses) {
 
 struct InboundContext {
     ContainerImpl *containerImpl;
-    Container containerRef;  // create only once for all inbound events
     Handler *cppHandler;
 };
 
@@ -174,11 +151,6 @@ ContainerImpl *getContainerImpl(pn_handler_t *c_handler) {
     return ctxt->containerImpl;
 }
 
-Container &getContainerRef(pn_handler_t *c_handler) {
-    struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(c_handler);
-    return ctxt->containerRef;
-}
-
 Handler &getCppHandler(pn_handler_t *c_handler) {
     struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(c_handler);
     return *ctxt->cppHandler;
@@ -186,21 +158,19 @@ Handler &getCppHandler(pn_handler_t *c_handler) {
 
 void cpp_handler_dispatch(pn_handler_t *c_handler, pn_event_t *cevent, pn_event_type_t type)
 {
-    MessagingEvent mevent(cevent, type, getContainerRef(c_handler));
+    Container c(getContainerImpl(c_handler)); // Ref counted per event, but when is the last event if stop() never called?
+    MessagingEvent mevent(cevent, type, c);
     mevent.dispatch(getCppHandler(c_handler));
 }
 
 void cpp_handler_cleanup(pn_handler_t *c_handler)
 {
-    struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(c_handler);
-    ctxt->containerRef.~Container();
 }
 
 pn_handler_t *cpp_handler(ContainerImpl *c, Handler *h)
 {
     pn_handler_t *handler = pn_handler_new(cpp_handler_dispatch, sizeof(struct InboundContext), cpp_handler_cleanup);
     struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(handler);
-    new (&ctxt->containerRef) Container(c);
     ctxt->containerImpl = c;
     ctxt->cppHandler = h;
     return handler;
@@ -220,18 +190,29 @@ void ContainerImpl::decref(ContainerImpl *impl) {
         delete impl;
 }
 
-ContainerImpl::ContainerImpl(MessagingHandler &mhandler) :
-    reactor(0), globalHandler(0), messagingHandler(mhandler), containerId(generateUuid()),
+ContainerImpl::ContainerImpl(Handler &h) :
+    reactor(0), handler(&h), messagingAdapter(0),
+    overrideHandler(0), flowController(0), containerId(generateUuid()),
     refCount(0)
-{
-}
+{}
+
+ContainerImpl::ContainerImpl() :
+    reactor(0), handler(0), messagingAdapter(0),
+    overrideHandler(0), flowController(0), containerId(generateUuid()),
+    refCount(0)
+{}
 
-ContainerImpl::~ContainerImpl() {}
+ContainerImpl::~ContainerImpl() {
+    delete overrideHandler;
+    delete flowController;
+    delete messagingAdapter;
+    pn_reactor_free(reactor);
+}
 
-Connection ContainerImpl::connect(std::string &host) {
-    if (!reactor) throw ProtonException(MSG("Container not initialized"));
+Connection ContainerImpl::connect(std::string &host, Handler *h) {
+    if (!reactor) throw ProtonException(MSG("Container not started"));
     Container cntnr(this);
-    Connection connection(cntnr);
+    Connection connection(cntnr, handler);
     Connector *connector = new Connector(connection);
     // Connector self-deletes depending on reconnect logic
     connector->setAddress(host);  // TODO: url vector
@@ -242,15 +223,36 @@ Connection ContainerImpl::connect(std::string &host) {
 
 pn_reactor_t *ContainerImpl::getReactor() { return reactor; }
 
-pn_handler_t *ContainerImpl::getGlobalHandler() { return globalHandler; }
 
 std::string ContainerImpl::getContainerId() { return containerId; }
 
+Duration ContainerImpl::getTimeout() {
+    pn_millis_t tmo = pn_reactor_get_timeout(reactor);
+    if (tmo == PN_MILLIS_MAX)
+        return Duration::FOREVER;
+    return Duration(tmo);
+}
 
-Sender ContainerImpl::createSender(Connection &connection, std::string &addr) {
+void ContainerImpl::setTimeout(Duration timeout) {
+    if (timeout == Duration::FOREVER || timeout.getMilliseconds() > PN_MILLIS_MAX)
+        pn_reactor_set_timeout(reactor, PN_MILLIS_MAX);
+    else {
+        pn_millis_t tmo = timeout.getMilliseconds();
+        pn_reactor_set_timeout(reactor, tmo);
+    }
+}
+
+
+Sender ContainerImpl::createSender(Connection &connection, std::string &addr, Handler *h) {
+    if (!reactor) throw ProtonException(MSG("Container not started"));
     Session session = getDefaultSession(connection.getPnConnection(), &getImpl(connection)->defaultSession);
     Sender snd = session.createSender(containerId  + '-' + addr);
-    pn_terminus_set_address(pn_link_target(snd.getPnLink()), addr.c_str());
+    pn_link_t *lnk = snd.getPnLink();
+    pn_terminus_set_address(pn_link_target(lnk), addr.c_str());
+    if (h) {
+        pn_record_t *record = pn_link_attachments(lnk);
+        pn_record_set_handler(record, wrapHandler(h));
+    }
     snd.open();
 
     ConnectionImpl *connImpl = getImpl(connection);
@@ -258,7 +260,8 @@ Sender ContainerImpl::createSender(Connection &connection, std::string &addr) {
 }
 
 Sender ContainerImpl::createSender(std::string &urlString) {
-    Connection conn = connect(urlString);
+    if (!reactor) throw ProtonException(MSG("Container not started"));
+    Connection conn = connect(urlString, 0);
     Session session = getDefaultSession(conn.getPnConnection(), &getImpl(conn)->defaultSession);
     std::string path = Url(urlString).getPath();
     Sender snd = session.createSender(containerId + '-' + path);
@@ -270,6 +273,7 @@ Sender ContainerImpl::createSender(std::string &urlString) {
 }
 
 Receiver ContainerImpl::createReceiver(Connection &connection, std::string &addr) {
+    if (!reactor) throw ProtonException(MSG("Container not started"));
     ConnectionImpl *connImpl = getImpl(connection);
     Session session = getDefaultSession(connImpl->pnConnection, &connImpl->defaultSession);
     Receiver rcv = session.createReceiver(containerId + '-' + addr);
@@ -279,8 +283,9 @@ Receiver ContainerImpl::createReceiver(Connection &connection, std::string &addr
 }
 
 Receiver ContainerImpl::createReceiver(const std::string &urlString) {
+    if (!reactor) throw ProtonException(MSG("Container not started"));
     // TODO: const cleanup of API
-    Connection conn = connect(const_cast<std::string &>(urlString));
+    Connection conn = connect(const_cast<std::string &>(urlString), 0);
     Session session = getDefaultSession(conn.getPnConnection(), &getImpl(conn)->defaultSession);
     std::string path = Url(urlString).getPath();
     Receiver rcv = session.createReceiver(containerId + '-' + path);
@@ -298,50 +303,76 @@ Acceptor ContainerImpl::acceptor(const std::string &host, const std::string &por
 }
 
 Acceptor ContainerImpl::listen(const std::string &urlString) {
+    if (!reactor) throw ProtonException(MSG("Container not started"));
     Url url(urlString);
     // TODO: SSL
     return acceptor(url.getHost(), url.getPort());
 }
 
 
-void ContainerImpl::run() {
+pn_handler_t *ContainerImpl::wrapHandler(Handler *h) {
+    return cpp_handler(this, h);
+}
+
+
+void ContainerImpl::initializeReactor() {
+    if (reactor) throw ProtonException(MSG("Container already running"));
     reactor = pn_reactor();
 
     // Set our context on the reactor
     setContainerContext(reactor, this);
 
-    int prefetch = messagingHandler.prefetch; 
-    Handler *flowController = 0;
-
-    // Set the reactor's main/default handler (see note below)
-    if (prefetch) {
-        flowController = new CFlowController(prefetch);
-        messagingHandler.addChildHandler(*flowController);
+    if (handler) {
+        pn_handler_t *cppHandler = cpp_handler(this, handler);
+        pn_reactor_set_handler(reactor, cppHandler);
+        pn_decref(cppHandler);
     }
-    MessagingAdapter messagingAdapter(messagingHandler);
-    messagingHandler.addChildHandler(messagingAdapter);
-    pn_handler_t *cppHandler = cpp_handler(this, &messagingHandler);
-    pn_reactor_set_handler(reactor, cppHandler);
 
     // Set our own global handler that "subclasses" the existing one
-    pn_handler_t *cGlobalHandler = pn_reactor_get_global_handler(reactor);
-    pn_incref(cGlobalHandler);
-    OverrideHandler overrideHandler(cGlobalHandler);
-    pn_handler_t *cppGlobalHandler = cpp_handler(this, &overrideHandler);
+    pn_handler_t *globalHandler = pn_reactor_get_global_handler(reactor);
+    overrideHandler = new OverrideHandler(globalHandler);
+    pn_handler_t *cppGlobalHandler = cpp_handler(this, overrideHandler);
     pn_reactor_set_global_handler(reactor, cppGlobalHandler);
+    pn_decref(cppGlobalHandler);
 
     // Note: we have just set up the following 4/5 handlers that see events in this order:
     // messagingHandler (Proton C events), pn_flowcontroller (optional), messagingAdapter,
-    // messagingHandler (Messaging events from the messagingAdapter), connector override,
-    // the reactor's default globalhandler (pn_iohandler)
+    // messagingHandler (Messaging events from the messagingAdapter, i.e. the delegate),
+    // connector override, the reactor's default globalhandler (pn_iohandler)
+}
+
+void ContainerImpl::run() {
+    initializeReactor();
     pn_reactor_run(reactor);
+}
 
-    pn_decref(cppHandler);
-    pn_decref(cppGlobalHandler);
-    pn_decref(cGlobalHandler);
-    pn_reactor_free(reactor);
-    reactor = 0;
-    delete(flowController);
+void ContainerImpl::start() {
+    initializeReactor();
+    pn_reactor_start(reactor);
+}
+
+bool ContainerImpl::process() {
+    if (!reactor) throw ProtonException(MSG("Container not started"));
+    bool result = pn_reactor_process(reactor);
+    // TODO: check errors
+    return result;
+}
+
+void ContainerImpl::stop() {
+    if (!reactor) throw ProtonException(MSG("Container not started"));
+    pn_reactor_stop(reactor);
+    // TODO: check errors
+}
+
+void ContainerImpl::wakeup() {
+    if (!reactor) throw ProtonException(MSG("Container not started"));
+    pn_reactor_wakeup(reactor);
+    // TODO: check errors
+}
+
+bool ContainerImpl::isQuiesced() {
+    if (!reactor) throw ProtonException(MSG("Container not started"));
+    return pn_reactor_quiesced(reactor);
 }
 
 }} // namespace proton::reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/ContainerImpl.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ContainerImpl.h b/proton-c/bindings/cpp/src/ContainerImpl.h
index f7b5b9e..65a6651 100644
--- a/proton-c/bindings/cpp/src/ContainerImpl.h
+++ b/proton-c/bindings/cpp/src/ContainerImpl.h
@@ -25,6 +25,7 @@
 #include "proton/cpp/MessagingHandler.h"
 #include "proton/cpp/Connection.h"
 #include "proton/cpp/Link.h"
+#include "proton/cpp/Duration.h"
 
 #include "proton/reactor.h"
 
@@ -40,26 +41,37 @@ class Acceptor;
 class ContainerImpl
 {
   public:
-    PROTON_CPP_EXTERN ContainerImpl(MessagingHandler &mhandler);
+    PROTON_CPP_EXTERN ContainerImpl(Handler &h);
+    PROTON_CPP_EXTERN ContainerImpl();
     PROTON_CPP_EXTERN ~ContainerImpl();
-    PROTON_CPP_EXTERN Connection connect(std::string &host);
+    PROTON_CPP_EXTERN Connection connect(std::string &host, Handler *h);
     PROTON_CPP_EXTERN void run();
     PROTON_CPP_EXTERN pn_reactor_t *getReactor();
-    PROTON_CPP_EXTERN pn_handler_t *getGlobalHandler();
-    PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr);
+    PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr, Handler *h);
     PROTON_CPP_EXTERN Sender createSender(std::string &url);
     PROTON_CPP_EXTERN Receiver createReceiver(Connection &connection, std::string &addr);
     PROTON_CPP_EXTERN Receiver createReceiver(const std::string &url);
     PROTON_CPP_EXTERN Acceptor listen(const std::string &url);
     PROTON_CPP_EXTERN std::string getContainerId();
+    PROTON_CPP_EXTERN Duration getTimeout();
+    PROTON_CPP_EXTERN void setTimeout(Duration timeout);
+    void start();
+    bool process();
+    void stop();
+    void wakeup();
+    bool isQuiesced();
+    pn_handler_t *wrapHandler(Handler *h);
     static void incref(ContainerImpl *);
     static void decref(ContainerImpl *);
   private:
     void dispatch(pn_event_t *event, pn_event_type_t type);
     Acceptor acceptor(const std::string &host, const std::string &port);
+    void initializeReactor();
     pn_reactor_t *reactor;
-    pn_handler_t *globalHandler;
-    MessagingHandler &messagingHandler;
+    Handler *handler;
+    MessagingAdapter *messagingAdapter;
+    Handler *overrideHandler;
+    Handler *flowController;
     std::string containerId;
     int refCount;
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/Duration.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Duration.cpp b/proton-c/bindings/cpp/src/Duration.cpp
new file mode 100644
index 0000000..dac7899
--- /dev/null
+++ b/proton-c/bindings/cpp/src/Duration.cpp
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/Duration.h"
+#include <limits>
+
+namespace proton {
+namespace reactor {
+
+Duration::Duration(uint64_t ms) : milliseconds(ms) {}
+uint64_t Duration::getMilliseconds() const { return milliseconds; }
+
+Duration operator*(const Duration& duration, uint64_t multiplier)
+{
+    return Duration(duration.getMilliseconds() * multiplier);
+}
+
+Duration operator*(uint64_t multiplier, const Duration& duration)
+{
+    return Duration(duration.getMilliseconds() * multiplier);
+}
+
+bool operator==(const Duration& a, const Duration& b)
+{
+    return a.getMilliseconds() == b.getMilliseconds();
+}
+
+bool operator!=(const Duration& a, const Duration& b)
+{
+    return a.getMilliseconds() != b.getMilliseconds();
+}
+
+const Duration Duration::FOREVER(std::numeric_limits<uint64_t>::max());
+const Duration Duration::IMMEDIATE(0);
+const Duration Duration::SECOND(1000);
+const Duration Duration::MINUTE(SECOND * 60);
+
+}} // namespace proton::reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/Link.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Link.cpp b/proton-c/bindings/cpp/src/Link.cpp
index aab01d9..59cf039 100644
--- a/proton-c/bindings/cpp/src/Link.cpp
+++ b/proton-c/bindings/cpp/src/Link.cpp
@@ -96,6 +96,10 @@ Terminus Link::getRemoteTarget() {
     return Terminus(pn_link_remote_target(impl), this);
 }
 
+std::string Link::getName() {
+    return std::string(pn_link_name(impl));
+}
+
 Connection &Link::getConnection() {
     pn_session_t *s = pn_link_session(impl);
     pn_connection_t *c = pn_session_connection(s);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/MessagingAdapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/MessagingAdapter.cpp b/proton-c/bindings/cpp/src/MessagingAdapter.cpp
index f2916db..625485e 100644
--- a/proton-c/bindings/cpp/src/MessagingAdapter.cpp
+++ b/proton-c/bindings/cpp/src/MessagingAdapter.cpp
@@ -32,14 +32,12 @@
 
 namespace proton {
 namespace reactor {
-
 MessagingAdapter::MessagingAdapter(MessagingHandler &delegate_) :
-    autoSettle(delegate_.autoSettle),
-    autoAccept(delegate_.autoAccept),
-    peerCloseIsError(delegate_.peerCloseIsError),
+    MessagingHandler(true, delegate_.prefetch, delegate_.autoSettle, delegate_.autoAccept, delegate_.peerCloseIsError),
     delegate(delegate_)
 {};
 
+
 MessagingAdapter::~MessagingAdapter(){};
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/MessagingHandler.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/MessagingHandler.cpp b/proton-c/bindings/cpp/src/MessagingHandler.cpp
index 7a4a5cb..925186a 100644
--- a/proton-c/bindings/cpp/src/MessagingHandler.cpp
+++ b/proton-c/bindings/cpp/src/MessagingHandler.cpp
@@ -26,11 +26,64 @@
 namespace proton {
 namespace reactor {
 
+namespace {
+class CFlowController : public ProtonHandler
+{
+  public:
+    pn_handler_t *flowcontroller;
+
+    CFlowController(int window) : flowcontroller(pn_flowcontroller(window)) {}
+    ~CFlowController() {
+        pn_decref(flowcontroller);
+    }
+
+    void redirect(Event &e) {
+        ProtonEvent *pne = dynamic_cast<ProtonEvent *>(&e);
+        pn_handler_dispatch(flowcontroller, pne->getPnEvent(), (pn_event_type_t) pne->getType());
+    }
+
+    virtual void onLinkLocalOpen(Event &e) { redirect(e); }
+    virtual void onLinkRemoteOpen(Event &e) { redirect(e); }
+    virtual void onLinkFlow(Event &e) { redirect(e); }
+    virtual void onDelivery(Event &e) { redirect(e); }
+};
+
+} // namespace
+
+
+
+
 MessagingHandler::MessagingHandler(int prefetch0, bool autoAccept0, bool autoSettle0, bool peerCloseIsError0) :
     prefetch(prefetch0), autoAccept(autoAccept0), autoSettle(autoSettle0), peerCloseIsError(peerCloseIsError0)
-{}
+{
+    createHelpers();
+}
+
+MessagingHandler::MessagingHandler(bool rawHandler, int prefetch0, bool autoAccept0, bool autoSettle0,
+                                   bool peerCloseIsError0) :
+    prefetch(prefetch0), autoAccept(autoAccept0), autoSettle(autoSettle0), peerCloseIsError(peerCloseIsError0)
+{
+    if (rawHandler) {
+        flowController = 0;
+        messagingAdapter = 0;
+    } else {
+        createHelpers();
+    }
+}
+
+void MessagingHandler::createHelpers() {
+    if (prefetch > 0) {
+        flowController = new CFlowController(prefetch);
+        addChildHandler(*flowController);
+    }
+    messagingAdapter = new MessagingAdapter(*this);
+    addChildHandler(*messagingAdapter);
+}
 
-MessagingHandler::~MessagingHandler(){};
+MessagingHandler::~MessagingHandler(){
+    delete flowController;
+    delete messagingAdapter;
+};
 
 void MessagingHandler::onAbort(Event &e) { onUnhandled(e); }
 void MessagingHandler::onAccepted(Event &e) { onUnhandled(e); }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/Sender.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Sender.cpp b/proton-c/bindings/cpp/src/Sender.cpp
index c521ad1..c8f962e 100644
--- a/proton-c/bindings/cpp/src/Sender.cpp
+++ b/proton-c/bindings/cpp/src/Sender.cpp
@@ -54,7 +54,7 @@ namespace{
 uint64_t tagCounter = 0;
 }
 
-void Sender::send(Message &message) {
+Delivery Sender::send(Message &message) {
     char tag[8];
     void *ptr = &tag;
     uint64_t id = ++tagCounter;
@@ -67,6 +67,7 @@ void Sender::send(Message &message) {
     pn_link_advance(link);
     if (pn_link_snd_settle_mode(link) == PN_SND_SETTLED)
         pn_delivery_settle(dlv);
+    return Delivery(dlv);
 }
 
 }} // namespace proton::reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/blocking/BlockingConnection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking/BlockingConnection.cpp b/proton-c/bindings/cpp/src/blocking/BlockingConnection.cpp
new file mode 100644
index 0000000..3fb6010
--- /dev/null
+++ b/proton-c/bindings/cpp/src/blocking/BlockingConnection.cpp
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/Container.h"
+#include "proton/cpp/BlockingConnection.h"
+#include "proton/cpp/BlockingSender.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/cpp/exceptions.h"
+#include "Msg.h"
+#include "BlockingConnectionImpl.h"
+#include "PrivateImplRef.h"
+
+namespace proton {
+namespace reactor {
+
+template class Handle<BlockingConnectionImpl>;
+typedef PrivateImplRef<BlockingConnection> PI;
+
+BlockingConnection::BlockingConnection() {PI::ctor(*this, 0); }
+
+BlockingConnection::BlockingConnection(const BlockingConnection& c) : Handle<BlockingConnectionImpl>() { PI::copy(*this, c); }
+
+BlockingConnection& BlockingConnection::operator=(const BlockingConnection& c) { return PI::assign(*this, c); }
+BlockingConnection::~BlockingConnection() { PI::dtor(*this); }
+
+BlockingConnection::BlockingConnection(std::string &url, Duration d, SslDomain *ssld, Container *c) {
+    BlockingConnectionImpl *cimpl = new BlockingConnectionImpl(url, d,ssld, c);
+    PI::ctor(*this, cimpl);
+}
+
+void BlockingConnection::close() { impl->close(); }
+
+void BlockingConnection::wait(WaitCondition &cond) { return impl->wait(cond); }
+void BlockingConnection::wait(WaitCondition &cond, std::string &msg, Duration timeout) {
+    return impl->wait(cond, msg, timeout);
+}
+
+BlockingSender BlockingConnection::createSender(std::string &address, Handler *h) {
+    Sender sender = impl->container.createSender(impl->connection, address, h);
+    return BlockingSender(*this, sender);
+}
+
+Duration BlockingConnection::getTimeout() { return impl->getTimeout(); }
+
+}} // namespace proton::reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.cpp b/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.cpp
new file mode 100644
index 0000000..39adb87
--- /dev/null
+++ b/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.cpp
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/Container.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/cpp/Duration.h"
+#include "proton/cpp/exceptions.h"
+#include "proton/cpp/WaitCondition.h"
+#include "BlockingConnectionImpl.h"
+#include "Msg.h"
+#include "contexts.h"
+
+#include "proton/connection.h"
+
+namespace proton {
+namespace reactor {
+
+WaitCondition::~WaitCondition() {}
+
+
+void BlockingConnectionImpl::incref(BlockingConnectionImpl *impl) {
+    impl->refCount++;
+}
+
+void BlockingConnectionImpl::decref(BlockingConnectionImpl *impl) {
+    impl->refCount--;
+    if (impl->refCount == 0)
+        delete impl;
+}
+
+namespace {
+struct ConnectionOpening : public WaitCondition {
+    ConnectionOpening(pn_connection_t *c) : pnConnection(c) {}
+    bool achieved() { return (pn_connection_state(pnConnection) & PN_REMOTE_UNINIT); }
+    pn_connection_t *pnConnection;
+};
+
+struct ConnectionClosed : public WaitCondition {
+    ConnectionClosed(pn_connection_t *c) : pnConnection(c) {}
+    bool achieved() { return !(pn_connection_state(pnConnection) & PN_REMOTE_ACTIVE); }
+    pn_connection_t *pnConnection;
+};
+
+}
+
+
+BlockingConnectionImpl::BlockingConnectionImpl(std::string &u, Duration timeout0, SslDomain *ssld, Container *c)
+    : url(u), timeout(timeout0), refCount(0)
+{
+    if (c)
+        container = *c;
+    container.start();
+    container.setTimeout(timeout);
+    // Create connection and send the connection events here
+    connection = container.connect(url, static_cast<Handler *>(this));
+    ConnectionOpening cond(connection.getPnConnection());
+    wait(cond);
+}
+
+BlockingConnectionImpl::~BlockingConnectionImpl() {
+    container = Container();
+}
+
+void BlockingConnectionImpl::close() {
+    connection.close();
+    ConnectionClosed cond(connection.getPnConnection());
+    wait(cond);
+}
+
+void BlockingConnectionImpl::wait(WaitCondition &condition) {
+    std::string empty;
+    wait(condition, empty, timeout);
+}
+
+void BlockingConnectionImpl::wait(WaitCondition &condition, std::string &msg, Duration waitTimeout) {
+    if (waitTimeout == Duration::FOREVER) {
+        while (!condition.achieved()) {
+            container.process();
+        }
+    }
+
+    pn_reactor_t *reactor = container.getReactor();
+    pn_millis_t origTimeout = pn_reactor_get_timeout(reactor);
+    pn_reactor_set_timeout(reactor, waitTimeout.getMilliseconds());
+    try {
+        pn_timestamp_t now = pn_reactor_mark(reactor);
+        pn_timestamp_t deadline = now + waitTimeout.getMilliseconds();
+        while (!condition.achieved()) {
+            container.process();
+            if (deadline < pn_reactor_mark(reactor)) {
+                std::string txt = "Connection timed out";
+                if (!msg.empty())
+                    txt += ": " + msg;
+                // TODO: proper Timeout exception
+                throw ProtonException(MSG(txt));
+            }
+        }
+    } catch (...) {
+        pn_reactor_set_timeout(reactor, origTimeout);
+        throw;
+    }
+    pn_reactor_set_timeout(reactor, origTimeout);
+}
+    
+        
+
+}} // namespace proton::reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.h b/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.h
new file mode 100644
index 0000000..5f263ab
--- /dev/null
+++ b/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.h
@@ -0,0 +1,63 @@
+#ifndef PROTON_CPP_CONNECTIONIMPL_H
+#define PROTON_CPP_CONNECTIONIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/ImportExport.h"
+#include "proton/cpp/Endpoint.h"
+#include "proton/cpp/Container.h"
+#include "proton/types.h"
+#include <string>
+
+struct pn_connection_t;
+
+namespace proton {
+namespace reactor {
+
+class Handler;
+class Container;
+class SslDomain;
+
+ class BlockingConnectionImpl : public MessagingHandler
+{
+  public:
+    PROTON_CPP_EXTERN BlockingConnectionImpl(std::string &url, Duration d, SslDomain *ssld, Container *c);
+    PROTON_CPP_EXTERN ~BlockingConnectionImpl();
+    PROTON_CPP_EXTERN void close();
+    PROTON_CPP_EXTERN void wait(WaitCondition &condition);
+    PROTON_CPP_EXTERN void wait(WaitCondition &condition, std::string &msg, Duration timeout);
+    PROTON_CPP_EXTERN pn_connection_t *getPnBlockingConnection();
+    Duration getTimeout() { return timeout; }
+    static void incref(BlockingConnectionImpl *);
+    static void decref(BlockingConnectionImpl *);
+  private:
+    friend class BlockingConnection;
+    Container container;
+    Connection connection;
+    std::string url;
+    Duration timeout;
+    int refCount;
+};
+
+
+}} // namespace proton::reactor
+
+#endif  /*!PROTON_CPP_CONNECTIONIMPL_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/blocking/BlockingLink.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking/BlockingLink.cpp b/proton-c/bindings/cpp/src/blocking/BlockingLink.cpp
new file mode 100644
index 0000000..5a572ae
--- /dev/null
+++ b/proton-c/bindings/cpp/src/blocking/BlockingLink.cpp
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/BlockingLink.h"
+#include "proton/cpp/BlockingConnection.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/cpp/WaitCondition.h"
+#include "proton/cpp/exceptions.h"
+#include "Msg.h"
+
+
+namespace proton {
+namespace reactor {
+
+namespace {
+struct LinkOpened : public WaitCondition {
+    LinkOpened(pn_link_t *l) : pnLink(l) {}
+    bool achieved() { return !(pn_link_state(pnLink) & PN_REMOTE_UNINIT); }
+    pn_link_t *pnLink;
+};
+
+struct LinkClosed : public WaitCondition {
+    LinkClosed(pn_link_t *l) : pnLink(l) {}
+    bool achieved() { return (pn_link_state(pnLink) & PN_REMOTE_CLOSED); }
+    pn_link_t *pnLink;
+};
+
+struct LinkNotOpen : public WaitCondition {
+    LinkNotOpen(pn_link_t *l) : pnLink(l) {}
+    bool achieved() { return !(pn_link_state(pnLink) & PN_REMOTE_ACTIVE); }
+    pn_link_t *pnLink;
+};
+
+
+} // namespace
+
+
+BlockingLink::BlockingLink(BlockingConnection *c, pn_link_t *pnl) : connection(*c), link(pnl) {
+    std::string msg = "Opening link " + link.getName();
+    LinkOpened linkOpened(link.getPnLink());
+    connection.wait(linkOpened, msg);
+}
+
+BlockingLink::~BlockingLink() {}
+
+void BlockingLink::waitForClosed(Duration timeout) {
+    std::string msg = "Closing link " + link.getName();
+    LinkClosed linkClosed(link.getPnLink());
+    connection.wait(linkClosed, msg);
+    checkClosed();
+}
+
+void BlockingLink::checkClosed() {
+    pn_link_t * pnLink = link.getPnLink();
+    if (pn_link_state(pnLink) & PN_REMOTE_CLOSED) {
+        link.close();
+        // TODO: LinkDetached exception
+        throw ProtonException(MSG("Link detached"));
+    }
+}
+
+void BlockingLink::close() {
+    link.close();
+    std::string msg = "Closing link " + link.getName();
+    LinkNotOpen linkNotOpen(link.getPnLink());
+    connection.wait(linkNotOpen, msg);
+}
+
+}} // namespace proton::reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef29b07c/proton-c/bindings/cpp/src/blocking/BlockingSender.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking/BlockingSender.cpp b/proton-c/bindings/cpp/src/blocking/BlockingSender.cpp
new file mode 100644
index 0000000..dc6b9bd
--- /dev/null
+++ b/proton-c/bindings/cpp/src/blocking/BlockingSender.cpp
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/BlockingSender.h"
+#include "proton/cpp/BlockingConnection.h"
+#include "proton/cpp/WaitCondition.h"
+#include "proton/cpp/exceptions.h"
+#include "Msg.h"
+
+
+namespace proton {
+namespace reactor {
+
+namespace {
+struct DeliverySettled : public WaitCondition {
+    DeliverySettled(pn_delivery_t *d) : pnDelivery(d) {}
+    bool achieved() { return pn_delivery_settled(pnDelivery); }
+    pn_delivery_t *pnDelivery;
+};
+
+} // namespace
+
+
+BlockingSender::BlockingSender(BlockingConnection &c, Sender &l) : BlockingLink(&c, l.getPnLink()) {
+    std::string ta = link.getTarget().getAddress();
+    std::string rta = link.getRemoteTarget().getAddress();
+    if (ta.empty() || ta.compare(rta) != 0) {
+        waitForClosed();
+        link.close();
+        std::string txt = "Failed to open sender " + link.getName() + ", target does not match";
+        throw ProtonException(MSG("Container not started"));
+    }
+}
+
+Delivery BlockingSender::send(Message &msg, Duration timeout) {
+    Sender snd = link;
+    Delivery dlv = snd.send(msg);
+    std::string txt = "Sending on sender " + link.getName();
+    DeliverySettled cond(dlv.getPnDelivery());
+    connection.wait(cond, txt, timeout);
+    return dlv;
+}
+
+Delivery BlockingSender::send(Message &msg) {
+    // Use default timeout
+    return send(msg, connection.getTimeout());
+}
+
+}} // namespace proton::reactor


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message