qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cliffjan...@apache.org
Subject qpid-proton git commit: PROTON-865: most of MessagingAdapter in place, SimpleSend/Recv
Date Tue, 05 May 2015 13:49:36 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/cjansen-cpp-client 177def134 -> 5ce5f282f


PROTON-865: most of MessagingAdapter in place, SimpleSend/Recv


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5ce5f282
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5ce5f282
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5ce5f282

Branch: refs/heads/cjansen-cpp-client
Commit: 5ce5f282f5115941c44df6f91db65bc5136f1008
Parents: 177def1
Author: Clifford Jansen <cliffjansen@apache.org>
Authored: Tue May 5 06:48:48 2015 -0700
Committer: Clifford Jansen <cliffjansen@apache.org>
Committed: Tue May 5 06:48:48 2015 -0700

----------------------------------------------------------------------
 proton-c/bindings/cpp/CMakeLists.txt            |   4 +
 proton-c/bindings/cpp/examples/SimpleRecv.cpp   | 109 +++++++
 proton-c/bindings/cpp/examples/SimpleSend.cpp   | 117 +++++++
 .../bindings/cpp/include/proton/cpp/Container.h |   1 +
 .../bindings/cpp/include/proton/cpp/Message.h   |  23 +-
 .../cpp/include/proton/cpp/MessagingAdapter.h   |  30 +-
 .../cpp/include/proton/cpp/MessagingEvent.h     |  16 +-
 .../cpp/include/proton/cpp/MessagingHandler.h   |  13 +-
 .../bindings/cpp/include/proton/cpp/Session.h   |   8 +-
 proton-c/bindings/cpp/src/Connection.cpp        |   2 +-
 proton-c/bindings/cpp/src/ConnectionImpl.cpp    |  14 +-
 proton-c/bindings/cpp/src/Connector.cpp         |   5 +-
 proton-c/bindings/cpp/src/Container.cpp         |   4 +
 proton-c/bindings/cpp/src/ContainerImpl.cpp     |  87 +++--
 proton-c/bindings/cpp/src/ContainerImpl.h       |   1 +
 proton-c/bindings/cpp/src/Link.cpp              |   4 +-
 proton-c/bindings/cpp/src/Message.cpp           | 194 ++++++++++--
 proton-c/bindings/cpp/src/MessagingAdapter.cpp  | 316 ++++++++++++++++---
 proton-c/bindings/cpp/src/MessagingEvent.cpp    |  31 +-
 proton-c/bindings/cpp/src/MessagingHandler.cpp  |  12 +-
 proton-c/bindings/cpp/src/Session.cpp           |  29 +-
 proton-c/bindings/cpp/src/contexts.cpp          |  47 +--
 proton-c/bindings/cpp/src/contexts.h            |   4 +
 23 files changed, 900 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index d83bd93..08c9418 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -93,6 +93,10 @@ add_executable (HelloWorld examples/HelloWorld.cpp)
 target_link_libraries (HelloWorld qpid-proton-cpp)
 add_executable (HelloWorldDirect examples/HelloWorldDirect.cpp)
 target_link_libraries (HelloWorldDirect qpid-proton-cpp)
