qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cliffjan...@apache.org
Subject qpid-proton git commit: PROTON-865: Added basic Terminus, Broker.cpp example
Date Tue, 12 May 2015 02:30:54 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/cjansen-cpp-client 26d74105f -> e0db6ea98


PROTON-865: Added basic Terminus, Broker.cpp example


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e0db6ea9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e0db6ea9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e0db6ea9

Branch: refs/heads/cjansen-cpp-client
Commit: e0db6ea98ab3f63ecdeb5a0e6f5885128b5e2d1a
Parents: 26d7410
Author: Clifford Jansen <cliffjansen@apache.org>
Authored: Mon May 11 19:29:53 2015 -0700
Committer: Clifford Jansen <cliffjansen@apache.org>
Committed: Mon May 11 19:29:53 2015 -0700

----------------------------------------------------------------------
 proton-c/bindings/cpp/CMakeLists.txt            |   4 +
 proton-c/bindings/cpp/examples/Broker.cpp       | 193 +++++++++++++++++++
 .../cpp/include/proton/cpp/Connection.h         |   1 +
 .../bindings/cpp/include/proton/cpp/Endpoint.h  |  11 ++
 .../bindings/cpp/include/proton/cpp/Handle.h    |   4 +
 proton-c/bindings/cpp/include/proton/cpp/Link.h |   6 +
 .../bindings/cpp/include/proton/cpp/Receiver.h  |   1 +
 .../bindings/cpp/include/proton/cpp/Sender.h    |   1 +
 .../bindings/cpp/include/proton/cpp/Terminus.h  |  81 ++++++++
 proton-c/bindings/cpp/src/Connection.cpp        |   4 +
 proton-c/bindings/cpp/src/ConnectionImpl.cpp    |  16 +-
 proton-c/bindings/cpp/src/ConnectionImpl.h      |   3 +-
 proton-c/bindings/cpp/src/ContainerImpl.cpp     |   2 +
 proton-c/bindings/cpp/src/Link.cpp              |  31 ++-
 proton-c/bindings/cpp/src/Receiver.cpp          |   2 +
 proton-c/bindings/cpp/src/Sender.cpp            |   3 +
 proton-c/bindings/cpp/src/Terminus.cpp          | 102 ++++++++++
 17 files changed, 448 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt
