qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [1/3] qpid-proton git commit: PROTON-1046: C++ multi-threaded controller and improved broker example
Date Wed, 27 Apr 2016 14:54:57 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/master b53a684e7 -> deccf354a


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/sender.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/sender.hpp b/proton-c/bindings/cpp/include/proton/sender.hpp
index 6f13abe..43c0747 100644
--- a/proton-c/bindings/cpp/include/proton/sender.hpp
+++ b/proton-c/bindings/cpp/include/proton/sender.hpp
@@ -25,6 +25,7 @@
 #include "proton/export.hpp"
 #include "proton/link.hpp"
 #include "proton/message.hpp"
+#include "proton/tracker.hpp"
 
 #include "proton/types.h"
 #include <string>
@@ -33,8 +34,6 @@ struct pn_connection_t;
 
 namespace proton {
 
-class tracker;
-
 /// A link for sending messages.
 class
 PN_CPP_CLASS_EXTERN sender : public internal::link

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/include/proton/work_queue.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/work_queue.hpp b/proton-c/bindings/cpp/include/proton/work_queue.hpp
new file mode 100644
index 0000000..1fb84ce
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/work_queue.hpp
@@ -0,0 +1,75 @@
+#ifndef PROTON_WORK_QUEUE_HPP
+#define PROTON_WORK_QUEUE_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <proton/handler.hpp>
+#include <proton/connection_options.hpp>
+
+#include <functional>
+#include <memory>
+
+namespace proton {
+
+class connection;
+
+/// A work_queue takes work (in the form of function objects) that will be
+/// serialized with other activity on a connection. Typically the work is a call
+/// to user-defined member functions on the handler(s) associated with a
+/// connection, which will be called serialized with respect to
+/// proton::handler::on_* event functions.
+///
+class work_queue : public std::enable_shared_from_this<work_queue> {
+  public:
+    work_queue(const work_queue&) = delete;
+    virtual ~work_queue() {}
+
+    /// Get the work_queue associated with a connection.
+    /// @throw proton::error if this is not a controller-managed connection.
+    PN_CPP_EXTERN static std::shared_ptr<work_queue> get(const proton::connection&);
+
+    /// Push a function object on the queue to be invoked in a safely serialized
+    /// way.
+    ///
+    /// @return true if `f()` was pushed and will be called. False if the
+    /// work_queue is already closed and f() will never be called.
+    ///
+    /// Note 1: On returning true, the application can rely on f() being called
+    /// eventually. However f() should check the state when it executes as
+    /// links, sessions or even the connection may have closed by the time f()
+    /// is executed.
+    ///
+    /// Note 2: You must not push() in a handler or work_queue function on the
+    /// *same connection* as the work_queue you are pushing to. That could cause
+    /// a deadlock.
+    ///
+    virtual bool push(std::function<void()>) = 0;
+
+    /// Get the controller associated with this work_queue.
+    virtual class controller& controller() const = 0;
+
+  protected:
+    work_queue() {}
+};
+
+}
+
+
+#endif // PROTON_WORK_QUEUE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/connection_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_options.cpp b/proton-c/bindings/cpp/src/connection_options.cpp
index 4a0956b..19a494a 100644
--- a/proton-c/bindings/cpp/src/connection_options.cpp
+++ b/proton-c/bindings/cpp/src/connection_options.cpp
@@ -145,9 +145,11 @@ class connection_options::impl {
 };
 
 connection_options::connection_options() : impl_(new impl()) {}
+
 connection_options::connection_options(const connection_options& x) : impl_(new impl()) {
     *this = x;
 }
+
 connection_options::~connection_options() {}
 
 connection_options& connection_options::operator=(const connection_options& x) {
@@ -155,7 +157,16 @@ connection_options& connection_options::operator=(const connection_options& x) {
     return *this;
 }
 
-void connection_options::update(const connection_options& x) { impl_->update(*x.impl_); }
+connection_options& connection_options::update(const connection_options& x) {
+    impl_->update(*x.impl_);
+    return *this;
+}
+
+connection_options connection_options::update(const connection_options& x) const {
+    connection_options copy(*this);
+    copy.update(x);
+    return copy;
+}
 
 connection_options& connection_options::handler(class handler *h) { impl_->handler = h->messaging_adapter_.get(); return *this; }
 connection_options& connection_options::max_frame_size(uint32_t n) { impl_->max_frame_size = n; return *this; }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.hpp b/proton-c/bindings/cpp/src/contexts.hpp
index a60c1fa..0aa539e 100644
--- a/proton-c/bindings/cpp/src/contexts.hpp
+++ b/proton-c/bindings/cpp/src/contexts.hpp
@@ -40,6 +40,8 @@ struct pn_acceptor_t;
 namespace proton {
 
 class proton_handler;
+class work_queue;
+
 
 // Base class for C++ classes that are used as proton contexts.
 // Contexts are pn_objects managed by pn reference counts, the C++ value is allocated in-place.
@@ -81,12 +83,13 @@ class context {
 // Connection context used by all connections.
 class connection_context : public context {
   public:
-    connection_context() : default_session(0) {}
+    connection_context() : default_session(0), work_queue(0) {}
 
     // Used by all connections
     pn_session_t *default_session; // Owned by connection.
     message event_message;      // re-used by messaging_adapter for performance.
     id_generator link_gen;      // Link name generator.
+    class work_queue* work_queue; // Work queue if this is proton::controller connection.
 
     internal::pn_unique_ptr<proton_handler> handler;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/controller.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/controller.cpp b/proton-c/bindings/cpp/src/controller.cpp
new file mode 100644
index 0000000..73403c2
--- /dev/null
+++ b/proton-c/bindings/cpp/src/controller.cpp
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "contexts.hpp"
+
+#include <proton/error.hpp>
+#include <proton/controller.hpp>
+#include <proton/work_queue.hpp>
+
+#include <proton/io/default_controller.hpp>
+
+#include <utility>
+#include <memory>
+
+static proton::io::default_controller::make_fn make_default_controller;
+
+namespace proton {
+
+std::unique_ptr<controller> controller::create() {
+    if (!make_default_controller)
+        throw error("no default controller");
+    return make_default_controller();
+}
+
+controller& controller::get(const proton::connection& c) {
+    return work_queue::get(c)->controller();
+}
+
+std::shared_ptr<work_queue> work_queue::get(const proton::connection& c) {
+    work_queue* wq = connection_context::get(c).work_queue;
+    if (!wq)
+        throw proton::error("connection has no controller");
+    return wq->shared_from_this();
+}
+
+namespace io {
+// Register a default controller factory.
+default_controller::default_controller(default_controller::make_fn f) {
+    make_default_controller = f;
+}
+} // namespace io
+
+} // namespace proton

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/engine_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/engine_test.cpp b/proton-c/bindings/cpp/src/engine_test.cpp
index d94abdc..a6fd71e 100644
--- a/proton-c/bindings/cpp/src/engine_test.cpp
+++ b/proton-c/bindings/cpp/src/engine_test.cpp
@@ -74,8 +74,6 @@ struct in_memory_engine : public connection_engine {
 /// A pair of engines that talk to each other in-memory.
 struct engine_pair {
     byte_stream ab, ba;
-    connection_engine::container cont;
-
     in_memory_engine a, b;
 
     engine_pair(handler& ha, handler& hb,
@@ -152,49 +150,7 @@ void test_engine_prefix() {
     ASSERT_EQUAL("y/1", quick_pop(hb.receivers).name());
 }
 
-void test_container_prefix() {
-    /// Let the container set the options.
-    record_handler ha, hb;
-    connection_engine::container ca("a"), cb("b");
-    engine_pair e(ha, hb, ca.make_options(), cb.make_options());
-
-    ASSERT_EQUAL("a", e.a.connection().container_id());
-    ASSERT_EQUAL("b", e.b.connection().container_id());
-
-    e.a.connection().open();
-    sender s = e.a.connection().open_sender("x");
-    ASSERT_EQUAL("1/1", s.name());
-
-    while (ha.senders.empty() || hb.receivers.empty()) e.process();
-
-    ASSERT_EQUAL("1/1", quick_pop(ha.senders).name());
-    ASSERT_EQUAL("1/1", quick_pop(hb.receivers).name());
-
-    e.a.connection().open_receiver("y");
-    while (ha.receivers.empty() || hb.senders.empty()) e.process();
-    ASSERT_EQUAL("1/2", quick_pop(ha.receivers).name());
-    ASSERT_EQUAL("1/2", quick_pop(hb.senders).name());
-
-    // Open a second connection in each container, make sure links have different IDs.
-    record_handler ha2, hb2;
-    engine_pair e2(ha2, hb2, ca.make_options(), cb.make_options());
-
-    ASSERT_EQUAL("a", e2.a.connection().container_id());
-    ASSERT_EQUAL("b", e2.b.connection().container_id());
-
-    e2.b.connection().open();
-    receiver r = e2.b.connection().open_receiver("z");
-    ASSERT_EQUAL("2/1", r.name());
-
-    while (ha2.senders.empty() || hb2.receivers.empty()) e2.process();
-
-    ASSERT_EQUAL("2/1", quick_pop(ha2.senders).name());
-    ASSERT_EQUAL("2/1", quick_pop(hb2.receivers).name());
-};
-
 void test_endpoint_close() {
-    // Make sure conditions are sent to the remote end.
-
     record_handler ha, hb;
     engine_pair e(ha, hb);
     e.a.connection().open();
@@ -246,7 +202,6 @@ void test_transport_close() {
 int main(int, char**) {
     int failed = 0;
     RUN_TEST(failed, test_engine_prefix());
-    RUN_TEST(failed, test_container_prefix());
     RUN_TEST(failed, test_endpoint_close());
     RUN_TEST(failed, test_transport_close());
     return failed;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/io/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
index e2a9356..ffef3b7 100644
--- a/proton-c/bindings/cpp/src/io/connection_engine.cpp
+++ b/proton-c/bindings/cpp/src/io/connection_engine.cpp
@@ -42,38 +42,7 @@
 namespace proton {
 namespace io {
 
-namespace {
-std::string  make_id(const std::string s="") {
-    return s.empty() ? uuid::random().str() : s;
-}
-}
-
-class connection_engine::container::impl {
-  public:
-    impl(const std::string s="") : id_(make_id(s)) {}
-
-    const std::string id_;
-    id_generator id_gen_;
-    connection_options options_;
-};
-
-connection_engine::container::container(const std::string& s) : impl_(new impl(s)) {}
-
-connection_engine::container::~container() {}
-
-std::string connection_engine::container::id() const { return impl_->id_; }
-
-connection_options connection_engine::container::make_options() {
-    connection_options opts = impl_->options_;
-    opts.container_id(id()).link_prefix(impl_->id_gen_.next()+"/");
-    return opts;
-}
-
-void connection_engine::container::options(const connection_options &opts) {
-    impl_->options_ = opts;
-}
-
-connection_engine::connection_engine(class handler &h, const connection_options& opts) :
+connection_engine::connection_engine(class handler &h, const connection_options& opts):
     handler_(h),
     connection_(internal::take_ownership(pn_connection()).get()),
     transport_(internal::take_ownership(pn_transport()).get()),
@@ -85,16 +54,12 @@ connection_engine::connection_engine(class handler &h, const connection_options&
     pn_connection_collect(connection_.pn_object(), collector_.get());
     opts.apply(connection_);
 
-    // Provide defaults for connection_id and link_prefix if not set.
-    std::string cid = connection_.container_id();
-    if (cid.empty()) {
-        cid = make_id();
-        pn_connection_set_container(connection_.pn_object(), cid.c_str());
-    }
+    // Provide local random defaults for container_id and link_prefix if not set by opts.
+    if (connection_.container_id().empty())
+        pn_connection_set_container(connection_.pn_object(), uuid::random().str().c_str());
     id_generator &link_gen = connection_context::get(connection_).link_gen;
-    if (link_gen.prefix().empty()) {
-        link_gen.prefix(make_id()+"/");
-    }
+    if (link_gen.prefix().empty())
+        link_gen.prefix(uuid::random().str()+"/");
 }
 
 connection_engine::~connection_engine() {
@@ -108,11 +73,15 @@ bool connection_engine::dispatch() {
          e;
          e = pn_collector_peek(collector_.get()))
     {
-        proton_event(e, 0).dispatch(h);
+        proton_event pe(e, 0);
+        try {
+            pe.dispatch(h);
+        } catch (const std::exception& e) {
+            close(error_condition("exception", e.what()));
+        }
         pn_collector_pop(collector_.get());
     }
-    return !(pn_transport_closed(transport_.pn_object()) ||
-          (connection().closed() && write_buffer().size == 0));
+    return !(pn_transport_closed(transport_.pn_object()));
 }
 
 mutable_buffer connection_engine::read_buffer() {
@@ -124,7 +93,8 @@ mutable_buffer connection_engine::read_buffer() {
 }
 
 void connection_engine::read_done(size_t n) {
-    pn_transport_process(transport_.pn_object(), n);
+    if (n > 0)
+        pn_transport_process(transport_.pn_object(), n);
 }
 
 void connection_engine::read_close() {
@@ -140,7 +110,8 @@ const_buffer connection_engine::write_buffer() const {
 }
 
 void connection_engine::write_done(size_t n) {
-    pn_transport_pop(transport_.pn_object(), n);
+    if (n > 0)
+        pn_transport_pop(transport_.pn_object(), n);
 }
 
 void connection_engine::write_close() {
@@ -161,4 +132,8 @@ proton::transport connection_engine::transport() const {
     return transport_;
 }
 
+void connection_engine::work_queue(class work_queue* wq) {
+    connection_context::get(connection()).work_queue = wq;
+}
+
 }}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/io/posix/socket.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/posix/socket.cpp b/proton-c/bindings/cpp/src/io/posix/socket.cpp
deleted file mode 100644
index 204b530..0000000
--- a/proton-c/bindings/cpp/src/io/posix/socket.cpp
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "msg.hpp"
-
-#include <proton/error_condition.hpp>
-#include <proton/io/socket.hpp>
-#include <proton/url.hpp>
-
-#include <errno.h>
-#include <string.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <sys/socket.h>
-#include <sys/select.h>
-#include <sys/types.h>
-#include <unistd.h>
-
-namespace proton {
-namespace io {
-namespace socket {
-
-io_error::io_error(const std::string& s) : error(s) {}
-
-const descriptor INVALID_DESCRIPTOR = -1;
-
-std::string error_str() {
-    char buf[512] = "Unknown error";
-#ifdef _GNU_SOURCE
-    // GNU strerror_r returns the message
-    return ::strerror_r(errno, buf, sizeof(buf));
-#else
-    // POSIX strerror_r doesn't return the buffer
-    ::strerror_r(errno, buf, sizeof(buf));
-    return std::string(buf)
-#endif
-}
-
-namespace {
-
-template <class T> T check(T result, const std::string& msg=std::string()) {
-    if (result < 0) throw io_error(msg + error_str());
-    return result;
-}
-
-void gai_check(int result, const std::string& msg="") {
-    if (result) throw io_error(msg + gai_strerror(result));
-}
-
-}
-
-void engine::init() {
-    check(fcntl(socket_, F_SETFL, fcntl(socket_, F_GETFL, 0) | O_NONBLOCK), "set nonblock: ");
-}
-
-engine::engine(descriptor fd, handler& h, const connection_options &opts)
-    : connection_engine(h, opts), socket_(fd)
-{
-    init();
-}
-
-engine::engine(const url& u, handler& h, const connection_options& opts)
-    : connection_engine(h, opts), socket_(connect(u))
-{
-    init();
-    connection().open();
-}
-
-engine::~engine() {}
-
-void engine::read() {
-    mutable_buffer rbuf = read_buffer();
-    if (rbuf.size > 0) {
-        ssize_t n = ::read(socket_, rbuf.data, rbuf.size);
-        if (n > 0)
-            read_done(n);
-        else if (n == 0)
-            read_close();
-        else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
-            close(error_condition("io_error", error_str()));
-    }
-}
-
-void engine::write() {
-    const_buffer wbuf = write_buffer();
-    if (wbuf.size > 0) {
-        ssize_t n = ::write(socket_, wbuf.data, wbuf.size);
-        if (n > 0)
-            write_done(n);
-        else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
-            close(error_condition("io_error", error_str()));
-        }
-    }
-}
-
-void engine::run() {
-    while (dispatch()) {
-        fd_set rd, wr;
-        FD_ZERO(&rd);
-        if (read_buffer().size)
-            FD_SET(socket_, &rd);
-        FD_ZERO(&wr);
-        if (write_buffer().size)
-            FD_SET(socket_, &wr);
-        int n = ::select(FD_SETSIZE, &rd, &wr, NULL, NULL);
-        if (n < 0) {
-            close(error_condition("select: ", error_str()));
-            break;
-        }
-        if (FD_ISSET(socket_, &rd)) {
-            read();
-        }
-        if (FD_ISSET(socket_, &wr))
-            write();
-    }
-    ::close(socket_);
-}
-
-namespace {
-struct auto_addrinfo {
-    struct addrinfo *ptr;
-    auto_addrinfo() : ptr(0) {}
-    ~auto_addrinfo() { ::freeaddrinfo(ptr); }
-    addrinfo* operator->() const { return ptr; }
-};
-}
-
-descriptor connect(const proton::url& u) {
-    descriptor fd = INVALID_DESCRIPTOR;
-    try{
-        auto_addrinfo addr;
-        gai_check(::getaddrinfo(u.host().empty() ? 0 : u.host().c_str(),
-                                u.port().empty() ? 0 : u.port().c_str(),
-                                0, &addr.ptr), u.str()+": ");
-        fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect: ");
-        check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: ");
-        return fd;
-    } catch (...) {
-        if (fd >= 0) close(fd);
-        throw;
-    }
-}
-
-listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_DESCRIPTOR) {
-    try {
-        auto_addrinfo addr;
-        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
-                                port.empty() ? 0 : port.c_str(), 0, &addr.ptr),
-                  "listener address invalid: ");
-        socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listen: ");
-        int yes = 1;
-        check(setsockopt(socket_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), "setsockopt: ");
-        check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "bind: ");
-        check(::listen(socket_, 32), "listen: ");
-    } catch (...) {
-        if (socket_ >= 0) close(socket_);
-        throw;
-    }
-}
-
-listener::~listener() { ::close(socket_); }
-
-descriptor listener::accept(std::string& host_str, std::string& port_str) {
-    struct sockaddr_storage addr;
-    socklen_t size = sizeof(addr);
-    int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: ");
-    char host[NI_MAXHOST], port[NI_MAXSERV];
-    gai_check(getnameinfo((struct sockaddr *) &addr, sizeof(addr),
-                          host, sizeof(host), port, sizeof(port), 0),
-              "accept invalid remote address: ");
-    host_str = host;
-    port_str = port;
-    return fd;
-}
-
-// Empty stubs, only needed on windows.
-void initialize() {}
-void finalize() {}
-
-}}}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/io/windows/socket.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/windows/socket.cpp b/proton-c/bindings/cpp/src/io/windows/socket.cpp
deleted file mode 100644
index f312525..0000000
--- a/proton-c/bindings/cpp/src/io/windows/socket.cpp
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "msg.hpp"
-
-#include <proton/io/socket.hpp>
-#include <proton/url.hpp>
-
-#define FD_SETSIZE 2048
-#ifndef _WIN32_WINNT
-#define _WIN32_WINNT 0x0501
-#endif
-#if _WIN32_WINNT < 0x0501
-#error "Proton requires Windows API support for XP or later."
-#endif
-#include <winsock2.h>
-#include <mswsock.h>
-#include <Ws2tcpip.h>
-
-#include <ctype.h>
-#include <errno.h>
-#include <stdio.h>
-#include <assert.h>
-
-namespace proton {
-namespace io {
-namespace socket {
-
-const descriptor INVALID_DESCRIPTOR = INVALID_SOCKET;
-
-std::string error_str() {
-    HRESULT code = WSAGetLastError();
-    char err[1024] = {0};
-    FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
-                  FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
-    return err;
-}
-
-io_error::io_error(const std::string& s) : error(s) {}
-
-namespace {
-
-template <class T> T check(T result, const std::string& msg=std::string()) {
-    if (result == SOCKET_ERROR)
-        throw io_error(msg + error_str());
-    return result;
-}
-
-void gai_check(int result, const std::string& msg="") {
-    if (result)
-        throw io_error(msg + gai_strerror(result));
-}
-
-} // namespace
-
-void initialize() {
-    WSADATA unused;
-    check(WSAStartup(0x0202, &unused), "can't load WinSock: "); // Version 2.2
-}
-
-void finalize() {
-    WSACleanup();
-}
-
-void engine::init() {
-    u_long nonblock = 1;
-    check(::ioctlsocket(socket_, FIONBIO, &nonblock), "ioctlsocket: ");
-}
-
-engine::engine(descriptor fd, handler& h, const connection_options &opts)
-    : connection_engine(h, opts), socket_(fd)
-{
-    init();
-}
-
-engine::engine(const url& u, handler& h, const connection_options &opts)
-    : connection_engine(h, opts), socket_(connect(u))
-{
-    init();
-    connection().open();
-}
-
-engine::~engine() {}
-
-void engine::read() {
-    mutable_buffer rbuf = read_buffer();
-    if (rbuf.size > 0) {
-        int n = ::recv(socket_, rbuf.data, rbuf.size, 0);
-        if (n > 0)
-            read_done(n);
-        else if (n == 0)
-            read_close();
-        else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK)
-            close(error_condition("io_error", error_str()));
-    }
-}
-
-void engine::write() {
-    const_buffer wbuf = write_buffer();
-    if (wbuf.size > 0) {
-    int n = ::send(socket_, wbuf.data, wbuf.size, 0);
-    if (n > 0)
-        write_done(n);
-    else if (n == SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK)
-        close(error_condition("io_error", error_str()));
-    }
-}
-
-void engine::run() {
-    while (dispatch()) {
-        fd_set rd, wr;
-        FD_ZERO(&rd);
-        if (read_buffer().size)
-            FD_SET(socket_, &rd);
-        FD_ZERO(&wr);
-        if (write_buffer().size)
-            FD_SET(socket_, &wr);
-        int n = ::select(FD_SETSIZE, &rd, &wr, NULL, NULL);
-        if (n < 0) {
-            close(error_condition("select: ", error_str()));
-            break;
-        }
-        if (FD_ISSET(socket_, &rd)) {
-            read();
-        }
-        if (FD_ISSET(socket_, &wr))
-            write();
-    }
-    ::closesocket(socket_);
-}
-
-namespace {
-struct auto_addrinfo {
-    struct addrinfo *ptr;
-    auto_addrinfo() : ptr(0) {}
-    ~auto_addrinfo() { ::freeaddrinfo(ptr); }
-    addrinfo* operator->() const { return ptr; }
-};
-
-static const char *amqp_service(const char *port) {
-  // Help older Windows to know about amqp[s] ports
-  if (port) {
-    if (!strcmp("amqp", port)) return "5672";
-    if (!strcmp("amqps", port)) return "5671";
-  }
-  return port;
-}
-}
-
-
-descriptor connect(const proton::url& u) {
-    // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets
-    std::string host = (u.host() == "0.0.0.0") ? "127.0.0.1" : u.host();
-    descriptor fd = INVALID_SOCKET;
-    try{
-        auto_addrinfo addr;
-        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
-                                amqp_service(u.port().empty() ? 0 : u.port().c_str()),
-                                0, &addr.ptr),
-                  "connect address invalid: ");
-        fd = check(::socket(addr->ai_family, SOCK_STREAM, 0), "connect socket: ");
-        check(::connect(fd, addr->ai_addr, addr->ai_addrlen), "connect: ");
-        return fd;
-    } catch (...) {
-        if (fd != INVALID_SOCKET) ::closesocket(fd);
-        throw;
-    }
-}
-
-listener::listener(const std::string& host, const std::string &port) : socket_(INVALID_SOCKET) {
-    try {
-        auto_addrinfo addr;
-        gai_check(::getaddrinfo(host.empty() ? 0 : host.c_str(),
-                                port.empty() ? 0 : port.c_str(), 0, &addr.ptr),
-                  "listener address invalid: ");
-        socket_ = check(::socket(addr->ai_family, SOCK_STREAM, 0), "listener socket: ");
-        bool yes = true;
-        check(setsockopt(socket_, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char*)&yes, sizeof(yes)), "setsockopt: ");
-        check(::bind(socket_, addr->ai_addr, addr->ai_addrlen), "listener bind: ");
-        check(::listen(socket_, 32), "listener listen: ");
-    } catch (...) {
-        if (socket_ != INVALID_SOCKET) ::closesocket(socket_);
-        throw;
-    }
-}
-
-listener::~listener() { ::closesocket(socket_); }
-
-descriptor listener::accept(std::string& host_str, std::string& port_str) {
-    struct sockaddr_storage addr;
-    socklen_t size = sizeof(addr);
-    int fd = check(::accept(socket_, (struct sockaddr *)&addr, &size), "accept: ");
-    char host[NI_MAXHOST], port[NI_MAXSERV];
-    gai_check(getnameinfo((struct sockaddr *) &addr, sizeof(addr),
-                          host, sizeof(host), port, sizeof(port), 0),
-              "accept invalid remote address: ");
-    host_str = host;
-    port_str = port;
-    return fd;
-}
-
-}}}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/proton-c/bindings/cpp/src/messaging_adapter.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp
index 1981726..a1ba250 100644
--- a/proton-c/bindings/cpp/src/messaging_adapter.cpp
+++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp
@@ -68,7 +68,10 @@ void messaging_adapter::on_link_flow(proton_event &pe) {
     pn_event_t *pne = pe.pn_event();
     pn_link_t *lnk = pn_event_link(pne);
     sender s(lnk);
-    if (lnk && pn_link_is_sender(lnk) && pn_link_credit(lnk) > 0) {
+    int state = pn_link_state(lnk);
+    if (lnk && pn_link_is_sender(lnk) && pn_link_credit(lnk) > 0 &&
+        (state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE))
+    {
         // create on_message extended event
         delegate_.on_sendable(s);
     }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/tests/tools/apps/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/tests/tools/apps/cpp/CMakeLists.txt b/tests/tools/apps/cpp/CMakeLists.txt
index 0c120f2..2bc1bc5 100644
--- a/tests/tools/apps/cpp/CMakeLists.txt
+++ b/tests/tools/apps/cpp/CMakeLists.txt
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-include_directories("${CMAKE_SOURCE_DIR}/examples/cpp")
+include_directories("${CMAKE_SOURCE_DIR}/examples/cpp" "${CMAKE_SOURCE_DIR}/examples/cpp/lib")
 add_executable(reactor_send_cpp reactor_send.cpp)
 
 target_link_libraries(reactor_send_cpp qpid-proton qpid-proton-cpp)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/deccf354/tests/tools/apps/cpp/reactor_send.cpp
----------------------------------------------------------------------
diff --git a/tests/tools/apps/cpp/reactor_send.cpp b/tests/tools/apps/cpp/reactor_send.cpp
index a3dc003..d4045b4 100644
--- a/tests/tools/apps/cpp/reactor_send.cpp
+++ b/tests/tools/apps/cpp/reactor_send.cpp
@@ -114,7 +114,7 @@ int main(int argc, char **argv) {
     int message_count = 10;
     int message_size = 100;
     bool replying = false;
-    options opts(argc, argv);
+    example::options opts(argc, argv);
     opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
     opts.add_value(message_count, 'c', "messages", "send COUNT messages", "COUNT");
     opts.add_value(message_size, 'b', "bytes", "send binary messages BYTES long", "BYTES");
@@ -124,7 +124,7 @@ int main(int argc, char **argv) {
         reactor_send send(address, message_count, message_size, replying);
         proton::container(send).run();
         return 0;
-    } catch (const bad_option& e) {
+    } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message