+add_executable (SimpleRecv examples/SimpleRecv.cpp)
+target_link_libraries (SimpleRecv qpid-proton-cpp)
+add_executable (SimpleSend examples/SimpleSend.cpp)
+target_link_libraries (SimpleSend qpid-proton-cpp)
 
 install (TARGETS qpid-proton-cpp
   EXPORT  proton

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/examples/SimpleRecv.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/examples/SimpleRecv.cpp b/proton-c/bindings/cpp/examples/SimpleRecv.cpp
new file mode 100644
index 0000000..22778f0
--- /dev/null
+++ b/proton-c/bindings/cpp/examples/SimpleRecv.cpp
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/cpp/Container.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/cpp/Link.h"
+
+#include <iostream>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+
+using namespace proton::reactor;
+
+class Recv : public MessagingHandler {  // example handler: prints received messages, closes after `expected` of them
+  private:
+    std::string url;   // address passed to createReceiver() in onStart
+    int expected;      // number of messages to print before closing; 0 means no limit
+    int received;      // number of messages printed so far
+  public:
+
+    Recv(const std::string &s, int c) : url(s), expected(c), received(0) {}
+
+    void onStart(Event &e) {  // create the receiver for `url`
+        e.getContainer().createReceiver(url);
+    }
+
+    void onMessage(Event &e) {  // print one message; close once `expected` messages were printed
+        uint64_t id = 0;  // stays 0 when the message id is not a PN_ULONG
+        Message msg = e.getMessage();
+        if (msg.getIdType() == PN_ULONG) {
+            id = msg.getId();
+            if (id < received)
+                return; // ignore duplicate
+        }
+        if (expected == 0 || received < expected) {  // expected == 0: keep printing indefinitely
+            std::cout << '[' << id << "]: " << msg.getBody() << std::endl;
+            received++;
+            if (received == expected) {  // last expected message: close the receiver, then the connection
+                e.getReceiver().close();
+                e.getConnection().close();
+            }
+        }
+    }
+};
+
+static void parse_options(int argc, char **argv, int &count, std::string &addr);
+
+int main(int argc, char **argv) {  // SimpleRecv entry point: defaults below, overridable on the command line
+    int messageCount = 100;
+    std::string address("localhost:5672/examples");
+    parse_options(argc, argv, messageCount, address);  // may overwrite both defaults
+    Recv recv(address, messageCount);
+    Container(recv).run();
+}
+
+
+static void usage() {  // print the command-line synopsis and terminate the process
+    std::cout << "Usage: SimpleRecv -m message_count -a address:" << std::endl;
+    exit (1);  // does not return; exit status 1
+}
+
+
+// Parse "-a <address>" and "-m <count>" into addr/count; anything else prints usage and exits.
+static void parse_options(int argc, char **argv, int &count, std::string &addr) {
+    for (int i = 1; i < argc; i++) {
+        if (strlen(argv[i]) == 2 && argv[i][0] == '-') {
+            const int c = argv[i][1];
+            // Every option takes a value; look ahead only when an argument really follows.
+            const char *nextarg = i + 1 < argc ? argv[i+1] : NULL;
+            switch (c) {
+            case 'a':
+                if (!nextarg) usage();
+                addr = nextarg;
+                i++;
+                break;
+            case 'm':
+                if (!nextarg) usage();
+                int newc;  // must be int: "%d" stores through an int pointer
+                if (sscanf( nextarg, "%d", &newc) != 1) usage();
+                count = newc;
+                i++;
+                break;
+            default:
+                usage();
+            }
+        }
+        else usage();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/examples/SimpleSend.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/examples/SimpleSend.cpp b/proton-c/bindings/cpp/examples/SimpleSend.cpp
new file mode 100644
index 0000000..89ca39d
--- /dev/null
+++ b/proton-c/bindings/cpp/examples/SimpleSend.cpp
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/cpp/Container.h"
+#include "proton/cpp/MessagingHandler.h"
+#include "proton/cpp/Connection.h"
+
+#include <iostream>
+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+
+using namespace proton::reactor;
+
+class Send : public MessagingHandler {  // example handler: sends `total` messages and counts acceptances
+  private:
+    std::string url;  // address passed to createSender() in onStart
+    int sent;         // messages handed to the sender so far
+    int confirmed;    // onAccepted callbacks seen so far
+    int total;        // number of messages to send
+  public:
+
+    Send(const std::string &s, int c) : url(s), sent(0), confirmed(0), total(c) {}
+
+    void onStart(Event &e) {  // create the sender for `url`
+        e.getContainer().createSender(url);
+    }
+
+    void onSendable(Event &e) {  // send while the sender reports credit and messages remain
+        Sender sender = e.getSender();
+        while (sender.getCredit() && sent < total) {
+            Message msg;
+            msg.setId(sent + 1);  // ids are 1-based: first message gets id 1
+            // TODO: fancy map body content as in Python example.  Simple binary for now.
+            const char *bin = "some arbitrary binary data";
+            msg.setBody(bin, strlen(bin));
+            sender.send(msg);
+            sent++;
+        }
+    }
+
+    void onAccepted(Event &e) {  // count one acceptance; close the connection after the last one
+        confirmed++;
+        if (confirmed == total) {
+            std::cout << "all messages confirmed" << std::endl;
+            e.getConnection().close();
+        }
+    }
+
+    void onDisconnected(Event &e) {
+        sent = confirmed;  // rewind to the confirmed count; presumably so unconfirmed messages get re-sent (confirm)
+    }
+};
+
+static void parse_options(int argc, char **argv, int &count, std::string &addr);
+
+int main(int argc, char **argv) {  // SimpleSend entry point: defaults below, overridable on the command line
+    int messageCount = 100;
+    std::string address("localhost:5672/examples");
+    parse_options(argc, argv, messageCount, address);  // may overwrite both defaults
+    Send send(address, messageCount);
+    Container(send).run();
+}
+
+
+static void usage() {  // print the command-line synopsis and terminate the process
+    std::cout << "Usage: SimpleSend -m message_count -a address:" << std::endl;
+    exit (1);  // does not return; exit status 1
+}
+
+
+// Parse "-a <address>" and "-m <count>" into addr/count; anything else prints usage and exits.
+static void parse_options(int argc, char **argv, int &count, std::string &addr) {
+    for (int i = 1; i < argc; i++) {
+        if (strlen(argv[i]) == 2 && argv[i][0] == '-') {
+            const int c = argv[i][1];
+            // Every option takes a value; look ahead only when an argument really follows.
+            const char *nextarg = i + 1 < argc ? argv[i+1] : NULL;
+            switch (c) {
+            case 'a':
+                if (!nextarg) usage();
+                addr = nextarg;
+                i++;
+                break;
+            case 'm':
+                if (!nextarg) usage();
+                int newc;  // must be int: "%d" stores through an int pointer
+                if (sscanf( nextarg, "%d", &newc) != 1) usage();
+                count = newc;
+                i++;
+                break;
+            default:
+                usage();
+            }
+        }
+        else usage();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/include/proton/cpp/Container.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Container.h b/proton-c/bindings/cpp/include/proton/cpp/Container.h
index fbb1a83..d596ab1 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Container.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Container.h
@@ -56,6 +56,7 @@ class Container : public Handle<ContainerImpl>
     PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr);
     PROTON_CPP_EXTERN Sender createSender(std::string &url);
     PROTON_CPP_EXTERN Receiver createReceiver(Connection &connection, std::string &addr);
+    PROTON_CPP_EXTERN Receiver createReceiver(const std::string &url);
     PROTON_CPP_EXTERN Acceptor listen(const std::string &url);
     PROTON_CPP_EXTERN std::string getContainerId();
   private:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/include/proton/cpp/Message.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Message.h b/proton-c/bindings/cpp/include/proton/cpp/Message.h
index 51ca731..ae29ca2 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Message.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Message.h
@@ -22,6 +22,7 @@
  *
  */
 #include "proton/cpp/ImportExport.h"
+#include "proton/cpp/ProtonHandle.h"
 #include "proton/message.h"
 #include <string>
 
@@ -29,22 +30,36 @@
 namespace proton {
 namespace reactor {
 
-class Message
+class Message : public ProtonHandle<pn_message_t>
 {
   public:
     PROTON_CPP_EXTERN Message();
-    PROTON_CPP_EXTERN ~Message();
+    PROTON_CPP_EXTERN Message(pn_message_t *);
     PROTON_CPP_EXTERN Message(const Message&);
     PROTON_CPP_EXTERN Message& operator=(const Message&);
+    PROTON_CPP_EXTERN ~Message();
+
+    PROTON_CPP_EXTERN pn_message_t *getPnMessage() const;
+
+    PROTON_CPP_EXTERN void setId(uint64_t id);
+    PROTON_CPP_EXTERN uint64_t getId();
+    PROTON_CPP_EXTERN pn_type_t getIdType();
 
-    PROTON_CPP_EXTERN pn_message_t *getPnMessage();
     PROTON_CPP_EXTERN void setBody(const std::string &data);
     PROTON_CPP_EXTERN std::string getBody();
+
+    PROTON_CPP_EXTERN void getBody(std::string &str);
+
+    PROTON_CPP_EXTERN void setBody(const char *, size_t len);
+    PROTON_CPP_EXTERN size_t getBody(char *, size_t len);
+    PROTON_CPP_EXTERN size_t getBinaryBodySize();
+
+
     PROTON_CPP_EXTERN void encode(std::string &data);
     PROTON_CPP_EXTERN void decode(const std::string &data);
 
   private:
-    pn_message_t *pnMessage;
+    friend class ProtonImplRef<Message>;
 };
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h
index 8551c9c..ac8b483 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h
@@ -32,10 +32,10 @@
 namespace proton {
 namespace reactor {
 
-// For now, stands in for Python's: EndpointStateHandler, IncomingMessageHandler, OutgoingMessageHandler
+// Combines Python's: EndpointStateHandler, IncomingMessageHandler, OutgoingMessageHandler
 
 
-class MessagingAdapter : public ProtonHandler
+class MessagingAdapter : public MessagingHandler
 {
   public:
     PROTON_CPP_EXTERN MessagingAdapter(MessagingHandler &delegate);
@@ -44,11 +44,37 @@ class MessagingAdapter : public ProtonHandler
     PROTON_CPP_EXTERN virtual void onLinkFlow(Event &e);
     PROTON_CPP_EXTERN virtual void onDelivery(Event &e);
     PROTON_CPP_EXTERN virtual void onUnhandled(Event &e);
+    PROTON_CPP_EXTERN virtual void onConnectionClosed(Event &e);
+    PROTON_CPP_EXTERN virtual void onConnectionClosing(Event &e);
+    PROTON_CPP_EXTERN virtual void onConnectionError(Event &e);
+    PROTON_CPP_EXTERN virtual void onConnectionLocalOpen(Event &e);
+    PROTON_CPP_EXTERN virtual void onConnectionRemoteOpen(Event &e);
     PROTON_CPP_EXTERN virtual void onConnectionRemoteClose(Event &e);
+    PROTON_CPP_EXTERN virtual void onConnectionOpened(Event &e);
+    PROTON_CPP_EXTERN virtual void onConnectionOpening(Event &e);
+    PROTON_CPP_EXTERN virtual void onSessionClosed(Event &e);
+    PROTON_CPP_EXTERN virtual void onSessionClosing(Event &e);
+    PROTON_CPP_EXTERN virtual void onSessionError(Event &e);
+    PROTON_CPP_EXTERN virtual void onSessionLocalOpen(Event &e);
+    PROTON_CPP_EXTERN virtual void onSessionRemoteOpen(Event &e);
+    PROTON_CPP_EXTERN virtual void onSessionRemoteClose(Event &e);
+    PROTON_CPP_EXTERN virtual void onSessionOpened(Event &e);
+    PROTON_CPP_EXTERN virtual void onSessionOpening(Event &e);
+    PROTON_CPP_EXTERN virtual void onLinkClosed(Event &e);
+    PROTON_CPP_EXTERN virtual void onLinkClosing(Event &e);
+    PROTON_CPP_EXTERN virtual void onLinkError(Event &e);
+    PROTON_CPP_EXTERN virtual void onLinkLocalOpen(Event &e);
     PROTON_CPP_EXTERN virtual void onLinkRemoteOpen(Event &e);
+    PROTON_CPP_EXTERN virtual void onLinkRemoteClose(Event &e);
+    PROTON_CPP_EXTERN virtual void onLinkOpened(Event &e);
+    PROTON_CPP_EXTERN virtual void onLinkOpening(Event &e);
+    PROTON_CPP_EXTERN virtual void onTransportTailClosed(Event &e);
   private:
     MessagingHandler &delegate;  // The actual MessagingHandler
     pn_handler_t *handshaker;
+    bool autoSettle;
+    bool autoAccept;
+    bool peerCloseIsError;
 };
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/include/proton/cpp/MessagingEvent.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingEvent.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingEvent.h
index d8d5c7f..de79618 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/MessagingEvent.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingEvent.h
@@ -37,17 +37,19 @@ typedef enum {
     PN_MESSAGING_ABORT,
     PN_MESSAGING_ACCEPTED,
     PN_MESSAGING_COMMIT,
-    PN_MESSAGING_CONNECTION_CLOSE,
     PN_MESSAGING_CONNECTION_CLOSED,
     PN_MESSAGING_CONNECTION_CLOSING,
-    PN_MESSAGING_CONNECTION_OPEN,
+    PN_MESSAGING_CONNECTION_ERROR,
     PN_MESSAGING_CONNECTION_OPENED,
+    PN_MESSAGING_CONNECTION_OPENING,
     PN_MESSAGING_DISCONNECTED,
     PN_MESSAGING_FETCH,
     PN_MESSAGING_ID_LOADED,
+    PN_MESSAGING_LINK_CLOSED,
     PN_MESSAGING_LINK_CLOSING,
     PN_MESSAGING_LINK_OPENED,
     PN_MESSAGING_LINK_OPENING,
+    PN_MESSAGING_LINK_ERROR,
     PN_MESSAGING_MESSAGE,
     PN_MESSAGING_QUIT,
     PN_MESSAGING_RECORD_INSERTED,
@@ -57,19 +59,25 @@ typedef enum {
     PN_MESSAGING_REQUEST,
     PN_MESSAGING_RESPONSE,
     PN_MESSAGING_SENDABLE,
+    PN_MESSAGING_SESSION_CLOSED,
+    PN_MESSAGING_SESSION_CLOSING,
+    PN_MESSAGING_SESSION_OPENED,
+    PN_MESSAGING_SESSION_OPENING,
+    PN_MESSAGING_SESSION_ERROR,
     PN_MESSAGING_SETTLED,
     PN_MESSAGING_START,
     PN_MESSAGING_TIMER,
     PN_MESSAGING_TRANSACTION_ABORTED,
     PN_MESSAGING_TRANSACTION_COMMITTED,
-    PN_MESSAGING_TRANSACTION_DECLARED
+    PN_MESSAGING_TRANSACTION_DECLARED,
+    PN_MESSAGING_TRANSPORT_CLOSED
 } MessagingEventType_t;
 
 class MessagingEvent : public ProtonEvent
 {
   public:
     MessagingEvent(pn_event_t *ce, pn_event_type_t t, Container &c);
-    MessagingEvent(MessagingEventType_t t, ProtonEvent *parent, Container &c);
+    MessagingEvent(MessagingEventType_t t, ProtonEvent &parent);
     ~MessagingEvent();
     virtual PROTON_CPP_EXTERN void dispatch(Handler &h);
     virtual PROTON_CPP_EXTERN Connection &getConnection();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h
index 875af43..51f679a 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h
@@ -34,20 +34,23 @@ class PROTON_CPP_EXTERN MessagingHandler : public ProtonHandler
 {
   public:
     PROTON_CPP_EXTERN MessagingHandler();
+//ZZZ    PROTON_CPP_EXTERN MessagingHandler(int prefetch=10, bool autoAccept=true, autoSettle=true, peerCloseIsError=false);
     virtual ~MessagingHandler();
 
     virtual void onAbort(Event &e);
     virtual void onAccepted(Event &e);
     virtual void onCommit(Event &e);
-    virtual void onConnectionClose(Event &e);
     virtual void onConnectionClosed(Event &e);
     virtual void onConnectionClosing(Event &e);
-    virtual void onConnectionOpen(Event &e);
+    virtual void onConnectionError(Event &e);
+    virtual void onConnectionOpening(Event &e);
     virtual void onConnectionOpened(Event &e);
     virtual void onDisconnected(Event &e);
     virtual void onFetch(Event &e);
     virtual void onIdLoaded(Event &e);
+    virtual void onLinkClosed(Event &e);
     virtual void onLinkClosing(Event &e);
+    virtual void onLinkError(Event &e);
     virtual void onLinkOpened(Event &e);
     virtual void onLinkOpening(Event &e);
     virtual void onMessage(Event &e);
@@ -59,12 +62,18 @@ class PROTON_CPP_EXTERN MessagingHandler : public ProtonHandler
     virtual void onRequest(Event &e);
     virtual void onResponse(Event &e);
     virtual void onSendable(Event &e);
+    virtual void onSessionClosed(Event &e);
+    virtual void onSessionClosing(Event &e);
+    virtual void onSessionError(Event &e);
+    virtual void onSessionOpened(Event &e);
+    virtual void onSessionOpening(Event &e);
     virtual void onSettled(Event &e);
     virtual void onStart(Event &e);
     virtual void onTimer(Event &e);
     virtual void onTransactionAborted(Event &e);
     virtual void onTransactionCommitted(Event &e);
     virtual void onTransactionDeclared(Event &e);
+    virtual void onTransportClosed(Event &e);
 };
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/include/proton/cpp/Session.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Session.h b/proton-c/bindings/cpp/include/proton/cpp/Session.h
index e556cde..68f5e40 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Session.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Session.h
@@ -27,6 +27,7 @@
 
 #include "proton/types.h"
 #include "proton/link.h"
+#include "ProtonImplRef.h"
 #include <string>
 
 struct pn_connection_t;
@@ -38,19 +39,22 @@ class Container;
 class Handler;
 class Transport;
 
-class Session : public Endpoint
+ class Session : public Endpoint, public ProtonHandle<pn_session_t>
 {
   public:
     PROTON_CPP_EXTERN Session(pn_session_t *s);
+    PROTON_CPP_EXTERN Session();
     PROTON_CPP_EXTERN ~Session();
     PROTON_CPP_EXTERN void open();
+    PROTON_CPP_EXTERN Session(const Session&);
+    PROTON_CPP_EXTERN Session& operator=(const Session&);
     PROTON_CPP_EXTERN void close();
     PROTON_CPP_EXTERN pn_session_t *getPnSession();
     virtual PROTON_CPP_EXTERN Connection &getConnection();
     Receiver createReceiver(std::string name);
     Sender createSender(std::string name);
   private:
-    pn_session_t *pnSession;
+    friend class ProtonImplRef<Session>;
 };
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Connection.cpp b/proton-c/bindings/cpp/src/Connection.cpp
index e85b323..49d171e 100644
--- a/proton-c/bindings/cpp/src/Connection.cpp
+++ b/proton-c/bindings/cpp/src/Connection.cpp
@@ -35,7 +35,7 @@ namespace reactor {
 template class Handle<ConnectionImpl>;
 typedef PrivateImplRef<Connection> PI;
 
-Connection::Connection() {}
+Connection::Connection() {PI::ctor(*this, 0); }
 Connection::Connection(ConnectionImpl* p) { PI::ctor(*this, p); }
 Connection::Connection(const Connection& c) : Handle<ConnectionImpl>() { PI::copy(*this, c); }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/ConnectionImpl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ConnectionImpl.cpp b/proton-c/bindings/cpp/src/ConnectionImpl.cpp
index 3b70362..be01f8d 100644
--- a/proton-c/bindings/cpp/src/ConnectionImpl.cpp
+++ b/proton-c/bindings/cpp/src/ConnectionImpl.cpp
@@ -22,6 +22,7 @@
 #include "proton/cpp/Handler.h"
 #include "proton/cpp/exceptions.h"
 #include "ConnectionImpl.h"
+#include "proton/cpp/Transport.h"
 #include "Msg.h"
 #include "contexts.h"
 
@@ -34,7 +35,7 @@ void ConnectionImpl::incref(ConnectionImpl *impl) {
     impl->refCount++;
 }
 
-void ConnectionImpl::decref(ConnectionImpl *impl) { 
+void ConnectionImpl::decref(ConnectionImpl *impl) {
     impl->refCount--;
     if (impl->refCount == 0)
         delete impl;
@@ -47,7 +48,10 @@ ConnectionImpl::ConnectionImpl(Container &c) : container(c), refCount(0), overri
     setConnectionContext(pnConnection, this);
 }
 
-ConnectionImpl::~ConnectionImpl() {}
+ConnectionImpl::~ConnectionImpl() {  // releases the transport and the override handler
+    delete transport;
+    delete override;
+}
 
 Transport &ConnectionImpl::getTransport() {
     if (transport)
@@ -56,7 +60,11 @@ Transport &ConnectionImpl::getTransport() {
 }
 
 Handler* ConnectionImpl::getOverride() { return override; }
-void ConnectionImpl::setOverride(Handler *h) { override = h; }
+void ConnectionImpl::setOverride(Handler *h) {
+    // Owns the override: free the old one, unless the same handler is set again.
+    if (override != h) delete override;
+    override = h;
+}
 
 void ConnectionImpl::open() {
     pn_connection_open(pnConnection);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/Connector.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Connector.cpp b/proton-c/bindings/cpp/src/Connector.cpp
index 6885575..8ebdfb6 100644
--- a/proton-c/bindings/cpp/src/Connector.cpp
+++ b/proton-c/bindings/cpp/src/Connector.cpp
@@ -62,15 +62,14 @@ void Connector::onConnectionRemoteOpen(Event &e) {
 }
 
 void Connector::onConnectionInit(Event &e) {
-
 }
 
 void Connector::onTransportClosed(Event &e) {
     // TODO: prepend with reconnect logic
     PN_CPP_LOG(info, "Disconnected");
-    connection.setOverride(0);  // No more call backs
     pn_connection_release(connection.impl->pnConnection);
-    delete this;
+    // No more interaction, so drop our counted reference.
+    connection = Connection();
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/Container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Container.cpp b/proton-c/bindings/cpp/src/Container.cpp
index 8e79b15..4eccb15 100644
--- a/proton-c/bindings/cpp/src/Container.cpp
+++ b/proton-c/bindings/cpp/src/Container.cpp
@@ -74,6 +74,10 @@ Receiver Container::createReceiver(Connection &connection, std::string &addr) {
     return impl->createReceiver(connection, addr);
 }
 
+Receiver Container::createReceiver(const std::string &url) {  // forwards to the implementation object
+    return impl->createReceiver(url);
+}
+
 Acceptor Container::listen(const std::string &urlString) {
     return impl->listen(urlString);
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/ContainerImpl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ContainerImpl.cpp b/proton-c/bindings/cpp/src/ContainerImpl.cpp
index 7ff9c1d..df8c716 100644
--- a/proton-c/bindings/cpp/src/ContainerImpl.cpp
+++ b/proton-c/bindings/cpp/src/ContainerImpl.cpp
@@ -37,6 +37,7 @@
 
 #include "proton/connection.h"
 #include "proton/session.h"
+#include "proton/handlers.h"
 
 namespace proton {
 namespace reactor {
@@ -64,23 +65,20 @@ class CHandler : public Handler
         pn_decref(pnHandler);
     }
     pn_handler_t *getPnHandler() { return pnHandler; }
+
+    virtual void onUnhandled(Event &e) {  // pass the underlying C event on to the wrapped pn_handler_t
+        ProtonEvent *pne = dynamic_cast<ProtonEvent *>(&e);
+        if (!pne) return;  // not a ProtonEvent: nothing to forward
+        int type = pne->getType();
+        if (!type) return;  // Not from the reactor
+        pn_handler_dispatch(pnHandler, pne->getPnEvent(), (pn_event_type_t) type);
+    }
+
   private:
     pn_handler_t *pnHandler;
 };
 
 
-void dispatch(Handler &h, MessagingEvent &e) {
-    // TODO: also dispatch to add()'ed Handlers
-    CHandler *chandler;
-    int type = e.getType();
-    if (type &&  (chandler = dynamic_cast<CHandler*>(&h))) {
-        // event and handler are both native Proton C
-        pn_handler_dispatch(chandler->getPnHandler(), e.getPnEvent(), (pn_event_type_t) type);
-    }
-    else
-        e.dispatch(h);
-}
-
 // Used to sniff for Connector events before the reactor's global handler sees them.
 class OverrideHandler : public Handler
 {
@@ -109,8 +107,9 @@ class OverrideHandler : public Handler
             ConnectionImpl *connection = getConnectionContext(conn);
             if (connection) {
                 Handler *override = connection->getOverride();
-                if (override)
+                if (override) {
                     e.dispatch(*override);
+                }
             }
         }
 
@@ -127,6 +126,29 @@ class OverrideHandler : public Handler
     }
 };
 
+
+// Wraps the Proton C handler returned by pn_flowcontroller() as a C++ ProtonHandler.
+class CFlowController : public ProtonHandler
+{
+  public:
+    pn_handler_t *flowcontroller;  // from pn_flowcontroller(window); released in the destructor
+    CFlowController(int window) : flowcontroller(pn_flowcontroller(window)) {}
+    ~CFlowController() { pn_decref(flowcontroller); }
+
+    // Hand the underlying C event to the C flow controller.
+    void redirect(Event &e) {
+        ProtonEvent *pne = dynamic_cast<ProtonEvent *>(&e);
+        if (!pne) return;  // no C event behind it: nothing to dispatch (same guard as CHandler)
+        pn_handler_dispatch(flowcontroller, pne->getPnEvent(), (pn_event_type_t) pne->getType());
+    }
+
+    virtual void onLinkLocalOpen(Event &e) { redirect(e); }
+    virtual void onLinkRemoteOpen(Event &e) { redirect(e); }
+    virtual void onLinkFlow(Event &e) { redirect(e); }
+    virtual void onDelivery(Event &e) { redirect(e); }
+};
+
+
 namespace {
 
 // TODO: configurable policy.  SessionPerConnection for now.
@@ -162,8 +184,8 @@ Handler &getCppHandler(pn_handler_t *c_handler) {
 
 void cpp_handler_dispatch(pn_handler_t *c_handler, pn_event_t *cevent, pn_event_type_t type)
 {
-    MessagingEvent ev(cevent, type, getContainerRef(c_handler));
-    dispatch(getCppHandler(c_handler), ev);
+    MessagingEvent mevent(cevent, type, getContainerRef(c_handler));
+    mevent.dispatch(getCppHandler(c_handler));
 }
 
 void cpp_handler_cleanup(pn_handler_t *c_handler)
@@ -176,7 +198,7 @@ pn_handler_t *cpp_handler(ContainerImpl *c, Handler *h)
 {
     pn_handler_t *handler = pn_handler_new(cpp_handler_dispatch, sizeof(struct InboundContext), cpp_handler_cleanup);
     struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(handler);
-    ctxt->containerRef = Container(c);
+    new (&ctxt->containerRef) Container(c);
     ctxt->containerImpl = c;
     ctxt->cppHandler = h;
     return handler;
@@ -254,6 +276,17 @@ Receiver ContainerImpl::createReceiver(Connection &connection, std::string &addr
     return rcv;
 }
 
+Receiver ContainerImpl::createReceiver(const std::string &urlString) {  // connect to urlString and open a receiver on its path
+    // TODO: const cleanup of API
+    Connection conn = connect(const_cast<std::string &>(urlString));  // cast presumably needed because connect() takes a non-const reference (confirm)
+    Session session = getDefaultSession(conn.getPnConnection(), &getImpl(conn)->defaultSession);
+    std::string path = Url(urlString).getPath();
+    Receiver rcv = session.createReceiver(containerId + '-' + path);  // link name is "<containerId>-<path>"
+    pn_terminus_set_address(pn_link_source(rcv.getPnLink()), path.c_str());  // the URL path becomes the link's source address
+    rcv.open();
+    return rcv;
+}
+
 Acceptor ContainerImpl::acceptor(const std::string &host, const std::string &port) {
     pn_acceptor_t *acptr = pn_reactor_acceptor(reactor, host.c_str(), port.c_str(), NULL);
     if (acptr)
@@ -271,11 +304,20 @@ Acceptor ContainerImpl::listen(const std::string &urlString) {
 
 void ContainerImpl::run() {
     reactor = pn_reactor();
+
     // Set our context on the reactor
     setContainerContext(reactor, this);
 
+    int prefetch = 10; // TODO: configurable
+    Handler *flowController = 0;
+
+
     // Set the reactor's main/default handler (see note below)
     MessagingAdapter messagingAdapter(messagingHandler);
+    if (prefetch) {
+        flowController = new CFlowController(prefetch);
+        messagingHandler.addChildHandler(*flowController);
+    }
     messagingHandler.addChildHandler(messagingAdapter);
     pn_handler_t *cppHandler = cpp_handler(this, &messagingHandler);
     pn_reactor_set_handler(reactor, cppHandler);
@@ -287,15 +329,18 @@ void ContainerImpl::run() {
     pn_handler_t *cppGlobalHandler = cpp_handler(this, &overrideHandler);
     pn_reactor_set_global_handler(reactor, cppGlobalHandler);
 
-    // Note: we have just set up the following 4 handlers that see events in this order:
-    // messagingHandler, messagingAdapter, connector override, the reactor's default global
-    // handler (pn_iohandler)
-    // TODO: remove fifth pn_handshaker once messagingAdapter matures
-
+    // Note: we have just set up the following 4/5 handlers that see events in this order:
+    // messagingHandler (Proton C events), pn_flowcontroller (optional), messagingAdapter,
+    // messagingHandler (Messaging events from the messagingAdapter), connector override,
+    // the reactor's default global handler (pn_iohandler)
     pn_reactor_run(reactor);
+
+    pn_decref(cppHandler);
+    pn_decref(cppGlobalHandler);
     pn_decref(cGlobalHandler);
     pn_reactor_free(reactor);
     reactor = 0;
+    delete(flowController);
 }
 
 }} // namespace proton::reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/ContainerImpl.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ContainerImpl.h b/proton-c/bindings/cpp/src/ContainerImpl.h
index a14bf52..8a6faba 100644
--- a/proton-c/bindings/cpp/src/ContainerImpl.h
+++ b/proton-c/bindings/cpp/src/ContainerImpl.h
@@ -49,6 +49,7 @@ class ContainerImpl
     PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr);
     PROTON_CPP_EXTERN Sender createSender(std::string &url);
     PROTON_CPP_EXTERN Receiver createReceiver(Connection &connection, std::string &addr);
+    PROTON_CPP_EXTERN Receiver createReceiver(const std::string &url); //ZZZ
     PROTON_CPP_EXTERN Acceptor listen(const std::string &url);
     PROTON_CPP_EXTERN std::string getContainerId();
     static void incref(ContainerImpl *);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/Link.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Link.cpp b/proton-c/bindings/cpp/src/Link.cpp
index 3661129..3356b38 100644
--- a/proton-c/bindings/cpp/src/Link.cpp
+++ b/proton-c/bindings/cpp/src/Link.cpp
@@ -36,7 +36,7 @@ namespace reactor {
 namespace {
 
 static inline void throwIfNull(pn_link_t *l) { if (!l) throw ProtonException(MSG("Disassociated link")); }
-    
+
 }
 
 template class ProtonHandle<pn_link_t>;
@@ -48,7 +48,7 @@ Link::Link(pn_link_t* p) {
     if (p) senderLink = pn_link_is_sender(p);
 }
 Link::Link() {
-    PI::ctor(*this, 0); 
+    PI::ctor(*this, 0);
 }
 Link::Link(const Link& c) : ProtonHandle<pn_link_t>() {
     verifyType(impl);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/Message.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Message.cpp b/proton-c/bindings/cpp/src/Message.cpp
index 840a10b..ead6eb1 100644
--- a/proton-c/bindings/cpp/src/Message.cpp
+++ b/proton-c/bindings/cpp/src/Message.cpp
@@ -22,33 +22,128 @@
 #include "proton/cpp/Message.h"
 #include "proton/cpp/exceptions.h"
 #include "Msg.h"
+#include "ProtonImplRef.h"
+
+#include <cstring>
 
 namespace proton {
 namespace reactor {
 
-Message::Message() : pnMessage(pn_message()){}
+template class ProtonHandle<pn_message_t>;
+typedef ProtonImplRef<Message> PI;
 
-Message::~Message() {
-    pn_decref(pnMessage);
+Message::Message() {
+    PI::ctor(*this, 0);
+}
+Message::Message(pn_message_t *p) {
+    PI::ctor(*this, p);
 }
+Message::Message(const Message& m) : ProtonHandle<pn_message_t>() {
+    PI::copy(*this, m);
+}
+Message& Message::operator=(const Message& m) {
+    return PI::assign(*this, m);
+}
+Message::~Message() { PI::dtor(*this); }
 
-Message::Message(const Message& m) : pnMessage(m.pnMessage) {
-    pn_incref(pnMessage);
+namespace {
+void confirm(pn_message_t *&p) {
+    if (p) return;
+    p = pn_message(); // Correct refcount of 1
+    if (!p)
+        throw ProtonException(MSG("No memory"));
 }
 
-Message& Message::operator=(const Message& m) {
-    pnMessage = m.pnMessage;
-    pn_incref(pnMessage);
-    return *this;
+void getFormatedStringContent(pn_data_t *data, std::string &str) {
+    pn_data_rewind(data);
+    size_t sz = str.capacity();
+    if (sz < 512) sz = 512;
+    while (true) {
+        str.resize(sz);
+        int err = pn_data_format(data, (char *) str.data(), &sz);
+        if (err) {
+            if (err != PN_OVERFLOW)
+                throw ProtonException(MSG("Unexpected message body data error"));
+        }
+        else {
+            str.resize(sz);
+            return;
+        }
+        sz *= 2;
+    }
+}
+
+} // namespace
+
+void Message::setId(uint64_t id) {
+    confirm(impl);
+    pn_data_t *data = pn_message_id(impl);
+    pn_data_clear(data);
+    if (int err = pn_data_put_ulong(data, id))
+        throw ProtonException(MSG("setId error " << err));
+}
+
+uint64_t Message::getId() {
+    confirm(impl);
+    pn_data_t *data = pn_message_id(impl);
+    pn_data_rewind(data);
+    if (pn_data_size(data) == 1 && pn_data_next(data) && pn_data_type(data) == PN_ULONG) {
+        return pn_data_get_ulong(data);
+    }
+    throw ProtonException(MSG("Message ID is not a ULONG"));
+}
+
+pn_type_t Message::getIdType() {
+    confirm(impl);
+    pn_data_t *data = pn_message_id(impl);
+    pn_data_rewind(data);
+    if (pn_data_size(data) == 1 && pn_data_next(data)) {
+        pn_type_t type = pn_data_type(data);
+        switch (type) {
+        case PN_ULONG:
+        case PN_STRING:
+        case PN_BINARY:
+        case PN_UUID:
+            return type;
+            break;
+        default:
+            break;
+        }
+    }
+    return PN_NULL;
 }
 
 void Message::setBody(const std::string &buf) {
-    pn_data_t *body = pn_message_body(pnMessage);
+    confirm(impl);
+    pn_data_t *body = pn_message_body(impl);
+    pn_data_clear(body);
     pn_data_put_string(body, pn_bytes(buf.size(), buf.data()));
 }
 
+void Message::getBody(std::string &str) {
+    // User supplied string/buffer
+    confirm(impl);
+    pn_data_t *body = pn_message_body(impl);
+    pn_data_rewind(body);
+
+    if (pn_data_next(body) && pn_data_type(body) == PN_STRING) {
+        pn_bytes_t bytes= pn_data_get_string(body);
+        if (!pn_data_next(body)) {
+            // String data and nothing else
+            str.resize(bytes.size);
+            memmove((void *) str.data(), bytes.start, bytes.size);
+            return;
+        }
+    }
+
+    getFormatedStringContent(body, str);
+}
+
 std::string Message::getBody() {
-    pn_data_t *body = pn_message_body(pnMessage);
+    confirm(impl);
+    pn_data_t *body = pn_message_body(impl);
+    pn_data_rewind(body);
+
     if (pn_data_next(body) && pn_data_type(body) == PN_STRING) {
         pn_bytes_t bytes= pn_data_get_string(body);
         if (!pn_data_next(body)) {
@@ -57,36 +152,73 @@ std::string Message::getBody() {
         }
     }
 
-    pn_data_rewind(body);
     std::string str;
-    size_t sz = 1024;
-    str.resize(sz);
-    int err = pn_data_format(body, (char *) str.data(), &sz);
-    if (err == PN_OVERFLOW)
-        throw ProtonException(MSG("TODO: sizing loop missing"));
-    if (err) throw ProtonException(MSG("Unexpected data error"));
-    str.resize(sz);
+    getFormatedStringContent(body, str);
     return str;
 }
 
+void Message::setBody(const char *bytes, size_t len) {
+    confirm(impl);
+    pn_data_t *body = pn_message_body(impl);
+    pn_data_clear(body);
+    pn_data_put_binary(body, pn_bytes(len, bytes));
+}
+
+size_t Message::getBody(char *bytes, size_t len) {
+    confirm(impl);
+    pn_data_t *body = pn_message_body(impl);
+    pn_data_rewind(body);
+    if (pn_data_size(body) == 1 && pn_data_next(body) && pn_data_type(body) == PN_BINARY) {
+        pn_bytes_t pnb = pn_data_get_binary(body);
+        if (len >= pnb.size) {
+            memmove(bytes, pnb.start, pnb.size);
+            return pnb.size;
+        }
+        throw ProtonException(MSG("Binary buffer too small"));
+    }
+    throw ProtonException(MSG("Not simple binary data"));
+}
+
+
+
+size_t Message::getBinaryBodySize() {
+    confirm(impl);
+    pn_data_t *body = pn_message_body(impl);
+    pn_data_rewind(body);
+    if (pn_data_size(body) == 1 && pn_data_next(body) && pn_data_type(body) == PN_BINARY) {
+        pn_bytes_t bytes = pn_data_get_binary(body);
+        return bytes.size;
+    }
+    return 0;
+}
+
+
 void Message::encode(std::string &s) {
-    size_t sz = 1024;
-    if (s.capacity() > sz)
-        sz = s.capacity();
-    else
-        s.reserve(sz);
-    s.resize(sz);
-    int err = pn_message_encode(pnMessage, (char *) s.data(), &sz);
-    if (err == PN_OVERFLOW)
-        throw ProtonException(MSG("TODO: fix overflow with dynamic buffer resizing"));
-    if (err) throw ProtonException(MSG("unexpected error"));
-    s.resize(sz);
+    confirm(impl);
+    size_t sz = s.capacity();
+    if (sz < 512) sz = 512;
+    while (true) {
+        s.resize(sz);
+        int err = pn_message_encode(impl, (char *) s.data(), &sz);
+        if (err) {
+            if (err != PN_OVERFLOW)
+                throw ProtonException(MSG("unexpected error"));
+        } else {
+            s.resize(sz);
+            return;
+        }
+        sz *= 2;
+    }
 }
 
 void Message::decode(const std::string &s) {
-    int err = pn_message_decode(pnMessage, s.data(), s.size());
+    confirm(impl);
+    int err = pn_message_decode(impl, s.data(), s.size());
     if (err) throw ProtonException(MSG("unexpected error"));
 }
 
+pn_message_t *Message::getPnMessage() const {
+    return impl;
+}
 
 }} // namespace proton::reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/MessagingAdapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/MessagingAdapter.cpp b/proton-c/bindings/cpp/src/MessagingAdapter.cpp
index 9cab2b3..1097305 100644
--- a/proton-c/bindings/cpp/src/MessagingAdapter.cpp
+++ b/proton-c/bindings/cpp/src/MessagingAdapter.cpp
@@ -28,23 +28,26 @@
 #include "proton/handlers.h"
 #include "proton/delivery.h"
 #include "proton/connection.h"
+#include "proton/session.h"
 
 namespace proton {
 namespace reactor {
 
-MessagingAdapter::MessagingAdapter(MessagingHandler &d) : delegate(d), handshaker(pn_handshaker()) {
-    pn_handler_t *flowcontroller = pn_flowcontroller(10);
-    pn_handler_add(handshaker, flowcontroller);
-    pn_decref(flowcontroller);
+MessagingAdapter::MessagingAdapter(MessagingHandler &d) : delegate(d), handshaker(pn_handshaker()),
+                                                          autoSettle(true), autoAccept(true),
+                                                          peerCloseIsError(false) {
 };
 MessagingAdapter::~MessagingAdapter(){
     pn_decref(handshaker);
 };
 
+
 void MessagingAdapter::onReactorInit(Event &e) {
-    // create onStart extended event
-    MessagingEvent mevent(PN_MESSAGING_START, NULL, e.getContainer());
-    mevent.dispatch(delegate);
+    ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+    if (pe) {
+        MessagingEvent mevent(PN_MESSAGING_START, *pe);
+        delegate.onStart(mevent);
+    }
 }
 
 void MessagingAdapter::onLinkFlow(Event &e) {
@@ -54,8 +57,8 @@ void MessagingAdapter::onLinkFlow(Event &e) {
         pn_link_t *lnk = pn_event_link(pne);
         if (lnk && pn_link_is_sender(lnk) && pn_link_credit(lnk) > 0) {
             // create onMessage extended event
-            MessagingEvent mevent(PN_MESSAGING_SENDABLE, pe, e.getContainer());
-            mevent.dispatch(delegate);
+            MessagingEvent mevent(PN_MESSAGING_SENDABLE, *pe);
+            delegate.onSendable(mevent);;
         }
    }
 }
@@ -85,32 +88,60 @@ void MessagingAdapter::onDelivery(Event &e) {
         if (pn_link_is_receiver(lnk)) {
             if (!pn_delivery_partial(dlv) && pn_delivery_readable(dlv)) {
                 // generate onMessage
-                MessagingEvent mevent(PN_MESSAGING_MESSAGE, pe, pe->getContainer());
+                MessagingEvent mevent(PN_MESSAGING_MESSAGE, *pe);
                 Message m(receiveMessage(lnk, dlv));
                 mevent.setMessage(m);
-                // TODO: check if endpoint closed...
-                mevent.dispatch(delegate);
-                // only do auto accept for now
-                pn_delivery_update(dlv, PN_ACCEPTED);
-                pn_delivery_settle(dlv);
-                // TODO: generate onSettled
+                if (pn_link_state(lnk) & PN_LOCAL_CLOSED) {
+                    if (autoAccept) {
+                        pn_delivery_update(dlv, PN_RELEASED);
+                        pn_delivery_settle(dlv);
+                    }
+                }
+                else {
+                    try {
+                        delegate.onMessage(mevent);
+                        if (autoAccept) {  // handler returned normally: accept and settle
+                            pn_delivery_update(dlv, PN_ACCEPTED);
+                            pn_delivery_settle(dlv);
+                        }
+                    }
+                    catch (MessageReject &) {  // onMessage threw MessageReject: report rejected
+                        pn_delivery_update(dlv, PN_REJECTED);
+                        pn_delivery_settle(dlv);
+                    }
+                    catch (MessageRelease &) {  // onMessage threw MessageRelease: report released
+                        pn_delivery_update(dlv, PN_RELEASED);  // fix: was PN_REJECTED, which misreported a release as a rejection
+                        pn_delivery_settle(dlv);
+                    }
+                }
+            }
+            else if (pn_delivery_updated(dlv) && pn_delivery_settled(dlv)) {
+                MessagingEvent mevent(PN_MESSAGING_SETTLED, *pe);
+                delegate.onSettled(mevent);
             }
         } else {
             // Sender
             if (pn_delivery_updated(dlv)) {
                 uint64_t rstate = pn_delivery_remote_state(dlv);
-                if (rstate == PN_ACCEPTED)
-                    // generate onAccepted
-                    MessagingEvent(PN_MESSAGING_ACCEPTED, pe, pe->getContainer()).dispatch(delegate);
-                else if (rstate = PN_REJECTED)
-                    MessagingEvent(PN_MESSAGING_REJECTED, pe, pe->getContainer()).dispatch(delegate);
-                else if (rstate == PN_RELEASED || rstate == PN_MODIFIED)
-                    MessagingEvent(PN_MESSAGING_RELEASED, pe, pe->getContainer()).dispatch(delegate);
-
-                if (pn_delivery_settled(dlv))
-                    MessagingEvent(PN_MESSAGING_SETTLED, pe, pe->getContainer()).dispatch(delegate);
-
-                pn_delivery_settle(dlv); // TODO: only if auto settled
+                if (rstate == PN_ACCEPTED) {  // map the peer's disposition to exactly one messaging event
+                    MessagingEvent mevent(PN_MESSAGING_ACCEPTED, *pe);
+                    delegate.onAccepted(mevent);
+                }
+                else if (rstate == PN_REJECTED) {  // fix: was '=', which overwrote rstate and was always true
+                    MessagingEvent mevent(PN_MESSAGING_REJECTED, *pe);
+                    delegate.onRejected(mevent);
+                }
+                else if (rstate == PN_RELEASED || rstate == PN_MODIFIED) {  // both reported as released
+                    MessagingEvent mevent(PN_MESSAGING_RELEASED, *pe);
+                    delegate.onReleased(mevent);
+                }
+
+                if (pn_delivery_settled(dlv)) {
+                    MessagingEvent mevent(PN_MESSAGING_SETTLED, *pe);
+                    delegate.onSettled(mevent);
+                }
+                if (autoSettle)
+                    pn_delivery_settle(dlv);
             }
         }
     }
@@ -140,52 +171,247 @@ bool isRemoteClosed(pn_state_t state) {
 
 } // namespace
 
+void MessagingAdapter::onLinkRemoteClose(Event &e) {
+    ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+    if (pe) {
+        pn_event_t *cevent = pe->getPnEvent();
+        pn_link_t *lnk = pn_event_link(cevent);
+        pn_state_t state = pn_link_state(lnk);
+        if (pn_condition_is_set(pn_link_remote_condition(lnk))) {
+            MessagingEvent mevent(PN_MESSAGING_LINK_ERROR, *pe);
+            onLinkError(mevent);
+        }
+        else if (isLocalClosed(state)) {
+            MessagingEvent mevent(PN_MESSAGING_LINK_CLOSED, *pe);
+            onLinkClosed(mevent);
+        }
+        else {
+            MessagingEvent mevent(PN_MESSAGING_LINK_CLOSING, *pe);
+            onLinkClosing(mevent);
+        }
+        pn_link_close(lnk);
+    }
+}
+
+void MessagingAdapter::onSessionRemoteClose(Event &e) {
+    ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+    if (pe) {
+        pn_event_t *cevent = pe->getPnEvent();
+        pn_session_t *session = pn_event_session(cevent);
+        pn_state_t state = pn_session_state(session);
+        if (pn_condition_is_set(pn_session_remote_condition(session))) {
+            MessagingEvent mevent(PN_MESSAGING_SESSION_ERROR, *pe);
+            onSessionError(mevent);
+        }
+        else if (isLocalClosed(state)) {
+            MessagingEvent mevent(PN_MESSAGING_SESSION_CLOSED, *pe);
+            onSessionClosed(mevent);
+        }
+        else {
+            MessagingEvent mevent(PN_MESSAGING_SESSION_CLOSING, *pe);
+            onSessionClosing(mevent);
+        }
+        pn_session_close(session);
+    }
+}
+
 void MessagingAdapter::onConnectionRemoteClose(Event &e) {
     ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
     if (pe) {
         pn_event_t *cevent = pe->getPnEvent();
-        pn_connection_t *conn = pn_event_connection(cevent);
-        // TODO: remote condition -> error
-        if (isLocalClosed(pn_connection_state(conn))) {
-            MessagingEvent(PN_MESSAGING_CONNECTION_CLOSED, pe, pe->getContainer()).dispatch(delegate);
+        pn_connection_t *connection = pn_event_connection(cevent);
+        pn_state_t state = pn_connection_state(connection);
+        if (pn_condition_is_set(pn_connection_remote_condition(connection))) {
+            MessagingEvent mevent(PN_MESSAGING_CONNECTION_ERROR, *pe);
+            onConnectionError(mevent);
+        }
+        else if (isLocalClosed(state)) {
+            MessagingEvent mevent(PN_MESSAGING_CONNECTION_CLOSED, *pe);
+            onConnectionClosed(mevent);
         }
         else {
-            MessagingEvent(PN_MESSAGING_CONNECTION_CLOSING, pe, pe->getContainer()).dispatch(delegate);
+            MessagingEvent mevent(PN_MESSAGING_CONNECTION_CLOSING, *pe);
+            onConnectionClosing(mevent);
+        }
+        pn_connection_close(connection);
+    }
+}
+
+void MessagingAdapter::onConnectionLocalOpen(Event &e) {
+    ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+    if (pe) {
+        pn_connection_t *connection = pn_event_connection(pe->getPnEvent());
+        if (isRemoteOpen(pn_connection_state(connection))) {
+            MessagingEvent mevent(PN_MESSAGING_CONNECTION_OPENED, *pe);
+            onConnectionOpened(mevent);
+        }
+    }
+}
+
+void MessagingAdapter::onConnectionRemoteOpen(Event &e) {
+    ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+    if (pe) {
+        pn_connection_t *connection = pn_event_connection(pe->getPnEvent());
+        if (isLocalOpen(pn_connection_state(connection))) {
+            MessagingEvent mevent(PN_MESSAGING_CONNECTION_OPENED, *pe);
+            onConnectionOpened(mevent);
+        }
+        else if (isLocalUnititialised(pn_connection_state(connection))) {
+            MessagingEvent mevent(PN_MESSAGING_CONNECTION_OPENING, *pe);
+            onConnectionOpening(mevent);
+            pn_connection_open(connection);
+        }
+    }
+}
+
+void MessagingAdapter::onSessionLocalOpen(Event &e) {
+    ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+    if (pe) {
+        pn_session_t *session = pn_event_session(pe->getPnEvent());
+        if (isRemoteOpen(pn_session_state(session))) {
+            MessagingEvent mevent(PN_MESSAGING_SESSION_OPENED, *pe);
+            onSessionOpened(mevent);
+        }
+    }
+}
+
+void MessagingAdapter::onSessionRemoteOpen(Event &e) {
+    ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+    if (pe) {
+        pn_session_t *session = pn_event_session(pe->getPnEvent());
+        if (isLocalOpen(pn_session_state(session))) {
+            MessagingEvent mevent(PN_MESSAGING_SESSION_OPENED, *pe);
+            onSessionOpened(mevent);
+        }
+        else if (isLocalUnititialised(pn_session_state(session))) {
+            MessagingEvent mevent(PN_MESSAGING_SESSION_OPENING, *pe);
+            onSessionOpening(mevent);
+            pn_session_open(session);
         }
-        pn_connection_close(conn);
     }
 }
 
+void MessagingAdapter::onLinkLocalOpen(Event &e) {
+    ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+    if (pe) {
+        pn_link_t *link = pn_event_link(pe->getPnEvent());
+        if (isRemoteOpen(pn_link_state(link))) {
+            MessagingEvent mevent(PN_MESSAGING_LINK_OPENED, *pe);
+            onLinkOpened(mevent);
+        }
+    }
+}
 
 void MessagingAdapter::onLinkRemoteOpen(Event &e) {
     ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
     if (pe) {
-        pn_event_t *cevent = pe->getPnEvent();
-        pn_link_t *link = pn_event_link(cevent);
-        // TODO: remote condition -> error
+        pn_link_t *link = pn_event_link(pe->getPnEvent());
         if (isLocalOpen(pn_link_state(link))) {
-            MessagingEvent(PN_MESSAGING_LINK_OPENED, pe, pe->getContainer()).dispatch(delegate);
+            MessagingEvent mevent(PN_MESSAGING_LINK_OPENED, *pe);
+            onLinkOpened(mevent);
         }
         else if (isLocalUnititialised(pn_link_state(link))) {
-            MessagingEvent(PN_MESSAGING_LINK_OPENING, pe, pe->getContainer()).dispatch(delegate);
+            MessagingEvent mevent(PN_MESSAGING_LINK_OPENING, *pe);
+            onLinkOpening(mevent);
             pn_link_open(link);
         }
     }
 }
 
+void MessagingAdapter::onTransportTailClosed(Event &e) {
+    ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+    if (pe) {
+        pn_connection_t *conn = pn_event_connection(pe->getPnEvent());
+        if (conn && isLocalOpen(pn_connection_state(conn))) {
+            MessagingEvent mevent(PN_MESSAGING_DISCONNECTED, *pe);
+            delegate.onDisconnected(mevent);
+        }
+    }
+}
+
 
-void MessagingAdapter::onUnhandled(Event &e) {
-    // Until this code fleshes out closer to python's, cheat a bit with a pn_handshaker
+void MessagingAdapter::onConnectionOpened(Event &e) {
+    delegate.onConnectionOpened(e);
+}
 
+void MessagingAdapter::onSessionOpened(Event &e) {
+    delegate.onSessionOpened(e);
+}
+
+void MessagingAdapter::onLinkOpened(Event &e) {
+    delegate.onLinkOpened(e);
+}
+
+void MessagingAdapter::onConnectionOpening(Event &e) {
+    delegate.onConnectionOpening(e);
+}
+
+void MessagingAdapter::onSessionOpening(Event &e) {
+    delegate.onSessionOpening(e);
+}
+
+void MessagingAdapter::onLinkOpening(Event &e) {
+    delegate.onLinkOpening(e);
+}
+
+void MessagingAdapter::onConnectionError(Event &e) {
+    delegate.onConnectionError(e);
     ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
     if (pe) {
-        pn_event_type_t type = (pn_event_type_t) pe->getType();
-        if (type != PN_EVENT_NONE) {
-            pn_handler_dispatch(handshaker, pe->getPnEvent(), type);
-        }
+        pn_connection_t *connection = pn_event_connection(pe->getPnEvent());
+        pn_connection_close(connection);
+    }
+}
+
+void MessagingAdapter::onSessionError(Event &e) {
+    delegate.onSessionError(e);
+    ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+    if (pe) {
+        pn_session_t *session = pn_event_session(pe->getPnEvent());
+        pn_session_close(session);
     }
 }
 
+void MessagingAdapter::onLinkError(Event &e) {
+    delegate.onLinkError(e);
+    ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e);
+    if (pe) {
+        pn_link_t *link = pn_event_link(pe->getPnEvent());
+        pn_link_close(link);
+    }
+}
+
+void MessagingAdapter::onConnectionClosed(Event &e) {
+    delegate.onConnectionClosed(e);
+}
 
+void MessagingAdapter::onSessionClosed(Event &e) {
+    delegate.onSessionClosed(e);
+}
+
+void MessagingAdapter::onLinkClosed(Event &e) {
+    delegate.onLinkClosed(e);
+}
+
+void MessagingAdapter::onConnectionClosing(Event &e) {
+    delegate.onConnectionClosing(e);
+    if (peerCloseIsError)
+        onConnectionError(e);
+}
+
+void MessagingAdapter::onSessionClosing(Event &e) {
+    delegate.onSessionClosing(e);
+    if (peerCloseIsError)
+        onSessionError(e);
+}
+
+void MessagingAdapter::onLinkClosing(Event &e) {
+    delegate.onLinkClosing(e);
+    if (peerCloseIsError)
+        onLinkError(e);
+}
+
+void MessagingAdapter::onUnhandled(Event &e) {
+}
 
 }} // namespace proton::reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/MessagingEvent.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/MessagingEvent.cpp b/proton-c/bindings/cpp/src/MessagingEvent.cpp
index bcfb721..b8a2f8a 100644
--- a/proton-c/bindings/cpp/src/MessagingEvent.cpp
+++ b/proton-c/bindings/cpp/src/MessagingEvent.cpp
@@ -24,6 +24,7 @@
 #include "proton/link.h"
 
 #include "proton/cpp/MessagingEvent.h"
+#include "proton/cpp/Message.h"
 #include "proton/cpp/ProtonHandler.h"
 #include "proton/cpp/MessagingHandler.h"
 #include "proton/cpp/exceptions.h"
@@ -37,8 +38,8 @@ MessagingEvent::MessagingEvent(pn_event_t *ce, pn_event_type_t t, Container &c)
     ProtonEvent(ce, t, c), messagingType(PN_MESSAGING_PROTON), parentEvent(0), message(0)
 {}
 
-MessagingEvent::MessagingEvent(MessagingEventType_t t, ProtonEvent *p, Container &c) :
-    ProtonEvent(NULL, PN_EVENT_NONE, c), messagingType(t), parentEvent(p), message(0) {
+MessagingEvent::MessagingEvent(MessagingEventType_t t, ProtonEvent &p) :
+    ProtonEvent(NULL, PN_EVENT_NONE, p.getContainer()), messagingType(t), parentEvent(&p), message(0) {
     if (messagingType == PN_MESSAGING_PROTON)
         throw ProtonException(MSG("invalid messaging event type"));
 }
@@ -80,16 +81,18 @@ Link MessagingEvent::getLink() {
 }
 
 Message MessagingEvent::getMessage() {
-    if (message)
-        return *message;
+    if (parentEvent) {
+        pn_message_t *m = getEventContext(parentEvent->getPnEvent());
+        if (m)
+            return Message(m);
+    }
     throw ProtonException(MSG("No message context for event"));
 }
 
 void MessagingEvent::setMessage(Message &m) {
-    if (messagingType != PN_MESSAGING_MESSAGE)
+    if (messagingType != PN_MESSAGING_MESSAGE || !parentEvent)
         throw ProtonException(MSG("Event type does not provide message"));
-    delete message;
-    message = new Message(m);
+    setEventContext(parentEvent->getPnEvent(), m.getPnMessage());
 }
 
 void MessagingEvent::dispatch(Handler &h) {
@@ -112,9 +115,23 @@ void MessagingEvent::dispatch(Handler &h) {
 
         case PN_MESSAGING_CONNECTION_CLOSING:     handler->onConnectionClosing(*this); break;
         case PN_MESSAGING_CONNECTION_CLOSED:      handler->onConnectionClosed(*this); break;
+        case PN_MESSAGING_CONNECTION_ERROR:       handler->onConnectionError(*this); break;
+        case PN_MESSAGING_CONNECTION_OPENING:     handler->onConnectionOpening(*this); break;
+        case PN_MESSAGING_CONNECTION_OPENED:      handler->onConnectionOpened(*this); break;
+
+        case PN_MESSAGING_LINK_CLOSED:            handler->onLinkClosed(*this); break;
+        case PN_MESSAGING_LINK_CLOSING:           handler->onLinkClosing(*this); break;
+        case PN_MESSAGING_LINK_ERROR:             handler->onLinkError(*this); break;
         case PN_MESSAGING_LINK_OPENING:           handler->onLinkOpening(*this); break;
         case PN_MESSAGING_LINK_OPENED:            handler->onLinkOpened(*this); break;
 
+        case PN_MESSAGING_SESSION_CLOSED:         handler->onSessionClosed(*this); break;
+        case PN_MESSAGING_SESSION_CLOSING:        handler->onSessionClosing(*this); break;
+        case PN_MESSAGING_SESSION_ERROR:          handler->onSessionError(*this); break;
+        case PN_MESSAGING_SESSION_OPENING:        handler->onSessionOpening(*this); break;
+        case PN_MESSAGING_SESSION_OPENED:         handler->onSessionOpened(*this); break;
+
+        case PN_MESSAGING_TRANSPORT_CLOSED:       handler->onTransportClosed(*this); break;
         default:
             throw ProtonException(MSG("Unkown messaging event type " << messagingType));
             break;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/MessagingHandler.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/MessagingHandler.cpp b/proton-c/bindings/cpp/src/MessagingHandler.cpp
index 7cb48f6..6066b07 100644
--- a/proton-c/bindings/cpp/src/MessagingHandler.cpp
+++ b/proton-c/bindings/cpp/src/MessagingHandler.cpp
@@ -30,15 +30,17 @@ MessagingHandler::~MessagingHandler(){};
 void MessagingHandler::onAbort(Event &e) { onUnhandled(e); }
 void MessagingHandler::onAccepted(Event &e) { onUnhandled(e); }
 void MessagingHandler::onCommit(Event &e) { onUnhandled(e); }
-void MessagingHandler::onConnectionClose(Event &e) { onUnhandled(e); }
 void MessagingHandler::onConnectionClosed(Event &e) { onUnhandled(e); }
 void MessagingHandler::onConnectionClosing(Event &e) { onUnhandled(e); }
-void MessagingHandler::onConnectionOpen(Event &e) { onUnhandled(e); }
+void MessagingHandler::onConnectionError(Event &e) { onUnhandled(e); }
 void MessagingHandler::onConnectionOpened(Event &e) { onUnhandled(e); }
+void MessagingHandler::onConnectionOpening(Event &e) { onUnhandled(e); }
 void MessagingHandler::onDisconnected(Event &e) { onUnhandled(e); }
 void MessagingHandler::onFetch(Event &e) { onUnhandled(e); }
 void MessagingHandler::onIdLoaded(Event &e) { onUnhandled(e); }
+void MessagingHandler::onLinkClosed(Event &e) { onUnhandled(e); }
 void MessagingHandler::onLinkClosing(Event &e) { onUnhandled(e); }
+void MessagingHandler::onLinkError(Event &e) { onUnhandled(e); }
 void MessagingHandler::onLinkOpened(Event &e) { onUnhandled(e); }
 void MessagingHandler::onLinkOpening(Event &e) { onUnhandled(e); }
 void MessagingHandler::onMessage(Event &e) { onUnhandled(e); }
@@ -50,11 +52,17 @@ void MessagingHandler::onReleased(Event &e) { onUnhandled(e); }
 void MessagingHandler::onRequest(Event &e) { onUnhandled(e); }
 void MessagingHandler::onResponse(Event &e) { onUnhandled(e); }
 void MessagingHandler::onSendable(Event &e) { onUnhandled(e); }
+void MessagingHandler::onSessionClosed(Event &e) { onUnhandled(e); }
+void MessagingHandler::onSessionClosing(Event &e) { onUnhandled(e); }
+void MessagingHandler::onSessionError(Event &e) { onUnhandled(e); }
+void MessagingHandler::onSessionOpened(Event &e) { onUnhandled(e); }
+void MessagingHandler::onSessionOpening(Event &e) { onUnhandled(e); }
 void MessagingHandler::onSettled(Event &e) { onUnhandled(e); }
 void MessagingHandler::onStart(Event &e) { onUnhandled(e); }
 void MessagingHandler::onTimer(Event &e) { onUnhandled(e); }
 void MessagingHandler::onTransactionAborted(Event &e) { onUnhandled(e); }
 void MessagingHandler::onTransactionCommitted(Event &e) { onUnhandled(e); }
 void MessagingHandler::onTransactionDeclared(Event &e) { onUnhandled(e); }
+void MessagingHandler::onTransportClosed(Event &e) { onUnhandled(e); }
 
 }} // namespace proton::reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/Session.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Session.cpp b/proton-c/bindings/cpp/src/Session.cpp
index d2b01dd..4333dcf 100644
--- a/proton-c/bindings/cpp/src/Session.cpp
+++ b/proton-c/bindings/cpp/src/Session.cpp
@@ -30,34 +30,43 @@
 namespace proton {
 namespace reactor {
 
+template class ProtonHandle<pn_session_t>;
+typedef ProtonImplRef<Session> PI;
 
-Session::Session(pn_session_t *s) : pnSession(s)
-{
-    pn_incref(pnSession);
+Session::Session(pn_session_t *p) {
+    PI::ctor(*this, p);
+}
+Session::Session() {
+    PI::ctor(*this, 0);
+}
+Session::Session(const Session& c) : ProtonHandle<pn_session_t>() {
+    PI::copy(*this, c);
+}
+Session& Session::operator=(const Session& c) {
+    return PI::assign(*this, c);
 }
-
 Session::~Session() {
-    pn_decref(pnSession);
+    PI::dtor(*this);
 }
 
-pn_session_t *Session::getPnSession() { return pnSession; }
+pn_session_t *Session::getPnSession() { return impl; }
 
 void Session::open() {
-    pn_session_open(pnSession);
+    pn_session_open(impl);
 }
 
 Connection &Session::getConnection() {
-    pn_connection_t *c = pn_session_connection(pnSession);
+    pn_connection_t *c = pn_session_connection(impl);
     return ConnectionImpl::getReactorReference(c);
 }
 
 Receiver Session::createReceiver(std::string name) {
-    pn_link_t *link = pn_receiver(pnSession, name.c_str());
+    pn_link_t *link = pn_receiver(impl, name.c_str());
     return Receiver(link);
 }
 
 Sender Session::createSender(std::string name) {
-    pn_link_t *link = pn_sender(pnSession, name.c_str());
+    pn_link_t *link = pn_sender(impl, name.c_str());
     return Sender(link);
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/contexts.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.cpp b/proton-c/bindings/cpp/src/contexts.cpp
index b1dec49..56483ff 100644
--- a/proton-c/bindings/cpp/src/contexts.cpp
+++ b/proton-c/bindings/cpp/src/contexts.cpp
@@ -23,13 +23,13 @@
 #include "proton/cpp/exceptions.h"
 #include "Msg.h"
 #include "proton/object.h"
+#include "proton/message.h"
 #include "proton/session.h"
 #include "proton/link.h"
 
 PN_HANDLE(PNI_CPP_CONNECTION_CONTEXT)
-PN_HANDLE(PNI_CPP_SESSION_CONTEXT)
-PN_HANDLE(PNI_CPP_LINK_CONTEXT)
 PN_HANDLE(PNI_CPP_CONTAINER_CONTEXT)
+PN_HANDLE(PNI_CPP_EVENT_CONTEXT)
 
 namespace proton {
 namespace reactor {
@@ -39,7 +39,6 @@ void setConnectionContext(pn_connection_t *pnConnection, ConnectionImpl *connect
     pn_record_def(record, PNI_CPP_CONNECTION_CONTEXT, PN_VOID);
     pn_record_set(record, PNI_CPP_CONNECTION_CONTEXT, connection);
 }
-
 ConnectionImpl *getConnectionContext(pn_connection_t *pnConnection) {
     if (!pnConnection) return NULL;
     pn_record_t *record = pn_connection_attachments(pnConnection);
@@ -48,40 +47,11 @@ ConnectionImpl *getConnectionContext(pn_connection_t *pnConnection) {
 }
 
 
-void setSessionContext(pn_session_t *pnSession, Session *session) {
-    pn_record_t *record = pn_session_attachments(pnSession);
-    pn_record_def(record, PNI_CPP_SESSION_CONTEXT, PN_VOID);
-    pn_record_set(record, PNI_CPP_SESSION_CONTEXT, session);
-}
-
-Session *getSessionContext(pn_session_t *pnSession) {
-    if (!pnSession) return NULL;
-    pn_record_t *record = pn_session_attachments(pnSession);
-    Session *p = (Session *) pn_record_get(record, PNI_CPP_SESSION_CONTEXT);
-    return p;
-}
-
-
-void setLinkContext(pn_link_t *pnLink, Link *link) {
-    pn_record_t *record = pn_link_attachments(pnLink);
-    pn_record_def(record, PNI_CPP_LINK_CONTEXT, PN_VOID);
-    pn_record_set(record, PNI_CPP_LINK_CONTEXT, link);
-}
-
-Link *getLinkContext(pn_link_t *pnLink) {
-    if (!pnLink) return NULL;
-    pn_record_t *record = pn_link_attachments(pnLink);
-    Link *p = (Link *) pn_record_get(record, PNI_CPP_LINK_CONTEXT);
-    return p;
-}
-
-
 void setContainerContext(pn_reactor_t *pnReactor, ContainerImpl *container) {
     pn_record_t *record = pn_reactor_attachments(pnReactor);
     pn_record_def(record, PNI_CPP_CONTAINER_CONTEXT, PN_VOID);
     pn_record_set(record, PNI_CPP_CONTAINER_CONTEXT, container);
 }
-
 ContainerImpl *getContainerContext(pn_reactor_t *pnReactor) {
     pn_record_t *record = pn_reactor_attachments(pnReactor);
     ContainerImpl *p = (ContainerImpl *) pn_record_get(record, PNI_CPP_CONTAINER_CONTEXT);
@@ -89,4 +59,17 @@ ContainerImpl *getContainerContext(pn_reactor_t *pnReactor) {
     return p;
 }
 
+void setEventContext(pn_event_t *pnEvent, pn_message_t *m) {
+    pn_record_t *record = pn_event_attachments(pnEvent);
+    pn_record_def(record, PNI_CPP_EVENT_CONTEXT, PN_OBJECT); // refcount it for life of the event
+    pn_record_set(record, PNI_CPP_EVENT_CONTEXT, m);
+}
+pn_message_t *getEventContext(pn_event_t *pnEvent) {
+    if (!pnEvent) return NULL;
+    pn_record_t *record = pn_event_attachments(pnEvent);
+    pn_message_t *p = (pn_message_t *) pn_record_get(record, PNI_CPP_EVENT_CONTEXT);
+    return p;
+}
+
+
 }} // namespace proton::reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/contexts.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.h b/proton-c/bindings/cpp/src/contexts.h
index c04a77a..e1b5f24 100644
--- a/proton-c/bindings/cpp/src/contexts.h
+++ b/proton-c/bindings/cpp/src/contexts.h
@@ -23,6 +23,7 @@
  */
 #include "proton/reactor.h"
 #include "proton/connection.h"
+#include "proton/message.h"
 
 namespace proton {
 namespace reactor {
@@ -43,6 +44,9 @@ class ContainerImpl;
 void setContainerContext(pn_reactor_t *pnReactor, ContainerImpl *container);
 ContainerImpl *getContainerContext(pn_reactor_t *pnReactor);
 
+void setEventContext(pn_event_t *pnEvent, pn_message_t *m);
+pn_message_t *getEventContext(pn_event_t *pnEvent);
+
 }} // namespace proton::reactor
 
 #endif  /*!PROTON_CPP_CONTEXTS_H*/


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message