Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 52D2B1020D for ; Tue, 5 May 2015 13:49:37 +0000 (UTC) Received: (qmail 22023 invoked by uid 500); 5 May 2015 13:49:37 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 21996 invoked by uid 500); 5 May 2015 13:49:37 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 21983 invoked by uid 99); 5 May 2015 13:49:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 May 2015 13:49:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EDA13DFFEF; Tue, 5 May 2015 13:49:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cliffjansen@apache.org To: commits@qpid.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: qpid-proton git commit: PROTON-865: most of MessagingAdapter in place, SimpleSend/Recv Date: Tue, 5 May 2015 13:49:36 +0000 (UTC) Repository: qpid-proton Updated Branches: refs/heads/cjansen-cpp-client 177def134 -> 5ce5f282f PROTON-865: most of MessagingAdapter in place, SimpleSend/Recv Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5ce5f282 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5ce5f282 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5ce5f282 Branch: refs/heads/cjansen-cpp-client Commit: 5ce5f282f5115941c44df6f91db65bc5136f1008 Parents: 177def1 Author: Clifford Jansen Authored: Tue May 5 06:48:48 2015 -0700 Committer: Clifford Jansen Committed: Tue May 5 06:48:48 2015 -0700 ---------------------------------------------------------------------- proton-c/bindings/cpp/CMakeLists.txt | 4 + proton-c/bindings/cpp/examples/SimpleRecv.cpp | 109 +++++++ proton-c/bindings/cpp/examples/SimpleSend.cpp | 117 +++++++ .../bindings/cpp/include/proton/cpp/Container.h | 1 + .../bindings/cpp/include/proton/cpp/Message.h | 23 +- .../cpp/include/proton/cpp/MessagingAdapter.h | 30 +- .../cpp/include/proton/cpp/MessagingEvent.h | 16 +- .../cpp/include/proton/cpp/MessagingHandler.h | 13 +- .../bindings/cpp/include/proton/cpp/Session.h | 8 +- proton-c/bindings/cpp/src/Connection.cpp | 2 +- proton-c/bindings/cpp/src/ConnectionImpl.cpp | 14 +- proton-c/bindings/cpp/src/Connector.cpp | 5 +- proton-c/bindings/cpp/src/Container.cpp | 4 + proton-c/bindings/cpp/src/ContainerImpl.cpp | 87 +++-- proton-c/bindings/cpp/src/ContainerImpl.h | 1 + proton-c/bindings/cpp/src/Link.cpp | 4 +- proton-c/bindings/cpp/src/Message.cpp | 194 ++++++++++-- proton-c/bindings/cpp/src/MessagingAdapter.cpp | 316 ++++++++++++++++--- proton-c/bindings/cpp/src/MessagingEvent.cpp | 31 +- proton-c/bindings/cpp/src/MessagingHandler.cpp | 12 +- proton-c/bindings/cpp/src/Session.cpp | 29 +- proton-c/bindings/cpp/src/contexts.cpp | 47 +-- proton-c/bindings/cpp/src/contexts.h | 4 + 23 files changed, 900 insertions(+), 171 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt index d83bd93..08c9418 100644 --- a/proton-c/bindings/cpp/CMakeLists.txt +++ b/proton-c/bindings/cpp/CMakeLists.txt @@ -93,6 +93,10 @@ add_executable (HelloWorld examples/HelloWorld.cpp) target_link_libraries (HelloWorld qpid-proton-cpp) add_executable (HelloWorldDirect examples/HelloWorldDirect.cpp) target_link_libraries (HelloWorldDirect qpid-proton-cpp) +add_executable (SimpleRecv examples/SimpleRecv.cpp) +target_link_libraries (SimpleRecv qpid-proton-cpp) +add_executable (SimpleSend examples/SimpleSend.cpp) +target_link_libraries (SimpleSend qpid-proton-cpp) install (TARGETS qpid-proton-cpp EXPORT proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/examples/SimpleRecv.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/examples/SimpleRecv.cpp b/proton-c/bindings/cpp/examples/SimpleRecv.cpp new file mode 100644 index 0000000..22778f0 --- /dev/null +++ b/proton-c/bindings/cpp/examples/SimpleRecv.cpp @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/cpp/Container.h" +#include "proton/cpp/MessagingHandler.h" +#include "proton/cpp/Link.h" + +#include +#include +#include +#include + + +using namespace proton::reactor; + +class Recv : public MessagingHandler { + private: + std::string url; + int expected; + int received; + public: + + Recv(const std::string &s, int c) : url(s), expected(c), received(0) {} + + void onStart(Event &e) { + e.getContainer().createReceiver(url); + } + + void onMessage(Event &e) { + uint64_t id = 0; + Message msg = e.getMessage(); + if (msg.getIdType() == PN_ULONG) { + id = msg.getId(); + if (id < received) + return; // ignore duplicate + } + if (expected == 0 || received < expected) { + std::cout << '[' << id << "]: " << msg.getBody() << std::endl; + received++; + if (received == expected) { + e.getReceiver().close(); + e.getConnection().close(); + } + } + } +}; + +static void parse_options(int argc, char **argv, int &count, std::string &addr); + +int main(int argc, char **argv) { + int messageCount = 100; + std::string address("localhost:5672/examples"); + parse_options(argc, argv, messageCount, address); + Recv recv(address, messageCount); + Container(recv).run(); +} + + +static void usage() { + std::cout << "Usage: SimpleRecv -m message_count -a address:" << std::endl; + exit (1); +} + + +static void parse_options(int argc, char **argv, int &count, std::string &addr) { + int c, i; + for (i = 1; i < argc; i++) { + if (strlen(argv[i]) == 2 && argv[i][0] == '-') { + c = argv[i][1]; + const char *nextarg = i < argc ? argv[i+1] : NULL; + + switch (c) { + case 'a': + if (!nextarg) usage(); + addr = nextarg; + i++; + break; + case 'm': + if (!nextarg) usage(); + unsigned newc; + if (sscanf( nextarg, "%d", &newc) != 1) usage(); + count = newc; + i++; + break; + default: + usage(); + } + } + else usage(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/examples/SimpleSend.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/examples/SimpleSend.cpp b/proton-c/bindings/cpp/examples/SimpleSend.cpp new file mode 100644 index 0000000..89ca39d --- /dev/null +++ b/proton-c/bindings/cpp/examples/SimpleSend.cpp @@ -0,0 +1,117 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/cpp/Container.h" +#include "proton/cpp/MessagingHandler.h" +#include "proton/cpp/Connection.h" + +#include +#include +#include +#include + + +using namespace proton::reactor; + +class Send : public MessagingHandler { + private: + std::string url; + int sent; + int confirmed; + int total; + public: + + Send(const std::string &s, int c) : url(s), sent(0), confirmed(0), total(c) {} + + void onStart(Event &e) { + e.getContainer().createSender(url); + } + + void onSendable(Event &e) { + Sender sender = e.getSender(); + while (sender.getCredit() && sent < total) { + Message msg; + msg.setId(sent + 1); + // TODO: fancy map body content as in Python example. Simple binary for now. + const char *bin = "some arbitrary binary data"; + msg.setBody(bin, strlen(bin)); + sender.send(msg); + sent++; + } + } + + void onAccepted(Event &e) { + confirmed++; + if (confirmed == total) { + std::cout << "all messages confirmed" << std::endl; + e.getConnection().close(); + } + } + + void onDisconnected(Event &e) { + sent = confirmed; + } +}; + +static void parse_options(int argc, char **argv, int &count, std::string &addr); + +int main(int argc, char **argv) { + int messageCount = 100; + std::string address("localhost:5672/examples"); + parse_options(argc, argv, messageCount, address); + Send send(address, messageCount); + Container(send).run(); +} + + +static void usage() { + std::cout << "Usage: SimpleSend -m message_count -a address:" << std::endl; + exit (1); +} + + +static void parse_options(int argc, char **argv, int &count, std::string &addr) { + int c, i; + for (i = 1; i < argc; i++) { + if (strlen(argv[i]) == 2 && argv[i][0] == '-') { + c = argv[i][1]; + const char *nextarg = i < argc ? argv[i+1] : NULL; + + switch (c) { + case 'a': + if (!nextarg) usage(); + addr = nextarg; + i++; + break; + case 'm': + if (!nextarg) usage(); + unsigned newc; + if (sscanf( nextarg, "%d", &newc) != 1) usage(); + count = newc; + i++; + break; + default: + usage(); + } + } + else usage(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/include/proton/cpp/Container.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Container.h b/proton-c/bindings/cpp/include/proton/cpp/Container.h index fbb1a83..d596ab1 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/Container.h +++ b/proton-c/bindings/cpp/include/proton/cpp/Container.h @@ -56,6 +56,7 @@ class Container : public Handle PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr); PROTON_CPP_EXTERN Sender createSender(std::string &url); PROTON_CPP_EXTERN Receiver createReceiver(Connection &connection, std::string &addr); + PROTON_CPP_EXTERN Receiver createReceiver(const std::string &url); PROTON_CPP_EXTERN Acceptor listen(const std::string &url); PROTON_CPP_EXTERN std::string getContainerId(); private: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/include/proton/cpp/Message.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Message.h b/proton-c/bindings/cpp/include/proton/cpp/Message.h index 51ca731..ae29ca2 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/Message.h +++ b/proton-c/bindings/cpp/include/proton/cpp/Message.h @@ -22,6 +22,7 @@ * */ #include "proton/cpp/ImportExport.h" +#include "proton/cpp/ProtonHandle.h" #include "proton/message.h" #include @@ -29,22 +30,36 @@ namespace proton { namespace reactor { -class Message +class Message : public ProtonHandle { public: PROTON_CPP_EXTERN Message(); - PROTON_CPP_EXTERN ~Message(); + PROTON_CPP_EXTERN Message(pn_message_t *); PROTON_CPP_EXTERN Message(const Message&); PROTON_CPP_EXTERN Message& operator=(const Message&); + PROTON_CPP_EXTERN ~Message(); + + PROTON_CPP_EXTERN pn_message_t *getPnMessage() const; + + PROTON_CPP_EXTERN void setId(uint64_t id); + PROTON_CPP_EXTERN uint64_t getId(); + PROTON_CPP_EXTERN pn_type_t getIdType(); - PROTON_CPP_EXTERN pn_message_t *getPnMessage(); PROTON_CPP_EXTERN void setBody(const std::string &data); PROTON_CPP_EXTERN std::string getBody(); + + PROTON_CPP_EXTERN void getBody(std::string &str); + + PROTON_CPP_EXTERN void setBody(const char *, size_t len); + PROTON_CPP_EXTERN size_t getBody(char *, size_t len); + PROTON_CPP_EXTERN size_t getBinaryBodySize(); + + PROTON_CPP_EXTERN void encode(std::string &data); PROTON_CPP_EXTERN void decode(const std::string &data); private: - pn_message_t *pnMessage; + friend class ProtonImplRef; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h index 8551c9c..ac8b483 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h +++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h @@ -32,10 +32,10 @@ namespace proton { namespace reactor { -// For now, stands in for Python's: EndpointStateHandler, IncomingMessageHandler, OutgoingMessageHandler +// Combine's Python's: EndpointStateHandler, IncomingMessageHandler, OutgoingMessageHandler -class MessagingAdapter : public ProtonHandler +class MessagingAdapter : public MessagingHandler { public: PROTON_CPP_EXTERN MessagingAdapter(MessagingHandler &delegate); @@ -44,11 +44,37 @@ class MessagingAdapter : public ProtonHandler PROTON_CPP_EXTERN virtual void onLinkFlow(Event &e); PROTON_CPP_EXTERN virtual void onDelivery(Event &e); PROTON_CPP_EXTERN virtual void onUnhandled(Event &e); + PROTON_CPP_EXTERN virtual void onConnectionClosed(Event &e); + PROTON_CPP_EXTERN virtual void onConnectionClosing(Event &e); + PROTON_CPP_EXTERN virtual void onConnectionError(Event &e); + PROTON_CPP_EXTERN virtual void onConnectionLocalOpen(Event &e); + PROTON_CPP_EXTERN virtual void onConnectionRemoteOpen(Event &e); PROTON_CPP_EXTERN virtual void onConnectionRemoteClose(Event &e); + PROTON_CPP_EXTERN virtual void onConnectionOpened(Event &e); + PROTON_CPP_EXTERN virtual void onConnectionOpening(Event &e); + PROTON_CPP_EXTERN virtual void onSessionClosed(Event &e); + PROTON_CPP_EXTERN virtual void onSessionClosing(Event &e); + PROTON_CPP_EXTERN virtual void onSessionError(Event &e); + PROTON_CPP_EXTERN virtual void onSessionLocalOpen(Event &e); + PROTON_CPP_EXTERN virtual void onSessionRemoteOpen(Event &e); + PROTON_CPP_EXTERN virtual void onSessionRemoteClose(Event &e); + PROTON_CPP_EXTERN virtual void onSessionOpened(Event &e); + PROTON_CPP_EXTERN virtual void onSessionOpening(Event &e); + PROTON_CPP_EXTERN virtual void onLinkClosed(Event &e); + PROTON_CPP_EXTERN virtual void onLinkClosing(Event &e); + PROTON_CPP_EXTERN virtual void onLinkError(Event &e); + PROTON_CPP_EXTERN virtual void onLinkLocalOpen(Event &e); PROTON_CPP_EXTERN virtual void onLinkRemoteOpen(Event &e); + PROTON_CPP_EXTERN virtual void onLinkRemoteClose(Event &e); + PROTON_CPP_EXTERN virtual void onLinkOpened(Event &e); + PROTON_CPP_EXTERN virtual void onLinkOpening(Event &e); + PROTON_CPP_EXTERN virtual void onTransportTailClosed(Event &e); private: MessagingHandler &delegate; // The actual MessagingHandler pn_handler_t *handshaker; + bool autoSettle; + bool autoAccept; + bool peerCloseIsError; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/include/proton/cpp/MessagingEvent.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingEvent.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingEvent.h index d8d5c7f..de79618 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/MessagingEvent.h +++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingEvent.h @@ -37,17 +37,19 @@ typedef enum { PN_MESSAGING_ABORT, PN_MESSAGING_ACCEPTED, PN_MESSAGING_COMMIT, - PN_MESSAGING_CONNECTION_CLOSE, PN_MESSAGING_CONNECTION_CLOSED, PN_MESSAGING_CONNECTION_CLOSING, - PN_MESSAGING_CONNECTION_OPEN, + PN_MESSAGING_CONNECTION_ERROR, PN_MESSAGING_CONNECTION_OPENED, + PN_MESSAGING_CONNECTION_OPENING, PN_MESSAGING_DISCONNECTED, PN_MESSAGING_FETCH, PN_MESSAGING_ID_LOADED, + PN_MESSAGING_LINK_CLOSED, PN_MESSAGING_LINK_CLOSING, PN_MESSAGING_LINK_OPENED, PN_MESSAGING_LINK_OPENING, + PN_MESSAGING_LINK_ERROR, PN_MESSAGING_MESSAGE, PN_MESSAGING_QUIT, PN_MESSAGING_RECORD_INSERTED, @@ -57,19 +59,25 @@ typedef enum { PN_MESSAGING_REQUEST, PN_MESSAGING_RESPONSE, PN_MESSAGING_SENDABLE, + PN_MESSAGING_SESSION_CLOSED, + PN_MESSAGING_SESSION_CLOSING, + PN_MESSAGING_SESSION_OPENED, + PN_MESSAGING_SESSION_OPENING, + PN_MESSAGING_SESSION_ERROR, PN_MESSAGING_SETTLED, PN_MESSAGING_START, PN_MESSAGING_TIMER, PN_MESSAGING_TRANSACTION_ABORTED, PN_MESSAGING_TRANSACTION_COMMITTED, - PN_MESSAGING_TRANSACTION_DECLARED + PN_MESSAGING_TRANSACTION_DECLARED, + PN_MESSAGING_TRANSPORT_CLOSED } MessagingEventType_t; class MessagingEvent : public ProtonEvent { public: MessagingEvent(pn_event_t *ce, pn_event_type_t t, Container &c); - MessagingEvent(MessagingEventType_t t, ProtonEvent *parent, Container &c); + MessagingEvent(MessagingEventType_t t, ProtonEvent &parent); ~MessagingEvent(); virtual PROTON_CPP_EXTERN void dispatch(Handler &h); virtual PROTON_CPP_EXTERN Connection &getConnection(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h index 875af43..51f679a 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h +++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h @@ -34,20 +34,23 @@ class PROTON_CPP_EXTERN MessagingHandler : public ProtonHandler { public: PROTON_CPP_EXTERN MessagingHandler(); +//ZZZ PROTON_CPP_EXTERN MessagingHandler(int prefetch=10, bool autoAccept=true, autoSettle=true, peerCloseIsError=false); virtual ~MessagingHandler(); virtual void onAbort(Event &e); virtual void onAccepted(Event &e); virtual void onCommit(Event &e); - virtual void onConnectionClose(Event &e); virtual void onConnectionClosed(Event &e); virtual void onConnectionClosing(Event &e); - virtual void onConnectionOpen(Event &e); + virtual void onConnectionError(Event &e); + virtual void onConnectionOpening(Event &e); virtual void onConnectionOpened(Event &e); virtual void onDisconnected(Event &e); virtual void onFetch(Event &e); virtual void onIdLoaded(Event &e); + virtual void onLinkClosed(Event &e); virtual void onLinkClosing(Event &e); + virtual void onLinkError(Event &e); virtual void onLinkOpened(Event &e); virtual void onLinkOpening(Event &e); virtual void onMessage(Event &e); @@ -59,12 +62,18 @@ class PROTON_CPP_EXTERN MessagingHandler : public ProtonHandler virtual void onRequest(Event &e); virtual void onResponse(Event &e); virtual void onSendable(Event &e); + virtual void onSessionClosed(Event &e); + virtual void onSessionClosing(Event &e); + virtual void onSessionError(Event &e); + virtual void onSessionOpened(Event &e); + virtual void onSessionOpening(Event &e); virtual void onSettled(Event &e); virtual void onStart(Event &e); virtual void onTimer(Event &e); virtual void onTransactionAborted(Event &e); virtual void onTransactionCommitted(Event &e); virtual void onTransactionDeclared(Event &e); + virtual void onTransportClosed(Event &e); }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/include/proton/cpp/Session.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Session.h b/proton-c/bindings/cpp/include/proton/cpp/Session.h index e556cde..68f5e40 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/Session.h +++ b/proton-c/bindings/cpp/include/proton/cpp/Session.h @@ -27,6 +27,7 @@ #include "proton/types.h" #include "proton/link.h" +#include "ProtonImplRef.h" #include struct pn_connection_t; @@ -38,19 +39,22 @@ class Container; class Handler; class Transport; -class Session : public Endpoint + class Session : public Endpoint, public ProtonHandle { public: PROTON_CPP_EXTERN Session(pn_session_t *s); + PROTON_CPP_EXTERN Session(); PROTON_CPP_EXTERN ~Session(); PROTON_CPP_EXTERN void open(); + PROTON_CPP_EXTERN Session(const Session&); + PROTON_CPP_EXTERN Session& operator=(const Session&); PROTON_CPP_EXTERN void close(); PROTON_CPP_EXTERN pn_session_t *getPnSession(); virtual PROTON_CPP_EXTERN Connection &getConnection(); Receiver createReceiver(std::string name); Sender createSender(std::string name); private: - pn_session_t *pnSession; + friend class ProtonImplRef; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/Connection.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Connection.cpp b/proton-c/bindings/cpp/src/Connection.cpp index e85b323..49d171e 100644 --- a/proton-c/bindings/cpp/src/Connection.cpp +++ b/proton-c/bindings/cpp/src/Connection.cpp @@ -35,7 +35,7 @@ namespace reactor { template class Handle; typedef PrivateImplRef PI; -Connection::Connection() {} +Connection::Connection() {PI::ctor(*this, 0); } Connection::Connection(ConnectionImpl* p) { PI::ctor(*this, p); } Connection::Connection(const Connection& c) : Handle() { PI::copy(*this, c); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/ConnectionImpl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ConnectionImpl.cpp b/proton-c/bindings/cpp/src/ConnectionImpl.cpp index 3b70362..be01f8d 100644 --- a/proton-c/bindings/cpp/src/ConnectionImpl.cpp +++ b/proton-c/bindings/cpp/src/ConnectionImpl.cpp @@ -22,6 +22,7 @@ #include "proton/cpp/Handler.h" #include "proton/cpp/exceptions.h" #include "ConnectionImpl.h" +#include "proton/cpp/Transport.h" #include "Msg.h" #include "contexts.h" @@ -34,7 +35,7 @@ void ConnectionImpl::incref(ConnectionImpl *impl) { impl->refCount++; } -void ConnectionImpl::decref(ConnectionImpl *impl) { +void ConnectionImpl::decref(ConnectionImpl *impl) { impl->refCount--; if (impl->refCount == 0) delete impl; @@ -47,7 +48,10 @@ ConnectionImpl::ConnectionImpl(Container &c) : container(c), refCount(0), overri setConnectionContext(pnConnection, this); } -ConnectionImpl::~ConnectionImpl() {} +ConnectionImpl::~ConnectionImpl() { + delete transport; + delete override; +} Transport &ConnectionImpl::getTransport() { if (transport) @@ -56,7 +60,11 @@ Transport &ConnectionImpl::getTransport() { } Handler* ConnectionImpl::getOverride() { return override; } -void ConnectionImpl::setOverride(Handler *h) { override = h; } +void ConnectionImpl::setOverride(Handler *h) { + if (override) + delete override; + override = h; +} void ConnectionImpl::open() { pn_connection_open(pnConnection); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/Connector.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Connector.cpp b/proton-c/bindings/cpp/src/Connector.cpp index 6885575..8ebdfb6 100644 --- a/proton-c/bindings/cpp/src/Connector.cpp +++ b/proton-c/bindings/cpp/src/Connector.cpp @@ -62,15 +62,14 @@ void Connector::onConnectionRemoteOpen(Event &e) { } void Connector::onConnectionInit(Event &e) { - } void Connector::onTransportClosed(Event &e) { // TODO: prepend with reconnect logic PN_CPP_LOG(info, "Disconnected"); - connection.setOverride(0); // No more call backs pn_connection_release(connection.impl->pnConnection); - delete this; + // No more interaction, so drop our counted reference. + connection = Connection(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/Container.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Container.cpp b/proton-c/bindings/cpp/src/Container.cpp index 8e79b15..4eccb15 100644 --- a/proton-c/bindings/cpp/src/Container.cpp +++ b/proton-c/bindings/cpp/src/Container.cpp @@ -74,6 +74,10 @@ Receiver Container::createReceiver(Connection &connection, std::string &addr) { return impl->createReceiver(connection, addr); } +Receiver Container::createReceiver(const std::string &url) { + return impl->createReceiver(url); +} + Acceptor Container::listen(const std::string &urlString) { return impl->listen(urlString); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/ContainerImpl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ContainerImpl.cpp b/proton-c/bindings/cpp/src/ContainerImpl.cpp index 7ff9c1d..df8c716 100644 --- a/proton-c/bindings/cpp/src/ContainerImpl.cpp +++ b/proton-c/bindings/cpp/src/ContainerImpl.cpp @@ -37,6 +37,7 @@ #include "proton/connection.h" #include "proton/session.h" +#include "proton/handlers.h" namespace proton { namespace reactor { @@ -64,23 +65,20 @@ class CHandler : public Handler pn_decref(pnHandler); } pn_handler_t *getPnHandler() { return pnHandler; } + + virtual void onUnhandled(Event &e) { + ProtonEvent *pne = dynamic_cast(&e); + if (!pne) return; + int type = pne->getType(); + if (!type) return; // Not from the reactor + pn_handler_dispatch(pnHandler, pne->getPnEvent(), (pn_event_type_t) type); + } + private: pn_handler_t *pnHandler; }; -void dispatch(Handler &h, MessagingEvent &e) { - // TODO: also dispatch to add()'ed Handlers - CHandler *chandler; - int type = e.getType(); - if (type && (chandler = dynamic_cast(&h))) { - // event and handler are both native Proton C - pn_handler_dispatch(chandler->getPnHandler(), e.getPnEvent(), (pn_event_type_t) type); - } - else - e.dispatch(h); -} - // Used to sniff for Connector events before the reactor's global handler sees them. class OverrideHandler : public Handler { @@ -109,8 +107,9 @@ class OverrideHandler : public Handler ConnectionImpl *connection = getConnectionContext(conn); if (connection) { Handler *override = connection->getOverride(); - if (override) + if (override) { e.dispatch(*override); + } } } @@ -127,6 +126,29 @@ class OverrideHandler : public Handler } }; + +class CFlowController : public ProtonHandler +{ + public: + pn_handler_t *flowcontroller; + + CFlowController(int window) : flowcontroller(pn_flowcontroller(window)) {} + ~CFlowController() { + pn_decref(flowcontroller); + } + + void redirect(Event &e) { + ProtonEvent *pne = dynamic_cast(&e); + pn_handler_dispatch(flowcontroller, pne->getPnEvent(), (pn_event_type_t) pne->getType()); + } + + virtual void onLinkLocalOpen(Event &e) { redirect(e); } + virtual void onLinkRemoteOpen(Event &e) { redirect(e); } + virtual void onLinkFlow(Event &e) { redirect(e); } + virtual void onDelivery(Event &e) { redirect(e); } +}; + + namespace { // TODO: configurable policy. SessionPerConnection for now. @@ -162,8 +184,8 @@ Handler &getCppHandler(pn_handler_t *c_handler) { void cpp_handler_dispatch(pn_handler_t *c_handler, pn_event_t *cevent, pn_event_type_t type) { - MessagingEvent ev(cevent, type, getContainerRef(c_handler)); - dispatch(getCppHandler(c_handler), ev); + MessagingEvent mevent(cevent, type, getContainerRef(c_handler)); + mevent.dispatch(getCppHandler(c_handler)); } void cpp_handler_cleanup(pn_handler_t *c_handler) @@ -176,7 +198,7 @@ pn_handler_t *cpp_handler(ContainerImpl *c, Handler *h) { pn_handler_t *handler = pn_handler_new(cpp_handler_dispatch, sizeof(struct InboundContext), cpp_handler_cleanup); struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(handler); - ctxt->containerRef = Container(c); + new (&ctxt->containerRef) Container(c); ctxt->containerImpl = c; ctxt->cppHandler = h; return handler; @@ -254,6 +276,17 @@ Receiver ContainerImpl::createReceiver(Connection &connection, std::string &addr return rcv; } +Receiver ContainerImpl::createReceiver(const std::string &urlString) { + // TODO: const cleanup of API + Connection conn = connect(const_cast(urlString)); + Session session = getDefaultSession(conn.getPnConnection(), &getImpl(conn)->defaultSession); + std::string path = Url(urlString).getPath(); + Receiver rcv = session.createReceiver(containerId + '-' + path); + pn_terminus_set_address(pn_link_source(rcv.getPnLink()), path.c_str()); + rcv.open(); + return rcv; +} + Acceptor ContainerImpl::acceptor(const std::string &host, const std::string &port) { pn_acceptor_t *acptr = pn_reactor_acceptor(reactor, host.c_str(), port.c_str(), NULL); if (acptr) @@ -271,11 +304,20 @@ Acceptor ContainerImpl::listen(const std::string &urlString) { void ContainerImpl::run() { reactor = pn_reactor(); + // Set our context on the reactor setContainerContext(reactor, this); + int prefetch = 10; // TODO: configurable + Handler *flowController = 0; + + // Set the reactor's main/default handler (see note below) MessagingAdapter messagingAdapter(messagingHandler); + if (prefetch) { + flowController = new CFlowController(prefetch); + messagingHandler.addChildHandler(*flowController); + } messagingHandler.addChildHandler(messagingAdapter); pn_handler_t *cppHandler = cpp_handler(this, &messagingHandler); pn_reactor_set_handler(reactor, cppHandler); @@ -287,15 +329,18 @@ void ContainerImpl::run() { pn_handler_t *cppGlobalHandler = cpp_handler(this, &overrideHandler); pn_reactor_set_global_handler(reactor, cppGlobalHandler); - // Note: we have just set up the following 4 handlers that see events in this order: - // messagingHandler, messagingAdapter, connector override, the reactor's default global - // handler (pn_iohandler) - // TODO: remove fifth pn_handshaker once messagingAdapter matures - + // Note: we have just set up the following 4/5 handlers that see events in this order: + // messagingHandler (Proton C events), pn_flowcontroller (optional), messagingAdapter, + // messagingHandler (Messaging events from the messagingAdapter), connector override, + // the reactor's default globalhandler (pn_iohandler) pn_reactor_run(reactor); + + pn_decref(cppHandler); + pn_decref(cppGlobalHandler); pn_decref(cGlobalHandler); pn_reactor_free(reactor); reactor = 0; + delete(flowController); } }} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/ContainerImpl.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ContainerImpl.h b/proton-c/bindings/cpp/src/ContainerImpl.h index a14bf52..8a6faba 100644 --- a/proton-c/bindings/cpp/src/ContainerImpl.h +++ b/proton-c/bindings/cpp/src/ContainerImpl.h @@ -49,6 +49,7 @@ class ContainerImpl PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr); PROTON_CPP_EXTERN Sender createSender(std::string &url); PROTON_CPP_EXTERN Receiver createReceiver(Connection &connection, std::string &addr); + PROTON_CPP_EXTERN Receiver createReceiver(const std::string &url); //ZZZ PROTON_CPP_EXTERN Acceptor listen(const std::string &url); PROTON_CPP_EXTERN std::string getContainerId(); static void incref(ContainerImpl *); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/Link.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Link.cpp b/proton-c/bindings/cpp/src/Link.cpp index 3661129..3356b38 100644 --- a/proton-c/bindings/cpp/src/Link.cpp +++ b/proton-c/bindings/cpp/src/Link.cpp @@ -36,7 +36,7 @@ namespace reactor { namespace { static inline void throwIfNull(pn_link_t *l) { if (!l) throw ProtonException(MSG("Disassociated link")); } - + } template class ProtonHandle; @@ -48,7 +48,7 @@ Link::Link(pn_link_t* p) { if (p) senderLink = pn_link_is_sender(p); } Link::Link() { - PI::ctor(*this, 0); + PI::ctor(*this, 0); } Link::Link(const Link& c) : ProtonHandle() { verifyType(impl); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/Message.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Message.cpp b/proton-c/bindings/cpp/src/Message.cpp index 840a10b..ead6eb1 100644 --- a/proton-c/bindings/cpp/src/Message.cpp +++ b/proton-c/bindings/cpp/src/Message.cpp @@ -22,33 +22,128 @@ #include "proton/cpp/Message.h" #include "proton/cpp/exceptions.h" #include "Msg.h" +#include "ProtonImplRef.h" + +#include namespace proton { namespace reactor { -Message::Message() : pnMessage(pn_message()){} +template class ProtonHandle; +typedef ProtonImplRef PI; -Message::~Message() { - pn_decref(pnMessage); +Message::Message() { + PI::ctor(*this, 0); +} +Message::Message(pn_message_t *p) { + PI::ctor(*this, p); } +Message::Message(const Message& m) : ProtonHandle() { + PI::copy(*this, m); +} +Message& Message::operator=(const Message& m) { + return PI::assign(*this, m); +} +Message::~Message() { PI::dtor(*this); } -Message::Message(const Message& m) : pnMessage(m.pnMessage) { - pn_incref(pnMessage); +namespace { +void confirm(pn_message_t *&p) { + if (p) return; + p = pn_message(); // Correct refcount of 1 + if (!p) + throw ProtonException(MSG("No memory")); } -Message& Message::operator=(const Message& m) { - pnMessage = m.pnMessage; - pn_incref(pnMessage); - return *this; +void getFormatedStringContent(pn_data_t *data, std::string &str) { + pn_data_rewind(data); + size_t sz = str.capacity(); + if (sz < 512) sz = 512; + while (true) { + str.resize(sz); + int err = pn_data_format(data, (char *) str.data(), &sz); + if (err) { + if (err != PN_OVERFLOW) + throw ProtonException(MSG("Unexpected message body data error")); + } + else { + str.resize(sz); + return; + } + sz *= 2; + } +} + +} // namespace + +void Message::setId(uint64_t id) { + confirm(impl); + pn_data_t *data = pn_message_id(impl); + pn_data_clear(data); + if (int err = pn_data_put_ulong(data, id)) + throw ProtonException(MSG("setId error " << err)); +} + +uint64_t Message::getId() { + confirm(impl); + pn_data_t *data = pn_message_id(impl); + pn_data_rewind(data); + if (pn_data_size(data) == 1 && pn_data_next(data) && pn_data_type(data) == PN_ULONG) { + return pn_data_get_ulong(data); + } + throw ProtonException(MSG("Message ID is not a ULONG")); +} + +pn_type_t Message::getIdType() { + confirm(impl); + pn_data_t *data = pn_message_id(impl); + pn_data_rewind(data); + if (pn_data_size(data) == 1 && pn_data_next(data)) { + pn_type_t type = pn_data_type(data); + switch (type) { + case PN_ULONG: + case PN_STRING: + case PN_BINARY: + case PN_UUID: + return type; + break; + default: + break; + } + } + return PN_NULL; } void Message::setBody(const std::string &buf) { - pn_data_t *body = pn_message_body(pnMessage); + confirm(impl); + pn_data_t *body = pn_message_body(impl); + pn_data_clear(body); pn_data_put_string(body, pn_bytes(buf.size(), buf.data())); } +void Message::getBody(std::string &str) { + // User supplied string/buffer + confirm(impl); + pn_data_t *body = pn_message_body(impl); + pn_data_rewind(body); + + if (pn_data_next(body) && pn_data_type(body) == PN_STRING) { + pn_bytes_t bytes= pn_data_get_string(body); + if (!pn_data_next(body)) { + // String data and nothing else + str.resize(bytes.size); + memmove((void *) str.data(), bytes.start, bytes.size); + return; + } + } + + getFormatedStringContent(body, str); +} + std::string Message::getBody() { - pn_data_t *body = pn_message_body(pnMessage); + confirm(impl); + pn_data_t *body = pn_message_body(impl); + pn_data_rewind(body); + if (pn_data_next(body) && pn_data_type(body) == PN_STRING) { pn_bytes_t bytes= pn_data_get_string(body); if (!pn_data_next(body)) { @@ -57,36 +152,73 @@ std::string Message::getBody() { } } - pn_data_rewind(body); std::string str; - size_t sz = 1024; - str.resize(sz); - int err = pn_data_format(body, (char *) str.data(), &sz); - if (err == PN_OVERFLOW) - throw ProtonException(MSG("TODO: sizing loop missing")); - if (err) throw ProtonException(MSG("Unexpected data error")); - str.resize(sz); + getFormatedStringContent(body, str); return str; } +void Message::setBody(const char *bytes, size_t len) { + confirm(impl); + pn_data_t *body = pn_message_body(impl); + pn_data_clear(body); + pn_data_put_binary(body, pn_bytes(len, bytes)); +} + +size_t Message::getBody(char *bytes, size_t len) { + confirm(impl); + pn_data_t *body = pn_message_body(impl); + pn_data_rewind(body); + if (pn_data_size(body) == 1 && pn_data_next(body) && pn_data_type(body) == PN_BINARY) { + pn_bytes_t pnb = pn_data_get_binary(body); + if (len >= pnb.size) { + memmove(bytes, pnb.start, pnb.size); + return pnb.size; + } + throw ProtonException(MSG("Binary buffer too small")); + } + throw ProtonException(MSG("Not simple binary data")); +} + + + +size_t Message::getBinaryBodySize() { + confirm(impl); + pn_data_t *body = pn_message_body(impl); + pn_data_rewind(body); + if (pn_data_size(body) == 1 && pn_data_next(body) && pn_data_type(body) == PN_BINARY) { + pn_bytes_t bytes = pn_data_get_binary(body); + return bytes.size; + } + return 0; +} + + void Message::encode(std::string &s) { - size_t sz = 1024; - if (s.capacity() > sz) - sz = s.capacity(); - else - s.reserve(sz); - s.resize(sz); - int err = pn_message_encode(pnMessage, (char *) s.data(), &sz); - if (err == PN_OVERFLOW) - throw ProtonException(MSG("TODO: fix overflow with dynamic buffer resizing")); - if (err) throw ProtonException(MSG("unexpected error")); - s.resize(sz); + confirm(impl); + size_t sz = s.capacity(); + if (sz < 512) sz = 512; + while (true) { + s.resize(sz); + int err = pn_message_encode(impl, (char *) s.data(), &sz); + if (err) { + if (err != PN_OVERFLOW) + throw ProtonException(MSG("unexpected error")); + } else { + s.resize(sz); + return; + } + sz *= 2; + } } void Message::decode(const std::string &s) { - int err = pn_message_decode(pnMessage, s.data(), s.size()); + confirm(impl); + int err = pn_message_decode(impl, s.data(), s.size()); if (err) throw ProtonException(MSG("unexpected error")); } +pn_message_t *Message::getPnMessage() const { + return impl; +} }} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/MessagingAdapter.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/MessagingAdapter.cpp b/proton-c/bindings/cpp/src/MessagingAdapter.cpp index 9cab2b3..1097305 100644 --- a/proton-c/bindings/cpp/src/MessagingAdapter.cpp +++ b/proton-c/bindings/cpp/src/MessagingAdapter.cpp @@ -28,23 +28,26 @@ #include "proton/handlers.h" #include "proton/delivery.h" #include "proton/connection.h" +#include "proton/session.h" namespace proton { namespace reactor { -MessagingAdapter::MessagingAdapter(MessagingHandler &d) : delegate(d), handshaker(pn_handshaker()) { - pn_handler_t *flowcontroller = pn_flowcontroller(10); - pn_handler_add(handshaker, flowcontroller); - pn_decref(flowcontroller); +MessagingAdapter::MessagingAdapter(MessagingHandler &d) : delegate(d), handshaker(pn_handshaker()), + autoSettle(true), autoAccept(true), + peerCloseIsError(false) { }; MessagingAdapter::~MessagingAdapter(){ pn_decref(handshaker); }; + void MessagingAdapter::onReactorInit(Event &e) { - // create onStart extended event - MessagingEvent mevent(PN_MESSAGING_START, NULL, e.getContainer()); - mevent.dispatch(delegate); + ProtonEvent *pe = dynamic_cast(&e); + if (pe) { + MessagingEvent mevent(PN_MESSAGING_START, *pe); + delegate.onStart(mevent); + } } void MessagingAdapter::onLinkFlow(Event &e) { @@ -54,8 +57,8 @@ void MessagingAdapter::onLinkFlow(Event &e) { pn_link_t *lnk = pn_event_link(pne); if (lnk && pn_link_is_sender(lnk) && pn_link_credit(lnk) > 0) { // create onMessage extended event - MessagingEvent mevent(PN_MESSAGING_SENDABLE, pe, e.getContainer()); - mevent.dispatch(delegate); + MessagingEvent mevent(PN_MESSAGING_SENDABLE, *pe); + delegate.onSendable(mevent);; } } } @@ -85,32 +88,60 @@ void MessagingAdapter::onDelivery(Event &e) { if (pn_link_is_receiver(lnk)) { if (!pn_delivery_partial(dlv) && pn_delivery_readable(dlv)) { // generate onMessage - MessagingEvent mevent(PN_MESSAGING_MESSAGE, pe, pe->getContainer()); + MessagingEvent mevent(PN_MESSAGING_MESSAGE, *pe); Message m(receiveMessage(lnk, dlv)); mevent.setMessage(m); - // TODO: check if endpoint closed... - mevent.dispatch(delegate); - // only do auto accept for now - pn_delivery_update(dlv, PN_ACCEPTED); - pn_delivery_settle(dlv); - // TODO: generate onSettled + if (pn_link_state(lnk) & PN_LOCAL_CLOSED) { + if (autoAccept) { + pn_delivery_update(dlv, PN_RELEASED); + pn_delivery_settle(dlv); + } + } + else { + try { + delegate.onMessage(mevent); + if (autoAccept) { + pn_delivery_update(dlv, PN_ACCEPTED); + pn_delivery_settle(dlv); + } + } + catch (MessageReject &) { + pn_delivery_update(dlv, PN_REJECTED); + pn_delivery_settle(dlv); + } + catch (MessageRelease &) { + pn_delivery_update(dlv, PN_REJECTED); + pn_delivery_settle(dlv); + } + } + } + else if (pn_delivery_updated(dlv) && pn_delivery_settled(dlv)) { + MessagingEvent mevent(PN_MESSAGING_SETTLED, *pe); + delegate.onSettled(mevent); } } else { // Sender if (pn_delivery_updated(dlv)) { uint64_t rstate = pn_delivery_remote_state(dlv); - if (rstate == PN_ACCEPTED) - // generate onAccepted - MessagingEvent(PN_MESSAGING_ACCEPTED, pe, pe->getContainer()).dispatch(delegate); - else if (rstate = PN_REJECTED) - MessagingEvent(PN_MESSAGING_REJECTED, pe, pe->getContainer()).dispatch(delegate); - else if (rstate == PN_RELEASED || rstate == PN_MODIFIED) - MessagingEvent(PN_MESSAGING_RELEASED, pe, pe->getContainer()).dispatch(delegate); - - if (pn_delivery_settled(dlv)) - MessagingEvent(PN_MESSAGING_SETTLED, pe, pe->getContainer()).dispatch(delegate); - - pn_delivery_settle(dlv); // TODO: only if auto settled + if (rstate == PN_ACCEPTED) { + MessagingEvent mevent(PN_MESSAGING_ACCEPTED, *pe); + delegate.onAccepted(mevent); + } + else if (rstate = PN_REJECTED) { + MessagingEvent mevent(PN_MESSAGING_REJECTED, *pe); + delegate.onRejected(mevent); + } + else if (rstate == PN_RELEASED || rstate == PN_MODIFIED) { + MessagingEvent mevent(PN_MESSAGING_RELEASED, *pe); + delegate.onReleased(mevent); + } + + if (pn_delivery_settled(dlv)) { + MessagingEvent mevent(PN_MESSAGING_SETTLED, *pe); + delegate.onSettled(mevent); + } + if (autoSettle) + pn_delivery_settle(dlv); } } } @@ -140,52 +171,247 @@ bool isRemoteClosed(pn_state_t state) { } // namespace +void MessagingAdapter::onLinkRemoteClose(Event &e) { + ProtonEvent *pe = dynamic_cast(&e); + if (pe) { + pn_event_t *cevent = pe->getPnEvent(); + pn_link_t *lnk = pn_event_link(cevent); + pn_state_t state = pn_link_state(lnk); + if (pn_condition_is_set(pn_link_remote_condition(lnk))) { + MessagingEvent mevent(PN_MESSAGING_LINK_ERROR, *pe); + onLinkError(mevent); + } + else if (isLocalClosed(state)) { + MessagingEvent mevent(PN_MESSAGING_LINK_CLOSED, *pe); + onLinkClosed(mevent); + } + else { + MessagingEvent mevent(PN_MESSAGING_LINK_CLOSING, *pe); + onLinkClosing(mevent); + } + pn_link_close(lnk); + } +} + +void MessagingAdapter::onSessionRemoteClose(Event &e) { + ProtonEvent *pe = dynamic_cast(&e); + if (pe) { + pn_event_t *cevent = pe->getPnEvent(); + pn_session_t *session = pn_event_session(cevent); + pn_state_t state = pn_session_state(session); + if (pn_condition_is_set(pn_session_remote_condition(session))) { + MessagingEvent mevent(PN_MESSAGING_SESSION_ERROR, *pe); + onSessionError(mevent); + } + else if (isLocalClosed(state)) { + MessagingEvent mevent(PN_MESSAGING_SESSION_CLOSED, *pe); + onSessionClosed(mevent); + } + else { + MessagingEvent mevent(PN_MESSAGING_SESSION_CLOSING, *pe); + onSessionClosing(mevent); + } + pn_session_close(session); + } +} + void MessagingAdapter::onConnectionRemoteClose(Event &e) { ProtonEvent *pe = dynamic_cast(&e); if (pe) { pn_event_t *cevent = pe->getPnEvent(); - pn_connection_t *conn = pn_event_connection(cevent); - // TODO: remote condition -> error - if (isLocalClosed(pn_connection_state(conn))) { - MessagingEvent(PN_MESSAGING_CONNECTION_CLOSED, pe, pe->getContainer()).dispatch(delegate); + pn_connection_t *connection = pn_event_connection(cevent); + pn_state_t state = pn_connection_state(connection); + if (pn_condition_is_set(pn_connection_remote_condition(connection))) { + MessagingEvent mevent(PN_MESSAGING_CONNECTION_ERROR, *pe); + onConnectionError(mevent); + } + else if (isLocalClosed(state)) { + MessagingEvent mevent(PN_MESSAGING_CONNECTION_CLOSED, *pe); + onConnectionClosed(mevent); } else { - MessagingEvent(PN_MESSAGING_CONNECTION_CLOSING, pe, pe->getContainer()).dispatch(delegate); + MessagingEvent mevent(PN_MESSAGING_CONNECTION_CLOSING, *pe); + onConnectionClosing(mevent); + } + pn_connection_close(connection); + } +} + +void MessagingAdapter::onConnectionLocalOpen(Event &e) { + ProtonEvent *pe = dynamic_cast(&e); + if (pe) { + pn_connection_t *connection = pn_event_connection(pe->getPnEvent()); + if (isRemoteOpen(pn_connection_state(connection))) { + MessagingEvent mevent(PN_MESSAGING_CONNECTION_OPENED, *pe); + onConnectionOpened(mevent); + } + } +} + +void MessagingAdapter::onConnectionRemoteOpen(Event &e) { + ProtonEvent *pe = dynamic_cast(&e); + if (pe) { + pn_connection_t *connection = pn_event_connection(pe->getPnEvent()); + if (isLocalOpen(pn_connection_state(connection))) { + MessagingEvent mevent(PN_MESSAGING_CONNECTION_OPENED, *pe); + onConnectionOpened(mevent); + } + else if (isLocalUnititialised(pn_connection_state(connection))) { + MessagingEvent mevent(PN_MESSAGING_CONNECTION_OPENING, *pe); + onConnectionOpening(mevent); + pn_connection_open(connection); + } + } +} + +void MessagingAdapter::onSessionLocalOpen(Event &e) { + ProtonEvent *pe = dynamic_cast(&e); + if (pe) { + pn_session_t *session = pn_event_session(pe->getPnEvent()); + if (isRemoteOpen(pn_session_state(session))) { + MessagingEvent mevent(PN_MESSAGING_SESSION_OPENED, *pe); + onSessionOpened(mevent); + } + } +} + +void MessagingAdapter::onSessionRemoteOpen(Event &e) { + ProtonEvent *pe = dynamic_cast(&e); + if (pe) { + pn_session_t *session = pn_event_session(pe->getPnEvent()); + if (isLocalOpen(pn_session_state(session))) { + MessagingEvent mevent(PN_MESSAGING_SESSION_OPENED, *pe); + onSessionOpened(mevent); + } + else if (isLocalUnititialised(pn_session_state(session))) { + MessagingEvent mevent(PN_MESSAGING_SESSION_OPENING, *pe); + onSessionOpening(mevent); + pn_session_open(session); } - pn_connection_close(conn); } } +void MessagingAdapter::onLinkLocalOpen(Event &e) { + ProtonEvent *pe = dynamic_cast(&e); + if (pe) { + pn_link_t *link = pn_event_link(pe->getPnEvent()); + if (isRemoteOpen(pn_link_state(link))) { + MessagingEvent mevent(PN_MESSAGING_LINK_OPENED, *pe); + onLinkOpened(mevent); + } + } +} void MessagingAdapter::onLinkRemoteOpen(Event &e) { ProtonEvent *pe = dynamic_cast(&e); if (pe) { - pn_event_t *cevent = pe->getPnEvent(); - pn_link_t *link = pn_event_link(cevent); - // TODO: remote condition -> error + pn_link_t *link = pn_event_link(pe->getPnEvent()); if (isLocalOpen(pn_link_state(link))) { - MessagingEvent(PN_MESSAGING_LINK_OPENED, pe, pe->getContainer()).dispatch(delegate); + MessagingEvent mevent(PN_MESSAGING_LINK_OPENED, *pe); + onLinkOpened(mevent); } else if (isLocalUnititialised(pn_link_state(link))) { - MessagingEvent(PN_MESSAGING_LINK_OPENING, pe, pe->getContainer()).dispatch(delegate); + MessagingEvent mevent(PN_MESSAGING_LINK_OPENING, *pe); + onLinkOpening(mevent); pn_link_open(link); } } } +void MessagingAdapter::onTransportTailClosed(Event &e) { + ProtonEvent *pe = dynamic_cast(&e); + if (pe) { + pn_connection_t *conn = pn_event_connection(pe->getPnEvent()); + if (conn && isLocalOpen(pn_connection_state(conn))) { + MessagingEvent mevent(PN_MESSAGING_DISCONNECTED, *pe); + delegate.onDisconnected(mevent); + } + } +} + -void MessagingAdapter::onUnhandled(Event &e) { - // Until this code fleshes out closer to python's, cheat a bit with a pn_handshaker +void MessagingAdapter::onConnectionOpened(Event &e) { + delegate.onConnectionOpened(e); +} +void MessagingAdapter::onSessionOpened(Event &e) { + delegate.onSessionOpened(e); +} + +void MessagingAdapter::onLinkOpened(Event &e) { + delegate.onLinkOpened(e); +} + +void MessagingAdapter::onConnectionOpening(Event &e) { + delegate.onConnectionOpening(e); +} + +void MessagingAdapter::onSessionOpening(Event &e) { + delegate.onSessionOpening(e); +} + +void MessagingAdapter::onLinkOpening(Event &e) { + delegate.onLinkOpening(e); +} + +void MessagingAdapter::onConnectionError(Event &e) { + delegate.onConnectionError(e); ProtonEvent *pe = dynamic_cast(&e); if (pe) { - pn_event_type_t type = (pn_event_type_t) pe->getType(); - if (type != PN_EVENT_NONE) { - pn_handler_dispatch(handshaker, pe->getPnEvent(), type); - } + pn_connection_t *connection = pn_event_connection(pe->getPnEvent()); + pn_connection_close(connection); + } +} + +void MessagingAdapter::onSessionError(Event &e) { + delegate.onSessionError(e); + ProtonEvent *pe = dynamic_cast(&e); + if (pe) { + pn_session_t *session = pn_event_session(pe->getPnEvent()); + pn_session_close(session); } } +void MessagingAdapter::onLinkError(Event &e) { + delegate.onLinkError(e); + ProtonEvent *pe = dynamic_cast(&e); + if (pe) { + pn_link_t *link = pn_event_link(pe->getPnEvent()); + pn_link_close(link); + } +} + +void MessagingAdapter::onConnectionClosed(Event &e) { + delegate.onConnectionClosed(e); +} +void MessagingAdapter::onSessionClosed(Event &e) { + delegate.onSessionClosed(e); +} + +void MessagingAdapter::onLinkClosed(Event &e) { + delegate.onLinkClosed(e); +} + +void MessagingAdapter::onConnectionClosing(Event &e) { + delegate.onConnectionClosing(e); + if (peerCloseIsError) + onConnectionError(e); +} + +void MessagingAdapter::onSessionClosing(Event &e) { + delegate.onSessionClosing(e); + if (peerCloseIsError) + onSessionError(e); +} + +void MessagingAdapter::onLinkClosing(Event &e) { + delegate.onLinkClosing(e); + if (peerCloseIsError) + onLinkError(e); +} + +void MessagingAdapter::onUnhandled(Event &e) { +} }} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/MessagingEvent.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/MessagingEvent.cpp b/proton-c/bindings/cpp/src/MessagingEvent.cpp index bcfb721..b8a2f8a 100644 --- a/proton-c/bindings/cpp/src/MessagingEvent.cpp +++ b/proton-c/bindings/cpp/src/MessagingEvent.cpp @@ -24,6 +24,7 @@ #include "proton/link.h" #include "proton/cpp/MessagingEvent.h" +#include "proton/cpp/Message.h" #include "proton/cpp/ProtonHandler.h" #include "proton/cpp/MessagingHandler.h" #include "proton/cpp/exceptions.h" @@ -37,8 +38,8 @@ MessagingEvent::MessagingEvent(pn_event_t *ce, pn_event_type_t t, Container &c) ProtonEvent(ce, t, c), messagingType(PN_MESSAGING_PROTON), parentEvent(0), message(0) {} -MessagingEvent::MessagingEvent(MessagingEventType_t t, ProtonEvent *p, Container &c) : - ProtonEvent(NULL, PN_EVENT_NONE, c), messagingType(t), parentEvent(p), message(0) { +MessagingEvent::MessagingEvent(MessagingEventType_t t, ProtonEvent &p) : + ProtonEvent(NULL, PN_EVENT_NONE, p.getContainer()), messagingType(t), parentEvent(&p), message(0) { if (messagingType == PN_MESSAGING_PROTON) throw ProtonException(MSG("invalid messaging event type")); } @@ -80,16 +81,18 @@ Link MessagingEvent::getLink() { } Message MessagingEvent::getMessage() { - if (message) - return *message; + if (parentEvent) { + pn_message_t *m = getEventContext(parentEvent->getPnEvent()); + if (m) + return Message(m); + } throw ProtonException(MSG("No message context for event")); } void MessagingEvent::setMessage(Message &m) { - if (messagingType != PN_MESSAGING_MESSAGE) + if (messagingType != PN_MESSAGING_MESSAGE || !parentEvent) throw ProtonException(MSG("Event type does not provide message")); - delete message; - message = new Message(m); + setEventContext(parentEvent->getPnEvent(), m.getPnMessage()); } void MessagingEvent::dispatch(Handler &h) { @@ -112,9 +115,23 @@ void MessagingEvent::dispatch(Handler &h) { case PN_MESSAGING_CONNECTION_CLOSING: handler->onConnectionClosing(*this); break; case PN_MESSAGING_CONNECTION_CLOSED: handler->onConnectionClosed(*this); break; + case PN_MESSAGING_CONNECTION_ERROR: handler->onConnectionError(*this); break; + case PN_MESSAGING_CONNECTION_OPENING: handler->onConnectionOpening(*this); break; + case PN_MESSAGING_CONNECTION_OPENED: handler->onConnectionOpened(*this); break; + + case PN_MESSAGING_LINK_CLOSED: handler->onLinkClosed(*this); break; + case PN_MESSAGING_LINK_CLOSING: handler->onLinkClosing(*this); break; + case PN_MESSAGING_LINK_ERROR: handler->onLinkError(*this); break; case PN_MESSAGING_LINK_OPENING: handler->onLinkOpening(*this); break; case PN_MESSAGING_LINK_OPENED: handler->onLinkOpened(*this); break; + case PN_MESSAGING_SESSION_CLOSED: handler->onSessionClosed(*this); break; + case PN_MESSAGING_SESSION_CLOSING: handler->onSessionClosing(*this); break; + case PN_MESSAGING_SESSION_ERROR: handler->onSessionError(*this); break; + case PN_MESSAGING_SESSION_OPENING: handler->onSessionOpening(*this); break; + case PN_MESSAGING_SESSION_OPENED: handler->onSessionOpened(*this); break; + + case PN_MESSAGING_TRANSPORT_CLOSED: handler->onTransportClosed(*this); break; default: throw ProtonException(MSG("Unkown messaging event type " << messagingType)); break; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/MessagingHandler.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/MessagingHandler.cpp b/proton-c/bindings/cpp/src/MessagingHandler.cpp index 7cb48f6..6066b07 100644 --- a/proton-c/bindings/cpp/src/MessagingHandler.cpp +++ b/proton-c/bindings/cpp/src/MessagingHandler.cpp @@ -30,15 +30,17 @@ MessagingHandler::~MessagingHandler(){}; void MessagingHandler::onAbort(Event &e) { onUnhandled(e); } void MessagingHandler::onAccepted(Event &e) { onUnhandled(e); } void MessagingHandler::onCommit(Event &e) { onUnhandled(e); } -void MessagingHandler::onConnectionClose(Event &e) { onUnhandled(e); } void MessagingHandler::onConnectionClosed(Event &e) { onUnhandled(e); } void MessagingHandler::onConnectionClosing(Event &e) { onUnhandled(e); } -void MessagingHandler::onConnectionOpen(Event &e) { onUnhandled(e); } +void MessagingHandler::onConnectionError(Event &e) { onUnhandled(e); } void MessagingHandler::onConnectionOpened(Event &e) { onUnhandled(e); } +void MessagingHandler::onConnectionOpening(Event &e) { onUnhandled(e); } void MessagingHandler::onDisconnected(Event &e) { onUnhandled(e); } void MessagingHandler::onFetch(Event &e) { onUnhandled(e); } void MessagingHandler::onIdLoaded(Event &e) { onUnhandled(e); } +void MessagingHandler::onLinkClosed(Event &e) { onUnhandled(e); } void MessagingHandler::onLinkClosing(Event &e) { onUnhandled(e); } +void MessagingHandler::onLinkError(Event &e) { onUnhandled(e); } void MessagingHandler::onLinkOpened(Event &e) { onUnhandled(e); } void MessagingHandler::onLinkOpening(Event &e) { onUnhandled(e); } void MessagingHandler::onMessage(Event &e) { onUnhandled(e); } @@ -50,11 +52,17 @@ void MessagingHandler::onReleased(Event &e) { onUnhandled(e); } void MessagingHandler::onRequest(Event &e) { onUnhandled(e); } void MessagingHandler::onResponse(Event &e) { onUnhandled(e); } void MessagingHandler::onSendable(Event &e) { onUnhandled(e); } +void MessagingHandler::onSessionClosed(Event &e) { onUnhandled(e); } +void MessagingHandler::onSessionClosing(Event &e) { onUnhandled(e); } +void MessagingHandler::onSessionError(Event &e) { onUnhandled(e); } +void MessagingHandler::onSessionOpened(Event &e) { onUnhandled(e); } +void MessagingHandler::onSessionOpening(Event &e) { onUnhandled(e); } void MessagingHandler::onSettled(Event &e) { onUnhandled(e); } void MessagingHandler::onStart(Event &e) { onUnhandled(e); } void MessagingHandler::onTimer(Event &e) { onUnhandled(e); } void MessagingHandler::onTransactionAborted(Event &e) { onUnhandled(e); } void MessagingHandler::onTransactionCommitted(Event &e) { onUnhandled(e); } void MessagingHandler::onTransactionDeclared(Event &e) { onUnhandled(e); } +void MessagingHandler::onTransportClosed(Event &e) { onUnhandled(e); } }} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/Session.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Session.cpp b/proton-c/bindings/cpp/src/Session.cpp index d2b01dd..4333dcf 100644 --- a/proton-c/bindings/cpp/src/Session.cpp +++ b/proton-c/bindings/cpp/src/Session.cpp @@ -30,34 +30,43 @@ namespace proton { namespace reactor { +template class ProtonHandle; +typedef ProtonImplRef PI; -Session::Session(pn_session_t *s) : pnSession(s) -{ - pn_incref(pnSession); +Session::Session(pn_session_t *p) { + PI::ctor(*this, p); +} +Session::Session() { + PI::ctor(*this, 0); +} +Session::Session(const Session& c) : ProtonHandle() { + PI::copy(*this, c); +} +Session& Session::operator=(const Session& c) { + return PI::assign(*this, c); } - Session::~Session() { - pn_decref(pnSession); + PI::dtor(*this); } -pn_session_t *Session::getPnSession() { return pnSession; } +pn_session_t *Session::getPnSession() { return impl; } void Session::open() { - pn_session_open(pnSession); + pn_session_open(impl); } Connection &Session::getConnection() { - pn_connection_t *c = pn_session_connection(pnSession); + pn_connection_t *c = pn_session_connection(impl); return ConnectionImpl::getReactorReference(c); } Receiver Session::createReceiver(std::string name) { - pn_link_t *link = pn_receiver(pnSession, name.c_str()); + pn_link_t *link = pn_receiver(impl, name.c_str()); return Receiver(link); } Sender Session::createSender(std::string name) { - pn_link_t *link = pn_sender(pnSession, name.c_str()); + pn_link_t *link = pn_sender(impl, name.c_str()); return Sender(link); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/contexts.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/contexts.cpp b/proton-c/bindings/cpp/src/contexts.cpp index b1dec49..56483ff 100644 --- a/proton-c/bindings/cpp/src/contexts.cpp +++ b/proton-c/bindings/cpp/src/contexts.cpp @@ -23,13 +23,13 @@ #include "proton/cpp/exceptions.h" #include "Msg.h" #include "proton/object.h" +#include "proton/message.h" #include "proton/session.h" #include "proton/link.h" PN_HANDLE(PNI_CPP_CONNECTION_CONTEXT) -PN_HANDLE(PNI_CPP_SESSION_CONTEXT) -PN_HANDLE(PNI_CPP_LINK_CONTEXT) PN_HANDLE(PNI_CPP_CONTAINER_CONTEXT) +PN_HANDLE(PNI_CPP_EVENT_CONTEXT) namespace proton { namespace reactor { @@ -39,7 +39,6 @@ void setConnectionContext(pn_connection_t *pnConnection, ConnectionImpl *connect pn_record_def(record, PNI_CPP_CONNECTION_CONTEXT, PN_VOID); pn_record_set(record, PNI_CPP_CONNECTION_CONTEXT, connection); } - ConnectionImpl *getConnectionContext(pn_connection_t *pnConnection) { if (!pnConnection) return NULL; pn_record_t *record = pn_connection_attachments(pnConnection); @@ -48,40 +47,11 @@ ConnectionImpl *getConnectionContext(pn_connection_t *pnConnection) { } -void setSessionContext(pn_session_t *pnSession, Session *session) { - pn_record_t *record = pn_session_attachments(pnSession); - pn_record_def(record, PNI_CPP_SESSION_CONTEXT, PN_VOID); - pn_record_set(record, PNI_CPP_SESSION_CONTEXT, session); -} - -Session *getSessionContext(pn_session_t *pnSession) { - if (!pnSession) return NULL; - pn_record_t *record = pn_session_attachments(pnSession); - Session *p = (Session *) pn_record_get(record, PNI_CPP_SESSION_CONTEXT); - return p; -} - - -void setLinkContext(pn_link_t *pnLink, Link *link) { - pn_record_t *record = pn_link_attachments(pnLink); - pn_record_def(record, PNI_CPP_LINK_CONTEXT, PN_VOID); - pn_record_set(record, PNI_CPP_LINK_CONTEXT, link); -} - -Link *getLinkContext(pn_link_t *pnLink) { - if (!pnLink) return NULL; - pn_record_t *record = pn_link_attachments(pnLink); - Link *p = (Link *) pn_record_get(record, PNI_CPP_LINK_CONTEXT); - return p; -} - - void setContainerContext(pn_reactor_t *pnReactor, ContainerImpl *container) { pn_record_t *record = pn_reactor_attachments(pnReactor); pn_record_def(record, PNI_CPP_CONTAINER_CONTEXT, PN_VOID); pn_record_set(record, PNI_CPP_CONTAINER_CONTEXT, container); } - ContainerImpl *getContainerContext(pn_reactor_t *pnReactor) { pn_record_t *record = pn_reactor_attachments(pnReactor); ContainerImpl *p = (ContainerImpl *) pn_record_get(record, PNI_CPP_CONTAINER_CONTEXT); @@ -89,4 +59,17 @@ ContainerImpl *getContainerContext(pn_reactor_t *pnReactor) { return p; } +void setEventContext(pn_event_t *pnEvent, pn_message_t *m) { + pn_record_t *record = pn_event_attachments(pnEvent); + pn_record_def(record, PNI_CPP_EVENT_CONTEXT, PN_OBJECT); // refcount it for life of the event + pn_record_set(record, PNI_CPP_EVENT_CONTEXT, m); +} +pn_message_t *getEventContext(pn_event_t *pnEvent) { + if (!pnEvent) return NULL; + pn_record_t *record = pn_event_attachments(pnEvent); + pn_message_t *p = (pn_message_t *) pn_record_get(record, PNI_CPP_EVENT_CONTEXT); + return p; +} + + }} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5ce5f282/proton-c/bindings/cpp/src/contexts.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/contexts.h b/proton-c/bindings/cpp/src/contexts.h index c04a77a..e1b5f24 100644 --- a/proton-c/bindings/cpp/src/contexts.h +++ b/proton-c/bindings/cpp/src/contexts.h @@ -23,6 +23,7 @@ */ #include "proton/reactor.h" #include "proton/connection.h" +#include "proton/message.h" namespace proton { namespace reactor { @@ -43,6 +44,9 @@ class ContainerImpl; void setContainerContext(pn_reactor_t *pnReactor, ContainerImpl *container); ContainerImpl *getContainerContext(pn_reactor_t *pnReactor); +void setEventContext(pn_event_t *pnEvent, pn_message_t *m); +pn_message_t *getEventContext(pn_event_t *pnEvent); + }} // namespace proton::reactor #endif /*!PROTON_CPP_CONTEXTS_H*/ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org