index 35dab37..fad26ea 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -37,6 +37,7 @@ set (qpid-proton-cpp-core
     src/Event.cpp
     src/Handler.cpp
     src/Link.cpp
+    src/Terminus.cpp
     src/Acceptor.cpp
     src/Url.cpp
     src/Message.cpp
@@ -99,6 +100,9 @@ add_executable (SimpleRecv examples/SimpleRecv.cpp)
 target_link_libraries (SimpleRecv qpid-proton-cpp)
 add_executable (SimpleSend examples/SimpleSend.cpp)
 target_link_libraries (SimpleSend qpid-proton-cpp)
+add_executable (Broker examples/Broker.cpp)
+target_link_libraries (Broker qpid-proton-cpp)
+
 
 install (TARGETS qpid-proton-cpp
   EXPORT  proton

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/examples/Broker.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/examples/Broker.cpp b/proton-c/bindings/cpp/examples/Broker.cpp
new file mode 100644
index 0000000..7d5214d
--- /dev/null
+++ b/proton-c/bindings/cpp/examples/Broker.cpp
@@ -0,0 +1,193 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/cpp/Container.h"
+#include "proton/cpp/MessagingHandler.h"
+
+#include <iostream>
+#include <deque>
+#include <map>
+#include <list>
+
+
+using namespace proton::reactor;
+
+std::string generateUuid(){  // Placeholder for the address generator used for dynamic queues.
+    throw "TODO: platform neutral uuid";  // Not implemented: always throws a string literal, never returns.
+}
+
+class Queue {  // One message queue plus the sender links subscribed to it.
+  public:
+    bool dynamic;  // set from the constructor argument; consulted by unsubscribe()
+    typedef std::deque<Message> MsgQ;
+    typedef std::list<Sender> List;
+    MsgQ queue;      // messages awaiting delivery, oldest at the front
+    List consumers;  // subscribed sender links
+
+    Queue(bool dyn = false) : dynamic(dyn), queue(MsgQ()), consumers(List()) {}
+    // Add a consumer link.
+    void subscribe(Sender &c) {
+        consumers.push_back(c);
+    }
+    // Remove a consumer; returns true when no consumers remain and the queue is dynamic or empty.
+    bool unsubscribe(Sender &c) {
+        consumers.remove(c);
+        return (consumers.size() == 0 && (dynamic || queue.size() == 0));
+    }
+    // Append a message, then attempt delivery to all consumers.
+    void publish(Message &m) {
+        queue.push_back(m);
+        dispatch(0);
+    }
+    // Run delivery passes until one reports false; s == 0 means all consumers.
+    void dispatch(Sender *s) {
+        while (deliverTo(s)) {
+        }
+    }
+    // One delivery pass; returns true only if the pass completed and sent something.
+    bool deliverTo(Sender *consumer) {
+        // deliver to single consumer if supplied, else all consumers
+        int count = consumer ? 1 : consumers.size();
+        if (!count) return false;
+        bool result = false;
+        List::iterator it = consumers.begin();
+        if (!consumer && count) consumer = &*it;
+
+        while (queue.size()) {
+            if (consumer->getCredit()) {
+                consumer->send(queue.front());
+                queue.pop_front();
+                result = true;
+            }
+            if (--count)
+                consumer = &*++it;  // re-point at the next subscriber, not just the iterator
+            else
+                return result;
+        }
+        return false;  // queue drained before the pass finished: nothing left to retry
+    }
+};
+
+class Broker : public MessagingHandler {  // Example handler that routes messages between links through named queues.
+  private:
+    std::string url;  // address passed to Container::listen() in onStart()
+    typedef std::map<std::string, Queue *> QMap;
+    QMap queues;  // address -> Queue allocated with new; the only delete is in unsubscribe()
+  public:
+
+    Broker(const std::string &s) : url(s), queues(QMap()) {}
+    // Start handler: begin listening on url.
+    void onStart(Event &e) {
+        e.getContainer().listen(url);
+    }
+    // Return the queue for address, creating a non-dynamic one if none exists yet.
+    Queue &queue(std::string &address) {
+        QMap::iterator it = queues.find(address);
+        if (it == queues.end()) {
+            queues[address] = new Queue();
+            return *queues[address];
+        }
+        else {
+            return *it->second;
+        }
+    }
+    // Link-opening handler: set the local source/target address and subscribe sender links.
+    void onLinkOpening(Event &e) {
+        Link lnk = e.getLink();
+        if (lnk.isSender()) {
+            Sender sender(lnk);
+            Terminus remoteSource(lnk.getRemoteSource());
+            if (remoteSource.isDynamic()) {
+                std::string address = generateUuid();  // generateUuid() currently always throws
+                lnk.getSource().setAddress(address);
+                Queue *q = new Queue(true);
+                queues[address] = q;
+                q->subscribe(sender);
+            }
+            else {
+                std::string address = remoteSource.getAddress();
+                if (!address.empty()) {
+                    lnk.getSource().setAddress(address);
+                    queue(address).subscribe(sender);
+                }
+            }
+        }
+        else {
+            std::string address = lnk.getRemoteTarget().getAddress();  // receiver link: copy the peer's target address
+            if (!address.empty())
+                lnk.getTarget().setAddress(address);
+        }
+    }
+    // Remove lnk from the queue at its source address; delete the queue when Queue::unsubscribe() returns true.
+    void unsubscribe (Sender &lnk) {
+        std::string address = lnk.getSource().getAddress();
+        QMap::iterator it = queues.find(address);
+        if (it != queues.end() && it->second->unsubscribe(lnk)) {
+            delete it->second;
+            queues.erase(it);
+        }
+    }
+    // Link-closing handler: unsubscribe the link if it is a sender.
+    void onLinkClosing(Event &e) {
+        Link lnk = e.getLink();
+        if (lnk.isSender()) {
+            Sender s(lnk);
+            unsubscribe(s);
+        }
+    }
+    // Connection-closing handler: unsubscribe this connection's sender links.
+    void onConnectionClosing(Event &e) {
+        removeStaleConsumers(e.getConnection());
+    }
+    // Disconnect handler: same cleanup as onConnectionClosing().
+    void onDisconnected(Event &e) {
+        removeStaleConsumers(e.getConnection());
+    }
+    // Walk the connection's links selected by the REMOTE_ACTIVE mask and unsubscribe each sender.
+    void removeStaleConsumers(Connection &connection) {
+        Link l = connection.getLinkHead(Endpoint::REMOTE_ACTIVE);
+        while (l) {
+            if (l.isSender()) {
+                Sender s(l);
+                unsubscribe(s);
+            }
+            l = l.getNext(Endpoint::REMOTE_ACTIVE);
+        }
+    }
+    // Sendable handler: deliver queued messages to this one sender.
+    void onSendable(Event &e) {
+        Link lnk = e.getLink();
+        Sender sender(lnk);
+        std::string addr = lnk.getSource().getAddress();
+        queue(addr).dispatch(&sender);  // note: queue() creates a queue if addr has none
+    }
+    // Message handler: publish the message to the queue named by the link's target address.
+    void onMessage(Event &e) {
+        std::string addr = e.getLink().getTarget().getAddress();
+        Message msg = e.getMessage();
+        queue(addr).publish(msg);
+    }
+};
+
+int main(int argc, char **argv) {  // argc/argv are unused; the listen address is the literal below
+    Broker hw("localhost:5672");
+    Container(hw).run();
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/include/proton/cpp/Connection.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Connection.h b/proton-c/bindings/cpp/include/proton/cpp/Connection.h
index 7d97ebb..86abbe6 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Connection.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Connection.h
@@ -57,6 +57,7 @@ class Connection : public Endpoint, public Handle<ConnectionImpl>
     PROTON_CPP_EXTERN Container &getContainer();
     PROTON_CPP_EXTERN std::string getHostname();
     virtual PROTON_CPP_EXTERN Connection &getConnection();
+    PROTON_CPP_EXTERN Link getLinkHead(Endpoint::State mask);
   private:
    friend class PrivateImplRef<Connection>;
    friend class Connector;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/include/proton/cpp/Endpoint.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Endpoint.h b/proton-c/bindings/cpp/include/proton/cpp/Endpoint.h
index 9992eff..fc1a712 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Endpoint.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Endpoint.h
@@ -22,6 +22,7 @@
  *
  */
 #include "proton/cpp/ImportExport.h"
+#include "proton/connection.h"
 
 namespace proton {
 namespace reactor {
@@ -33,6 +34,16 @@ class Transport;
 class Endpoint
 {
   public:
+    enum {
+        LOCAL_UNINIT = PN_LOCAL_UNINIT,
+        REMOTE_UNINIT = PN_REMOTE_UNINIT,
+        LOCAL_ACTIVE = PN_LOCAL_ACTIVE,
+        REMOTE_ACTIVE = PN_REMOTE_ACTIVE,
+        LOCAL_CLOSED = PN_LOCAL_CLOSED,
+        REMOTE_CLOSED  = PN_REMOTE_CLOSED
+    };
+    typedef int State;
+
     // TODO: getCondition, getRemoteCondition, updateCondition, get/setHandler
     virtual PROTON_CPP_EXTERN Connection &getConnection() = 0;
     Transport PROTON_CPP_EXTERN &getTransport();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/include/proton/cpp/Handle.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Handle.h b/proton-c/bindings/cpp/include/proton/cpp/Handle.h
index 632e30e..77b7814 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Handle.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Handle.h
@@ -51,6 +51,10 @@ template <class T> class Handle {
     /** Operator ! supports idiom if (!handle) { do_if_handle_is_null(); } */
     PROTON_CPP_INLINE_EXTERN bool operator !() const { return !impl; }
 
+    /** Operator ==  handles are equal if they refer to the same object; two null handles also compare equal */
+    PROTON_CPP_INLINE_EXTERN bool operator ==(const Handle<T>& other) const { return impl == other.impl; }
+    PROTON_CPP_INLINE_EXTERN bool operator !=(const Handle<T>& other) const { return impl != other.impl; }
+
     void swap(Handle<T>& h) { T* t = h.impl; h.impl = impl; impl = t; }
 
   private:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/include/proton/cpp/Link.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Link.h b/proton-c/bindings/cpp/include/proton/cpp/Link.h
index 21b1ca2..265d80d 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Link.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Link.h
@@ -24,6 +24,7 @@
 #include "proton/cpp/ImportExport.h"
 #include "proton/cpp/ProtonHandle.h"
 #include "proton/cpp/Endpoint.h"
+#include "proton/cpp/Terminus.h"
 #include "proton/types.h"
 #include <string>
 
@@ -45,8 +46,13 @@ class Link : public Endpoint, public ProtonHandle<pn_link_t>
     PROTON_CPP_EXTERN bool isSender();
     PROTON_CPP_EXTERN bool isReceiver();
     PROTON_CPP_EXTERN int getCredit();
+    PROTON_CPP_EXTERN Terminus getSource();
+    PROTON_CPP_EXTERN Terminus getTarget();
+    PROTON_CPP_EXTERN Terminus getRemoteSource();
+    PROTON_CPP_EXTERN Terminus getRemoteTarget();
     PROTON_CPP_EXTERN pn_link_t *getPnLink() const;
     virtual PROTON_CPP_EXTERN Connection &getConnection();
+    PROTON_CPP_EXTERN Link getNext(Endpoint::State mask);
   protected:
     virtual void verifyType(pn_link_t *l);
   private:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/include/proton/cpp/Receiver.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Receiver.h b/proton-c/bindings/cpp/include/proton/cpp/Receiver.h
index 197cfb1..c904dc9 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Receiver.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Receiver.h
@@ -37,6 +37,7 @@ class Receiver : public Link
   public:
     PROTON_CPP_EXTERN Receiver(pn_link_t *lnk);
     PROTON_CPP_EXTERN Receiver();
+    PROTON_CPP_EXTERN Receiver(const Link& c);
   protected:
     virtual void verifyType(pn_link_t *l);
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/include/proton/cpp/Sender.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Sender.h b/proton-c/bindings/cpp/include/proton/cpp/Sender.h
index fa8cce8..9b8683d 100644
--- a/proton-c/bindings/cpp/include/proton/cpp/Sender.h
+++ b/proton-c/bindings/cpp/include/proton/cpp/Sender.h
@@ -39,6 +39,7 @@ class Sender : public Link
   public:
     PROTON_CPP_EXTERN Sender(pn_link_t *lnk);
     PROTON_CPP_EXTERN Sender();
+    PROTON_CPP_EXTERN Sender(const Link& c);
     PROTON_CPP_EXTERN void send(Message &m);
   protected:
     virtual void verifyType(pn_link_t *l);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/include/proton/cpp/Terminus.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/cpp/Terminus.h b/proton-c/bindings/cpp/include/proton/cpp/Terminus.h
new file mode 100644
index 0000000..56c29c5
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/cpp/Terminus.h
@@ -0,0 +1,81 @@
+#ifndef PROTON_CPP_TERMINUS_H
+#define PROTON_CPP_TERMINUS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/ImportExport.h"
+#include "proton/cpp/Link.h"
+
+#include "proton/link.h"
+#include <string>
+
+namespace proton {
+namespace reactor {
+
+class Link;
+
+class Terminus : public ProtonHandle<pn_terminus_t>  // handle on one terminus (source or target) of a Link
+{
+  public:  // enums must be public so callers can name the values the setters take
+    enum Type {
+        TYPE_UNSPECIFIED = PN_UNSPECIFIED,
+        SOURCE = PN_SOURCE,
+        TARGET = PN_TARGET,
+        COORDINATOR = PN_COORDINATOR
+    };
+    enum ExpiryPolicy {  // NOTE(review): values look like pn_durability_t constants, not pn_expiry_policy_t - confirm
+        NONDURABLE = PN_NONDURABLE,
+        CONFIGURATION = PN_CONFIGURATION,
+        DELIVERIES = PN_DELIVERIES
+    };
+    enum DistributionMode {
+        MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED,
+        COPY = PN_DIST_MODE_COPY,
+        MOVE = PN_DIST_MODE_MOVE
+    };
+
+    PROTON_CPP_EXTERN Terminus();  // null handle: no terminus, no link
+    PROTON_CPP_EXTERN ~Terminus();
+    PROTON_CPP_EXTERN Terminus(const Terminus&);
+    PROTON_CPP_EXTERN Terminus& operator=(const Terminus&);
+    PROTON_CPP_EXTERN pn_terminus_t *getPnTerminus();
+    PROTON_CPP_EXTERN Type getType();
+    PROTON_CPP_EXTERN void setType(Type);
+    PROTON_CPP_EXTERN ExpiryPolicy getExpiryPolicy();
+    PROTON_CPP_EXTERN void setExpiryPolicy(ExpiryPolicy);
+    PROTON_CPP_EXTERN DistributionMode getDistributionMode();
+    PROTON_CPP_EXTERN void setDistributionMode(DistributionMode);
+    PROTON_CPP_EXTERN std::string getAddress();  // empty string when no address is set
+    PROTON_CPP_EXTERN void setAddress(std::string &);
+    PROTON_CPP_EXTERN bool isDynamic();
+    PROTON_CPP_EXTERN void setDynamic(bool);
+
+  private:
+    Link *link;  // Link object this terminus was obtained from; dereferenced on copy and destruction
+    PROTON_CPP_EXTERN Terminus(pn_terminus_t *, Link *);  // used by Link's terminus accessors
+    friend class Link;
+    friend class ProtonImplRef<Terminus>;
+};
+
+
+}} // namespace proton::reactor
+
+#endif  /*!PROTON_CPP_TERMINUS_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Connection.cpp b/proton-c/bindings/cpp/src/Connection.cpp
index 49d171e..1db8fbc 100644
--- a/proton-c/bindings/cpp/src/Connection.cpp
+++ b/proton-c/bindings/cpp/src/Connection.cpp
@@ -66,4 +66,8 @@ Connection &Connection::getConnection() {
 
 Container &Connection::getContainer() { return impl->getContainer(); }
 
+Link Connection::getLinkHead(Endpoint::State mask) {  // forwards to the ConnectionImpl behind this handle
+    return impl->getLinkHead(mask);
+}
+
 }} // namespace proton::reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/src/ConnectionImpl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ConnectionImpl.cpp b/proton-c/bindings/cpp/src/ConnectionImpl.cpp
index be01f8d..9cadffe 100644
--- a/proton-c/bindings/cpp/src/ConnectionImpl.cpp
+++ b/proton-c/bindings/cpp/src/ConnectionImpl.cpp
@@ -41,10 +41,12 @@ void ConnectionImpl::decref(ConnectionImpl *impl) {
         delete impl;
 }
 
-ConnectionImpl::ConnectionImpl(Container &c) : container(c), refCount(0), override(0), transport(0), defaultSession(0),
-                                               pnConnection(pn_reactor_connection(container.getReactor(), NULL)),
+ConnectionImpl::ConnectionImpl(Container &c, pn_connection_t *pnConn) : container(c), refCount(0), override(0), transport(0), defaultSession(0),
+                                               pnConnection(pnConn),
                                                reactorReference(this)
 {
+    if (!pnConnection)
+        pnConnection = pn_reactor_connection(container.getReactor(), NULL);
     setConnectionContext(pnConnection, this);
 }
 
@@ -110,12 +112,14 @@ Connection &ConnectionImpl::getReactorReference(pn_connection_t *conn) {
         Container container(getContainerContext(reactor));
         if (!container)  // can't be one created by our container
             throw ProtonException(MSG("Unknown Proton connection specifier"));
-        Connection connection(container);
-        impl = connection.impl;
-        setConnectionContext(conn, impl);
-        impl->reactorReference = connection;
+        impl = new ConnectionImpl(container, conn);
     }
     return impl->reactorReference;
 }
 
+Link ConnectionImpl::getLinkHead(Endpoint::State mask) {  // wraps the pn_link_head() result for this connection and state mask
+    return Link(pn_link_head(pnConnection, mask));
+}
+
+
 }} // namespace proton::reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/src/ConnectionImpl.h
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ConnectionImpl.h b/proton-c/bindings/cpp/src/ConnectionImpl.h
index ad8d71e..11b5765 100644
--- a/proton-c/bindings/cpp/src/ConnectionImpl.h
+++ b/proton-c/bindings/cpp/src/ConnectionImpl.h
@@ -39,7 +39,7 @@ class Container;
 class ConnectionImpl : public Endpoint
 {
   public:
-    PROTON_CPP_EXTERN ConnectionImpl(Container &c);
+    PROTON_CPP_EXTERN ConnectionImpl(Container &c, pn_connection_t *pnConn = 0);
     PROTON_CPP_EXTERN ~ConnectionImpl();
     PROTON_CPP_EXTERN Transport &getTransport();
     PROTON_CPP_EXTERN Handler *getOverride();
@@ -49,6 +49,7 @@ class ConnectionImpl : public Endpoint
     PROTON_CPP_EXTERN pn_connection_t *getPnConnection();
     PROTON_CPP_EXTERN Container &getContainer();
     PROTON_CPP_EXTERN std::string getHostname();
+    PROTON_CPP_EXTERN Link getLinkHead(Endpoint::State mask);
     virtual PROTON_CPP_EXTERN Connection &getConnection();
     static Connection &getReactorReference(pn_connection_t *);
     static ConnectionImpl *getImpl(const Connection &c) { return c.impl; }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/src/ContainerImpl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/ContainerImpl.cpp b/proton-c/bindings/cpp/src/ContainerImpl.cpp
index 8bdd540..0b75f1c 100644
--- a/proton-c/bindings/cpp/src/ContainerImpl.cpp
+++ b/proton-c/bindings/cpp/src/ContainerImpl.cpp
@@ -122,6 +122,8 @@ class OverrideHandler : public Handler
             ConnectionImpl *cimpl = getConnectionContext(conn);
             if (cimpl)
                 cimpl->reactorDetach();
+            // TODO: remember all connections and do reactorDetach of zombie connections
+            // not pn_connection_release'd at PN_REACTOR_FINAL.
         }
     }
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/src/Link.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Link.cpp b/proton-c/bindings/cpp/src/Link.cpp
index 3356b38..aab01d9 100644
--- a/proton-c/bindings/cpp/src/Link.cpp
+++ b/proton-c/bindings/cpp/src/Link.cpp
@@ -33,12 +33,6 @@
 namespace proton {
 namespace reactor {
 
-namespace {
-
-static inline void throwIfNull(pn_link_t *l) { if (!l) throw ProtonException(MSG("Disassociated link")); }
-
-}
-
 template class ProtonHandle<pn_link_t>;
 typedef ProtonImplRef<Link> PI;
 
@@ -67,12 +61,10 @@ void Link::verifyType(pn_link_t *l) {} // Generic link can be sender or receiver
 pn_link_t *Link::getPnLink() const { return impl; }
 
 void Link::open() {
-    throwIfNull(impl);
     pn_link_open(impl);
 }
 
 void Link::close() {
-    throwIfNull(impl);
     pn_link_close(impl);
 }
 
@@ -85,15 +77,34 @@ bool Link::isReceiver() {
 }
 
 int Link::getCredit() {
-    throwIfNull(impl);
     return pn_link_credit(impl);
 }
 
+Terminus Link::getSource() {  // local source terminus; the Terminus keeps a pointer to this Link object
+    return Terminus(pn_link_source(impl), this);
+}
+
+Terminus Link::getTarget() {  // local target terminus; the Terminus keeps a pointer to this Link object
+    return Terminus(pn_link_target(impl), this);
+}
+
+Terminus Link::getRemoteSource() {  // remote source terminus; the Terminus keeps a pointer to this Link object
+    return Terminus(pn_link_remote_source(impl), this);
+}
+
+Terminus Link::getRemoteTarget() {  // remote target terminus; the Terminus keeps a pointer to this Link object
+    return Terminus(pn_link_remote_target(impl), this);
+}
+
 Connection &Link::getConnection() {
-    throwIfNull(impl);
     pn_session_t *s = pn_link_session(impl);
     pn_connection_t *c = pn_session_connection(s);
     return ConnectionImpl::getReactorReference(c);
 }
 
+Link Link::getNext(Endpoint::State mask) {
+    // Wraps the pn_link_next() result for this link and state mask.
+    return Link(pn_link_next(impl, (pn_state_t) mask));
+}
+
 }} // namespace proton::reactor

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/src/Receiver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Receiver.cpp b/proton-c/bindings/cpp/src/Receiver.cpp
index 557e736..ad9a6d1 100644
--- a/proton-c/bindings/cpp/src/Receiver.cpp
+++ b/proton-c/bindings/cpp/src/Receiver.cpp
@@ -34,6 +34,8 @@ namespace reactor {
 Receiver::Receiver(pn_link_t *lnk) : Link(lnk) {}
 Receiver::Receiver() : Link(0) {}
 
+Receiver::Receiver(const Link& c) : Link(c.getPnLink()) {}  // builds a Receiver on the same pn_link_t as c
+
 void Receiver::verifyType(pn_link_t *lnk) {
     if (lnk && pn_link_is_sender(lnk))
         throw ProtonException(MSG("Creating receiver with sender context"));

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/src/Sender.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Sender.cpp b/proton-c/bindings/cpp/src/Sender.cpp
index 74d4b0f..c521ad1 100644
--- a/proton-c/bindings/cpp/src/Sender.cpp
+++ b/proton-c/bindings/cpp/src/Sender.cpp
@@ -46,6 +46,9 @@ void Sender::verifyType(pn_link_t *lnk) {
         throw ProtonException(MSG("Creating sender with receiver context"));
 }
 
+Sender::Sender(const Link& c) : Link(c.getPnLink()) {}  // builds a Sender on the same pn_link_t as c
+
+
 namespace{
 // revisit if thread safety required
 uint64_t tagCounter = 0;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0db6ea9/proton-c/bindings/cpp/src/Terminus.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/Terminus.cpp b/proton-c/bindings/cpp/src/Terminus.cpp
new file mode 100644
index 0000000..f66979e
--- /dev/null
+++ b/proton-c/bindings/cpp/src/Terminus.cpp
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/cpp/Link.h"
+#include "proton/link.h"
+
+namespace proton {
+namespace reactor {
+
+template class ProtonHandle<pn_terminus_t>;
+typedef ProtonImplRef<Terminus> PI;
+
+// Note: the pn_terminus_t is not ref counted.  We count the parent link.
+// Invariant kept below: a reference on the link is held exactly when impl is non-null.
+Terminus::Terminus() : link(0) {
+    impl = 0;
+}
+
+Terminus::Terminus(pn_terminus_t *p, Link *l) : link(l) {
+    impl = p;
+    if (impl) pn_incref(link->getPnLink());  // matches the guarded decref in the destructor
+}
+Terminus::Terminus(const Terminus& c) : ProtonHandle<pn_terminus_t>() {
+    impl = c.impl;
+    link = c.link;
+    if (impl) pn_incref(link->getPnLink());  // c may be a null handle with link == 0
+}
+Terminus& Terminus::operator=(const Terminus& c) {
+    if (impl == c.impl) return *this;
+    if (impl) pn_decref(link->getPnLink());
+    impl = c.impl;
+    link = c.link;
+    if (impl) pn_incref(link->getPnLink());  // assigning from a null handle must not dereference link
+    return *this;
+}
+Terminus::~Terminus() {
+    if (impl)
+        pn_decref(link->getPnLink());
+}
+
+pn_terminus_t *Terminus::getPnTerminus() { return impl; }
+
+Terminus::Type Terminus::getType() {
+    return (Type) pn_terminus_get_type(impl);
+}
+
+void Terminus::setType(Type type) {
+    pn_terminus_set_type(impl, (pn_terminus_type_t) type);
+}
+
+Terminus::ExpiryPolicy Terminus::getExpiryPolicy() {
+    return (ExpiryPolicy) pn_terminus_get_expiry_policy(impl);  // was reading the terminus type
+}
+
+void Terminus::setExpiryPolicy(ExpiryPolicy policy) {
+    pn_terminus_set_expiry_policy(impl, (pn_expiry_policy_t) policy);
+}
+
+Terminus::DistributionMode Terminus::getDistributionMode() {
+    return (DistributionMode) pn_terminus_get_distribution_mode(impl);  // was reading the terminus type
+}
+
+void Terminus::setDistributionMode(DistributionMode mode) {
+    pn_terminus_set_distribution_mode(impl, (pn_distribution_mode_t) mode);
+}
+
+std::string Terminus::getAddress() {
+    const char *addr = pn_terminus_get_address(impl);
+    return addr ? std::string(addr) : std::string();  // no address set yields an empty string
+}
+
+void Terminus::setAddress(std::string &addr) {
+    pn_terminus_set_address(impl, addr.c_str());
+}
+
+bool Terminus::isDynamic() {
+    return pn_terminus_is_dynamic(impl);  // plain bool; the former (Type) cast was spurious
+}
+
+void Terminus::setDynamic(bool d) {
+    pn_terminus_set_dynamic(impl, d);
+}
+
+}} // namespace proton::reactor


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message