Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 39039200CCC for ; Fri, 21 Jul 2017 19:02:03 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 378CD16A8D7; Fri, 21 Jul 2017 17:02:03 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C842716A8C2 for ; Fri, 21 Jul 2017 19:02:00 +0200 (CEST) Received: (qmail 10330 invoked by uid 500); 21 Jul 2017 17:01:59 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 10184 invoked by uid 99); 21 Jul 2017 17:01:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Jul 2017 17:01:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 035C4F326A; Fri, 21 Jul 2017 17:01:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: astitcher@apache.org To: commits@qpid.apache.org Date: Fri, 21 Jul 2017 17:02:01 -0000 Message-Id: <3e5878a095164e339048b33985de66be@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/20] qpid-proton git commit: PROTON-1400: [C++ binding] Proactor container implementation - Remove all reactor use - Rearrange object context code - Change container includes to proactor container includes - Add sender/receiver options API to connecti archived-at: Fri, 21 Jul 2017 17:02:03 -0000 PROTON-1400: [C++ binding] Proactor container implementation - Remove all reactor use - Rearrange object context code - Change container includes to proactor container includes - Add sender/receiver options API to connection so we never need container in handlers - Rework connection_driver remove all use of container - Change signature of listener_handler callbacks to supply the listener Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9fad779c Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9fad779c Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9fad779c Branch: refs/heads/master Commit: 9fad779c98dcc2ccc75e5055d7333e9dd862c235 Parents: 6f88f52 Author: Andrew Stitcher Authored: Wed Feb 8 02:32:36 2017 -0500 Committer: Andrew Stitcher Committed: Fri Jul 21 12:50:06 2017 -0400 ---------------------------------------------------------------------- examples/cpp/ssl.cpp | 9 +- proton-c/bindings/cpp/CMakeLists.txt | 6 +- .../bindings/cpp/include/proton/connection.hpp | 6 + .../bindings/cpp/include/proton/container.hpp | 6 - .../cpp/include/proton/internal/config.hpp | 8 + .../cpp/include/proton/io/connection_driver.hpp | 25 +- .../include/proton/io/container_impl_base.hpp | 144 ------- .../cpp/include/proton/listen_handler.hpp | 6 +- .../bindings/cpp/include/proton/listener.hpp | 18 +- proton-c/bindings/cpp/src/connection.cpp | 27 +- .../bindings/cpp/src/connection_driver_test.cpp | 11 +- .../bindings/cpp/src/connection_options.cpp | 30 +- proton-c/bindings/cpp/src/container.cpp | 20 +- proton-c/bindings/cpp/src/container_test.cpp | 11 +- proton-c/bindings/cpp/src/contexts.cpp | 55 +-- proton-c/bindings/cpp/src/event_loop.cpp | 2 +- proton-c/bindings/cpp/src/include/contexts.hpp | 56 +-- .../cpp/src/include/messaging_adapter.hpp | 2 - .../cpp/src/include/proactor_container_impl.hpp | 133 ++++++ .../src/include/proactor_event_loop_impl.hpp | 54 +++ .../bindings/cpp/src/include/proton_bits.hpp | 18 +- .../bindings/cpp/src/include/proton_event.hpp | 16 +- .../cpp/src/include/test_dummy_container.hpp | 82 ---- .../bindings/cpp/src/io/connection_driver.cpp | 31 +- proton-c/bindings/cpp/src/listener.cpp | 11 +- proton-c/bindings/cpp/src/messaging_adapter.cpp | 21 +- .../cpp/src/proactor_container_impl.cpp | 419 +++++++++++++++++++ proton-c/bindings/cpp/src/receiver.cpp | 1 - proton-c/bindings/cpp/src/receiver_options.cpp | 2 +- proton-c/bindings/cpp/src/reconnect_timer.cpp | 1 - proton-c/bindings/cpp/src/sender_options.cpp | 2 +- proton-c/bindings/cpp/src/session_options.cpp | 2 +- 32 files changed, 765 insertions(+), 470 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/examples/cpp/ssl.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/ssl.cpp b/examples/cpp/ssl.cpp index 2e901c2..00bbccd 100644 --- a/examples/cpp/ssl.cpp +++ b/examples/cpp/ssl.cpp @@ -66,16 +66,16 @@ namespace { struct server_handler : public proton::messaging_handler { - std::string url; + proton::listener listener; void on_connection_open(proton::connection &c) OVERRIDE { std::cout << "Inbound server connection connected via SSL. Protocol: " << c.transport().ssl().protocol() << std::endl; - c.container().stop_listening(url); // Just expecting the one connection. + listener.stop(); // Just expecting the one connection. } void on_transport_error(proton::transport &t) OVERRIDE { - t.connection().container().stop_listening(url); + listener.stop(); } void on_message(proton::delivery &, proton::message &m) OVERRIDE { @@ -122,8 +122,7 @@ class hello_world_direct : public proton::messaging_handler { } else throw std::logic_error("bad verify mode: " + verify); c.client_connection_options(client_opts); - s_handler.url = url; - c.listen(url); + s_handler.listener = c.listen(url); c.open_sender(url); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt index d1c6fd1..295a99e 100644 --- a/proton-c/bindings/cpp/CMakeLists.txt +++ b/proton-c/bindings/cpp/CMakeLists.txt @@ -32,9 +32,8 @@ set(qpid-proton-cpp-source src/map.cpp src/connection.cpp src/connection_options.cpp - src/connector.cpp src/container.cpp - src/container_impl.cpp + src/proactor_container_impl.cpp src/contexts.cpp src/data.cpp src/decimal.cpp @@ -58,7 +57,6 @@ set(qpid-proton-cpp-source src/proton_bits.cpp src/proton_event.cpp src/proton_handler.cpp - src/reactor.cpp src/receiver.cpp src/receiver_options.cpp src/reconnect_timer.cpp @@ -91,7 +89,7 @@ set_source_files_properties ( add_library(qpid-proton-cpp SHARED ${qpid-proton-cpp-source}) -target_link_libraries (qpid-proton-cpp ${PLATFORM_LIBS} qpid-proton) +target_link_libraries (qpid-proton-cpp ${PLATFORM_LIBS} qpid-proton-core qpid-proton-proactor) set_target_properties ( qpid-proton-cpp http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/connection.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp index a4046be..331ba82 100644 --- a/proton-c/bindings/cpp/include/proton/connection.hpp +++ b/proton-c/bindings/cpp/include/proton/connection.hpp @@ -106,6 +106,12 @@ PN_CPP_CLASS_EXTERN connection : public internal::object, publi PN_CPP_EXTERN receiver open_receiver(const std::string &addr, const receiver_options &); + /// @copydoc container::sender_options + PN_CPP_EXTERN class sender_options sender_options() const; + + /// @copydoc container::receiver_options + PN_CPP_EXTERN class receiver_options receiver_options() const; + /// Return all sessions on this connection. PN_CPP_EXTERN session_range sessions() const; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/container.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp index 6f10c3c..be83e5e 100644 --- a/proton-c/bindings/cpp/include/proton/container.hpp +++ b/proton-c/bindings/cpp/include/proton/container.hpp @@ -73,12 +73,6 @@ class PN_CPP_CLASS_EXTERN container { /// Connect to `url` and send an open request to the remote peer. PN_CPP_EXTERN returned connect(const std::string& url); - /// @cond INTERNAL - /// Stop listening on url, must match the url string given to listen(). - /// You can also use the proton::listener object returned by listen() - PN_CPP_EXTERN void stop_listening(const std::string& url); - /// @endcond - /// Start listening on url. /// /// Calls to the @ref listen_handler are serialized for this listener, http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/internal/config.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/internal/config.hpp b/proton-c/bindings/cpp/include/proton/internal/config.hpp index da7f480..79d201c 100644 --- a/proton-c/bindings/cpp/include/proton/internal/config.hpp +++ b/proton-c/bindings/cpp/include/proton/internal/config.hpp @@ -95,6 +95,14 @@ #define PN_CPP_HAS_CHRONO PN_CPP_HAS_CPP11 #endif +#ifndef PN_CPP_HAS_STD_MUTEX +#define PN_CPP_HAS_STD_MUTEX PN_CPP_HAS_CPP11 +#endif + +#ifndef PN_CPP_HAS_STD_ATOMIC +#define PN_CPP_HAS_STD_ATOMIC PN_CPP_HAS_CPP11 +#endif + #endif // PROTON_INTERNAL_CONFIG_HPP /// @endcond http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp index 56deb00..8d0be85 100644 --- a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp +++ b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp @@ -94,28 +94,11 @@ struct const_buffer { class PN_CPP_CLASS_EXTERN connection_driver { public: - /// An engine that is not associated with a proton::container or - /// proton::event_loop. - /// - /// Accessing the container or event_loop for this connection in - /// a proton::messaging_handler will throw a proton::error exception. - /// + /// An engine without a container id. PN_CPP_EXTERN connection_driver(); - /// Create a connection driver associated with a proton::container and - /// optional event_loop. If the event_loop is not provided attempts to use - /// it will throw proton::error. - /// - /// Takes ownership of the event_loop. Note the proton::connection created - /// by this connection_driver can outlive the connection_driver itself if - /// the user pins it in memory using the proton::thread_safe<> template. - /// The event_loop is deleted when, and only when, the proton::connection is. - /// - PN_CPP_EXTERN connection_driver(proton::container&); -#if PN_CPP_HAS_RVALUE_REFERENCES - /// @copydoc connection_driver() - PN_CPP_EXTERN connection_driver(proton::container&, event_loop&& loop); -#endif + /// Create a connection driver associated with a container id. + PN_CPP_EXTERN connection_driver(const std::string&); PN_CPP_EXTERN ~connection_driver(); @@ -207,8 +190,8 @@ PN_CPP_CLASS_EXTERN connection_driver { connection_driver(const connection_driver&); connection_driver& operator=(const connection_driver&); + std::string container_id_; messaging_handler* handler_; - proton::container* container_; pn_connection_driver_t driver_; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/io/container_impl_base.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/io/container_impl_base.hpp b/proton-c/bindings/cpp/include/proton/io/container_impl_base.hpp deleted file mode 100644 index a04b4ff..0000000 --- a/proton-c/bindings/cpp/include/proton/io/container_impl_base.hpp +++ /dev/null @@ -1,144 +0,0 @@ -#ifndef PROTON_IO_CONTAINER_IMPL_BASE_HPP -#define PROTON_IO_CONTAINER_IMPL_BASE_HPP - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "../container.hpp" - -#include -#include -#include - -namespace proton { -namespace io { - -/// **Experimental** - A base container implementation. -/// -/// This is a thread-safe partial implementation of the -/// proton::container interface to reduce boilerplate code in -/// container implementations. Requires C++11. -/// -/// You can ignore this class if you want to implement the functions -/// in a different way. -class container_impl_base : public standard_container { - public: - // Pull in base class functions here so that name search finds all the overloads - using standard_container::open_receiver; - using standard_container::open_sender; - - /// @see proton::container::client_connection_options - void client_connection_options(const connection_options & opts) { - store(client_copts_, opts); - } - - /// @see proton::container::client_connection_options - connection_options client_connection_options() const { - return load(client_copts_); - } - - /// @see proton::container::server_connection_options - void server_connection_options(const connection_options & opts) { - store(server_copts_, opts); - } - - /// @see proton::container::server_connection_options - connection_options server_connection_options() const { - return load(server_copts_); - } - - /// @see proton::container::sender_options - void sender_options(const class sender_options & opts) { - store(sender_opts_, opts); - } - - /// @see proton::container::sender_options - class sender_options sender_options() const { - return load(sender_opts_); - } - - /// @see proton::container::receiver_options - void receiver_options(const class receiver_options & opts) { - store(receiver_opts_, opts); - } - - /// @see proton::container::receiver_options - class receiver_options receiver_options() const { - return load(receiver_opts_); - } - - /// @see proton::container::open_sender - returned open_sender( - const std::string &url, const class sender_options &opts, const connection_options &copts) - { - return open_link(url, opts, copts, &connection::open_sender); - } - - /// @see proton::container::open_receiver - returned open_receiver( - const std::string &url, const class receiver_options &opts, const connection_options &copts) - { - return open_link(url, opts, copts, &connection::open_receiver); - } - - private: - template - returned open_link( - const std::string &url_str, const Opts& opts, const connection_options& copts, - T (connection::*open_fn)(const std::string&, const Opts&)) - { - std::string addr = url(url_str).path(); - std::shared_ptr > ts_connection = connect(url_str, copts); - std::promise > result_promise; - auto do_open = [ts_connection, addr, opts, open_fn, &result_promise]() { - try { - connection c = ts_connection->unsafe(); - returned s = make_thread_safe((c.*open_fn)(addr, opts)); - result_promise.set_value(s); - } catch (...) { - result_promise.set_exception(std::current_exception()); - } - }; - ts_connection->event_loop()->inject(do_open); - std::future > result_future = result_promise.get_future(); - if (!result_future.valid()) - throw error(url_str+": connection closed"); - return result_future.get(); - } - - mutable std::mutex lock_; - template T load(const T& v) const { - std::lock_guard g(lock_); - return v; - } - template void store(T& v, const T& x) const { - std::lock_guard g(lock_); - v = x; - } - connection_options client_copts_, server_copts_; - class receiver_options receiver_opts_; - class sender_options sender_opts_; -}; - -} // io -} // proton - -#endif // PROTON_IO_CONTAINER_IMPL_BASE_HPP http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/listen_handler.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/listen_handler.hpp b/proton-c/bindings/cpp/include/proton/listen_handler.hpp index 99f7558..08d5e76 100644 --- a/proton-c/bindings/cpp/include/proton/listen_handler.hpp +++ b/proton-c/bindings/cpp/include/proton/listen_handler.hpp @@ -41,14 +41,14 @@ class listen_handler { /// the connection. messaging_handler::on_connection_open() will be called with /// the proton::connection, it can call connection::open() to accept or /// connection::close() to reject the connection. - virtual connection_options on_accept()= 0; + virtual connection_options on_accept(listener&)= 0; /// Called if there is a listening error, with an error message. /// close() will also be called. - virtual void on_error(const std::string&) {} + virtual void on_error(listener&, const std::string&) {} /// Called when this listen_handler is no longer needed, and can be deleted. - virtual void on_close() {} + virtual void on_close(listener&) {} }; } // proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/include/proton/listener.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/listener.hpp b/proton-c/bindings/cpp/include/proton/listener.hpp index 4b4ca24..c7f95a7 100644 --- a/proton-c/bindings/cpp/include/proton/listener.hpp +++ b/proton-c/bindings/cpp/include/proton/listener.hpp @@ -20,30 +20,30 @@ * under the License. */ -#include "./fwd.hpp" #include "./internal/export.hpp" -#include +struct pn_listener_t; namespace proton { /// A listener for incoming connections. class PN_CPP_CLASS_EXTERN listener { + /// @cond INTERNAL + listener(pn_listener_t*); + /// @endcond + public: /// Create an empty listener. PN_CPP_EXTERN listener(); - /// @cond INTERNAL - PN_CPP_EXTERN listener(container&, const std::string&); - /// @endcond - /// Stop listening on the address provided to the call to /// container::listen that returned this listener. PN_CPP_EXTERN void stop(); - private: - std::string url_; - container* container_; + private: + pn_listener_t* listener_; + + friend class container; }; } // proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/connection.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connection.cpp b/proton-c/bindings/cpp/src/connection.cpp index f706df4..113a06f 100644 --- a/proton-c/bindings/cpp/src/connection.cpp +++ b/proton-c/bindings/cpp/src/connection.cpp @@ -38,7 +38,6 @@ #include #include #include -#include #include namespace proton { @@ -72,13 +71,7 @@ std::string connection::user() const { container& connection::container() const { class container* c = connection_context::get(pn_object()).container; - if (!c) { - pn_reactor_t *r = pn_object_reactor(pn_object()); - if (r) - c = &container_context::get(r); - } - if (!c) - throw proton::error("connection does not have a container"); + if (!c) throw proton::error("No container"); return *c; } @@ -133,7 +126,7 @@ sender connection::open_sender(const std::string &addr) { return open_sender(addr, sender_options()); } -sender connection::open_sender(const std::string &addr, const sender_options &opts) { +sender connection::open_sender(const std::string &addr, const class sender_options &opts) { return default_session().open_sender(addr, opts); } @@ -141,11 +134,25 @@ receiver connection::open_receiver(const std::string &addr) { return open_receiver(addr, receiver_options()); } -receiver connection::open_receiver(const std::string &addr, const receiver_options &opts) +receiver connection::open_receiver(const std::string &addr, const class receiver_options &opts) { return default_session().open_receiver(addr, opts); } +class sender_options connection::sender_options() const { + connection_context& ctx = connection_context::get(pn_object()); + return ctx.container ? + ctx.container->sender_options() : + proton::sender_options(); +} + +class receiver_options connection::receiver_options() const { + connection_context& ctx = connection_context::get(pn_object()); + return ctx.container ? + ctx.container->receiver_options() : + proton::receiver_options(); +} + error_condition connection::error() const { return make_wrapper(pn_connection_remote_condition(pn_object())); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/connection_driver_test.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connection_driver_test.cpp b/proton-c/bindings/cpp/src/connection_driver_test.cpp index a5771f9..ae18ebe 100644 --- a/proton-c/bindings/cpp/src/connection_driver_test.cpp +++ b/proton-c/bindings/cpp/src/connection_driver_test.cpp @@ -31,6 +31,7 @@ #include "proton/sender.hpp" #include "proton/sender_options.hpp" #include "proton/source_options.hpp" +#include "proton/thread_safe.hpp" #include "proton/types_fwd.hpp" #include "proton/uuid.hpp" @@ -57,8 +58,8 @@ struct in_memory_driver : public connection_driver { byte_stream& writes; int spinning; - in_memory_driver(byte_stream& rd, byte_stream& wr) : - reads(rd), writes(wr), spinning(0) {} + in_memory_driver(byte_stream& rd, byte_stream& wr, const std::string& name) : + connection_driver(name), reads(rd), writes(wr), spinning(0) {} void do_read() { mutable_buffer rbuf = read_buffer(); @@ -102,8 +103,10 @@ struct driver_pair { byte_stream ab, ba; in_memory_driver a, b; - driver_pair(const connection_options& oa, const connection_options& ob) - : a(ba, ab), b(ab, ba) + driver_pair(const connection_options& oa, const connection_options& ob, + const std::string& name="" + ) : + a(ba, ab, name+"a"), b(ab, ba, name+"b") { a.connect(oa); b.accept(ob); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/connection_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connection_options.cpp b/proton-c/bindings/cpp/src/connection_options.cpp index 506e84e..4644094 100644 --- a/proton-c/bindings/cpp/src/connection_options.cpp +++ b/proton-c/bindings/cpp/src/connection_options.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "proton/fwd.hpp" #include "proton/connection_options.hpp" #include "proton/messaging_handler.hpp" #include "proton/reconnect_timer.hpp" @@ -27,12 +28,12 @@ #include "acceptor.hpp" #include "contexts.hpp" -#include "connector.hpp" #include "messaging_adapter.hpp" #include "msg.hpp" #include "proton_bits.hpp" #include +#include #include namespace proton { @@ -74,15 +75,14 @@ class connection_options::impl { */ void apply_unbound(connection& c) { pn_connection_t *pnc = unwrap(c); - container::impl::connector *outbound = dynamic_cast( - connection_context::get(unwrap(c)).handler.get()); // Only apply connection options if uninit. bool uninit = c.uninitialized(); if (!uninit) return; + bool outbound = !connection_context::get(pnc).listener_context_; if (reconnect.set && outbound) - outbound->reconnect_timer(reconnect.value); + connection_context::get(pnc).reconnect.reset(new reconnect_timer(reconnect.value)); if (container_id.set) pn_connection_set_container(pnc, container_id.value.c_str()); if (virtual_host.set) @@ -97,31 +97,23 @@ class connection_options::impl { // Transport options. pnt is NULL between reconnect attempts // and if there is a pipelined open frame. pn_connection_t *pnc = unwrap(c); - container::impl::connector *outbound = dynamic_cast( - connection_context::get(unwrap(c)).handler.get()); - pn_transport_t *pnt = pn_connection_transport(pnc); if (!pnt) return; // SSL - if (outbound && outbound->address().scheme() == url::AMQPS) { + connection_context& cc = connection_context::get(pnc); + bool outbound = !cc.listener_context_; + if (outbound && ssl_client_options.set) { // A side effect of pn_ssl() is to set the ssl peer // hostname to the connection hostname, which has // already been adjusted for the virtual_host option. pn_ssl_t *ssl = pn_ssl(pnt); if (pn_ssl_init(ssl, ssl_client_options.value.pn_domain(), NULL)) throw error(MSG("client SSL/TLS initialization error")); - } else if (!outbound) { - // TODO aconway 2016-05-13: reactor only - pn_acceptor_t *pnp = pn_connection_acceptor(pnc); - if (pnp) { - listener_context &lc(listener_context::get(pnp)); - if (lc.ssl) { - pn_ssl_t *ssl = pn_ssl(pnt); - if (pn_ssl_init(ssl, ssl_server_options.value.pn_domain(), NULL)) - throw error(MSG("server SSL/TLS initialization error")); - } - } + } else if (!outbound && ssl_server_options.set) { + pn_ssl_t *ssl = pn_ssl(pnt); + if (pn_ssl_init(ssl, ssl_server_options.value.pn_domain(), NULL)) + throw error(MSG("server SSL/TLS initialization error")); } // SASL http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/container.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp index 3daa925..b98da78 100644 --- a/proton-c/bindings/cpp/src/container.cpp +++ b/proton-c/bindings/cpp/src/container.cpp @@ -27,7 +27,7 @@ #include "proton/listener.hpp" #include "proton/thread_safe.hpp" -#include "container_impl.hpp" +#include "proactor_container_impl.hpp" namespace proton { @@ -65,24 +65,12 @@ returned container::open_receiver(const std::string &url, const proton return open_receiver(url, receiver_options(), co); } -namespace{ - struct listen_opts : public listen_handler { - connection_options opts; - listen_opts(const connection_options& o) : opts(o) {} - connection_options on_accept() { return opts; } - void on_close() { delete this; } - }; -} - listener container::listen(const std::string& url, const connection_options& opts) { - // Note: listen_opts::on_close() calls delete(this) so this is not a leak. - // The container will always call on_closed() even if there are errors or exceptions. - listen_opts* lh = new listen_opts(opts); - return listen(url, *lh); + return impl_->listen(url, opts); } listener container::listen(const std::string &url) { - return listen(url, connection_options()); + return impl_->listen(url); } void container::stop() { stop(error_condition()); } @@ -93,8 +81,6 @@ returned container::connect(const std::string& url, const connection listener container::listen(const std::string& url, listen_handler& l) { return impl_->listen(url, l); } -void container::stop_listening(const std::string& url) { impl_->stop_listening(url); } - void container::run() { impl_->run(); } void container::auto_stop(bool set) { impl_->auto_stop(set); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/container_test.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container_test.cpp b/proton-c/bindings/cpp/src/container_test.cpp index e02aff5..d210268 100644 --- a/proton-c/bindings/cpp/src/container_test.cpp +++ b/proton-c/bindings/cpp/src/container_test.cpp @@ -124,12 +124,12 @@ struct test_listener : public proton::listen_handler { bool on_accept_, on_close_; std::string on_error_; test_listener() : on_accept_(false), on_close_(false) {} - proton::connection_options on_accept() PN_CPP_OVERRIDE { + proton::connection_options on_accept(proton::listener&) PN_CPP_OVERRIDE { on_accept_ = true; return proton::connection_options(); } - void on_close() PN_CPP_OVERRIDE { on_close_ = true; } - void on_error(const std::string& e) PN_CPP_OVERRIDE { on_error_ = e; } + void on_close(proton::listener&) PN_CPP_OVERRIDE { on_close_ = true; } + void on_error(proton::listener&, const std::string& e) PN_CPP_OVERRIDE { on_error_ = e; } }; int test_container_bad_address() { @@ -179,6 +179,11 @@ class stop_tester : public proton::messaging_handler { state = 5; } + void on_transport_error(proton::transport & t) PN_CPP_OVERRIDE { + // Do nothing - ignore transport errors - we're going to get one when + // the container stops. + } + public: stop_tester(): state(0) {} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/contexts.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/contexts.cpp b/proton-c/bindings/cpp/src/contexts.cpp index b1a234f..81ef5eb 100644 --- a/proton-c/bindings/cpp/src/contexts.cpp +++ b/proton-c/bindings/cpp/src/contexts.cpp @@ -21,7 +21,6 @@ #include "contexts.hpp" #include "msg.hpp" -#include "reactor.hpp" #include "proton_bits.hpp" #include "proton/error.hpp" @@ -29,8 +28,9 @@ #include #include #include +#include #include -#include +#include "proton/reconnect_timer.hpp" #include #include @@ -48,16 +48,10 @@ pn_class_t cpp_context_class = PN_CLASS(cpp_context); // Handles PN_HANDLE(CONNECTION_CONTEXT) -PN_HANDLE(CONTAINER_CONTEXT) PN_HANDLE(LISTENER_CONTEXT) +PN_HANDLE(SESSION_CONTEXT) PN_HANDLE(LINK_CONTEXT) -void set_context(pn_record_t* record, pn_handle_t handle, const pn_class_t *clazz, void* value) -{ - pn_record_def(record, handle, clazz); - pn_record_set(record, handle, value); -} - template T* get_context(pn_record_t* record, pn_handle_t handle) { return reinterpret_cast(pn_record_get(record, handle)); @@ -71,45 +65,24 @@ void *context::alloc(size_t n) { return pn_object_new(&cpp_context_class, n); } pn_class_t* context::pn_class() { return &cpp_context_class; } +connection_context::connection_context() : + container(0), default_session(0), link_gen(0), handler(0), listener_context_(0) +{} -context::id connection_context::id(pn_connection_t* c) { - return context::id(pn_connection_attachments(c), CONNECTION_CONTEXT); -} - -void container_context::set(const reactor& r, container& c) { - set_context(pn_reactor_attachments(unwrap(r)), CONTAINER_CONTEXT, PN_VOID, &c); +connection_context& connection_context::get(pn_connection_t *c) { + return ref(id(pn_connection_attachments(c), CONNECTION_CONTEXT)); } -container &container_context::get(pn_reactor_t *pn_reactor) { - container *ctx = get_context(pn_reactor_attachments(pn_reactor), CONTAINER_CONTEXT); - if (!ctx) throw error(MSG("Reactor has no C++ container context")); - return *ctx; +listener_context& listener_context::get(pn_listener_t* l) { + return ref(id(pn_listener_attachments(l), LISTENER_CONTEXT)); } -listener_context& listener_context::get(pn_acceptor_t* a) { - // TODO aconway 2016-05-13: reactor only - // A Proton C pn_acceptor_t is really just a selectable - pn_selectable_t *sel = reinterpret_cast(a); - - listener_context* ctx = - get_context(pn_selectable_attachments(sel), LISTENER_CONTEXT); - if (!ctx) { - ctx = context::create(); - set_context(pn_selectable_attachments(sel), LISTENER_CONTEXT, context::pn_class(), ctx); - pn_decref(ctx); - } - return *ctx; +link_context& link_context::get(pn_link_t* l) { + return ref(id(pn_link_attachments(l), LINK_CONTEXT)); } -link_context& link_context::get(pn_link_t* l) { - link_context* ctx = - get_context(pn_link_attachments(l), LINK_CONTEXT); - if (!ctx) { - ctx = context::create(); - set_context(pn_link_attachments(l), LINK_CONTEXT, context::pn_class(), ctx); - pn_decref(ctx); - } - return *ctx; +session_context& session_context::get(pn_session_t* s) { + return ref(id(pn_session_attachments(s), SESSION_CONTEXT)); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/event_loop.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/event_loop.cpp b/proton-c/bindings/cpp/src/event_loop.cpp index ea4ee71..ab39aa7 100644 --- a/proton-c/bindings/cpp/src/event_loop.cpp +++ b/proton-c/bindings/cpp/src/event_loop.cpp @@ -20,7 +20,7 @@ #include "proton/event_loop.hpp" #include "contexts.hpp" -#include "event_loop_impl.hpp" +#include "proactor_event_loop_impl.hpp" #include #include http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/contexts.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/contexts.hpp b/proton-c/bindings/cpp/src/include/contexts.hpp index 742b346..c096a6e 100644 --- a/proton-c/bindings/cpp/src/include/contexts.hpp +++ b/proton-c/bindings/cpp/src/include/contexts.hpp @@ -34,16 +34,16 @@ #include "proton_handler.hpp" -struct pn_session_t; -struct pn_event_t; -struct pn_reactor_t; struct pn_record_t; -struct pn_acceptor_t; +struct pn_link_t; +struct pn_session_t; +struct pn_connection_t; +struct pn_listener_t; namespace proton { class proton_handler; -class reactor; +class reconnect_timer; // Base class for C++ classes that are used as proton contexts. // Contexts are pn_objects managed by pn reference counts, the C++ value is allocated in-place. @@ -82,51 +82,53 @@ class context { static void *alloc(size_t n); }; +class listener_context; + // Connection context used by all connections. class connection_context : public context { public: - connection_context() : container(0), default_session(0), link_gen(0) {} + connection_context(); + static connection_context& get(pn_connection_t *c); class container* container; pn_session_t *default_session; // Owned by connection. message event_message; // re-used by messaging_adapter for performance. io::link_namer* link_gen; // Link name generator. - internal::pn_unique_ptr handler; + messaging_handler* handler; + internal::pn_unique_ptr reconnect; + listener_context* listener_context_; event_loop event_loop_; - - static connection_context& get(pn_connection_t *c) { return ref(id(c)); } - - protected: - static context::id id(pn_connection_t*); -}; - -void container_context(const reactor&, container&); - -class container_context { - public: - static void set(const reactor& r, container& c); - static container& get(pn_reactor_t*); }; class listener_context : public context { public: - static listener_context& get(pn_acceptor_t* c); - listener_context() : listen_handler_(0), ssl(false) {} - connection_options get_options() { return listen_handler_->on_accept(); } - class listen_handler* listen_handler_; - bool ssl; + listener_context() : listen_handler_(0) {} + static listener_context& get(pn_listener_t* c); + + listen_handler* listen_handler_; + internal::pn_unique_ptr connection_options_; }; class link_context : public context { public: + link_context() : handler(0), credit_window(10), pending_credit(0), auto_accept(true), auto_settle(true), draining(false) {} static link_context& get(pn_link_t* l); - link_context() : credit_window(10), auto_accept(true), auto_settle(true), draining(false), pending_credit(0) {} + + messaging_handler* handler; int credit_window; + uint32_t pending_credit; bool auto_accept; bool auto_settle; bool draining; - uint32_t pending_credit; +}; + +class session_context : public context { + public: + session_context() : handler(0) {} + static session_context& get(pn_session_t* s); + + messaging_handler* handler; }; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/messaging_adapter.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/messaging_adapter.hpp b/proton-c/bindings/cpp/src/include/messaging_adapter.hpp index 5371eec..d7eb6a0 100644 --- a/proton-c/bindings/cpp/src/include/messaging_adapter.hpp +++ b/proton-c/bindings/cpp/src/include/messaging_adapter.hpp @@ -39,8 +39,6 @@ class messaging_adapter : public proton_handler public: messaging_adapter(messaging_handler &delegate) : delegate_(delegate) {} - void on_reactor_init(proton_event &e); - void on_reactor_final(proton_event & e); void on_link_flow(proton_event &e); void on_delivery(proton_event &e); void on_connection_remote_open(proton_event &e); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp new file mode 100644 index 0000000..8c12c02 --- /dev/null +++ b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp @@ -0,0 +1,133 @@ +#ifndef PROTON_CPP_PROACTOR_CONTAINERIMPL_H +#define PROTON_CPP_PROACTOR_CONTAINERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/fwd.hpp" +#include "proton/container.hpp" +#include "proton/connection.hpp" +#include "proton/connection_options.hpp" +#include "proton/duration.hpp" +#include "proton/error_condition.hpp" +#include "proton/messaging_handler.hpp" +#include "proton/receiver.hpp" +#include "proton/receiver_options.hpp" +#include "proton/sender.hpp" +#include "proton/sender_options.hpp" + +#include "proton_bits.hpp" +#include "proton_handler.hpp" + +#include +#include +#include +#include + +struct pn_proactor_t; +struct pn_listener_t; +struct pn_event_t; + +namespace proton { + +class container::impl { + public: + impl(container& c, const std::string& id, messaging_handler* = 0); + ~impl(); + std::string id() const { return id_; } + returned connect(const std::string&, const connection_options&); + returned open_sender( + const std::string&, const proton::sender_options &, const connection_options &); + returned open_receiver( + const std::string&, const proton::receiver_options &, const connection_options &); + listener listen(const std::string&); + listener listen(const std::string&, const connection_options& lh); + listener listen(const std::string&, listen_handler& lh); + void client_connection_options(const connection_options &); + connection_options client_connection_options() const { return client_connection_options_; } + void server_connection_options(const connection_options &); + connection_options server_connection_options() const { return server_connection_options_; } + void sender_options(const proton::sender_options&); + class sender_options sender_options() const { return sender_options_; } + void receiver_options(const proton::receiver_options&); + class receiver_options receiver_options() const { return receiver_options_; } + void run(); + void stop(const error_condition& err); + void auto_stop(bool set); + void schedule(duration, void_function0&); +#if PN_CPP_HAS_STD_FUNCTION + void schedule(duration, std::function); +#endif + template static void set_handler(T s, messaging_handler* h); + template static messaging_handler* get_handler(T s); + + private: + pn_listener_t* listen_common_lh(const std::string&); + connection connect_common(const std::string&, const connection_options&); + + // Event loop to run in each container thread + static void thread(impl&); + bool handle(pn_event_t*); + void run_timer_jobs(); + + container& container_; + + struct scheduled { + timestamp time; // duration from epoch for task +#if PN_CPP_HAS_STD_FUNCTION + std::function task; +#else + void_function0* task_; + void task(); +#endif + + // We want to get to get the *earliest* first so test is "reversed" + bool operator < (const scheduled& r) const { return r.time < time; } + }; + std::vector deferred_; // This vector is kept as a heap + + pn_proactor_t* proactor_; + messaging_handler* handler_; + std::string id_; + connection_options client_connection_options_; + connection_options server_connection_options_; + proton::sender_options sender_options_; + proton::receiver_options receiver_options_; + + proton::error_condition stop_err_; + bool auto_stop_; + bool stopping_; +}; + +template +void container::impl::set_handler(T s, messaging_handler* mh) { + internal::set_messaging_handler(s, mh); +} + +template +messaging_handler* container::impl::get_handler(T s) { + return internal::get_messaging_handler(s); +} + + +} + +#endif /*!PROTON_CPP_PROACTOR_CONTAINERIMPL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp new file mode 100644 index 0000000..8fa7acf --- /dev/null +++ b/proton-c/bindings/cpp/src/include/proactor_event_loop_impl.hpp @@ -0,0 +1,54 @@ +#ifndef PROTON_CPP_EVENT_LOOP_IMPL_HPP +#define PROTON_CPP_EVENT_LOOP_IMPL_HPP + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/fwd.hpp" + +struct pn_connection_t; + +namespace proton { + +class event_loop::impl { + public: + impl(pn_connection_t*); + + bool inject(void_function0& f); +#if PN_CPP_HAS_STD_FUNCTION + bool inject(std::function f); + typedef std::vector > jobs; +#else + typedef std::vector jobs; +#endif + + + void run_all_jobs(); + void finished(); + + jobs jobs_; + pn_connection_t* connection_; + bool finished_; +}; + +} + +#endif // PROTON_CPP_EVENT_LOOP_IMPL_HPP http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/proton_bits.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/proton_bits.hpp b/proton-c/bindings/cpp/src/include/proton_bits.hpp index 53f2230..e72f343 100644 --- a/proton-c/bindings/cpp/src/include/proton_bits.hpp +++ b/proton-c/bindings/cpp/src/include/proton_bits.hpp @@ -24,6 +24,8 @@ #include #include +#include "contexts.hpp" + /**@file * * Assorted internal proton utilities. @@ -65,6 +67,7 @@ class terminus; class source; class target; class reactor; +class messaging_handler; std::string error_str(long code); @@ -127,12 +130,19 @@ public: static typename wrapped::type* unwrap(const T& t) { return t.pn_object(); } }; -// Get attachments for various proton-c types +template struct context {}; +template <> struct context {typedef link_context type; }; +template <> struct context {typedef link_context type; }; +template <> struct context {typedef link_context type; }; +template <> struct context {typedef session_context type; }; +template <> struct context {typedef connection_context type; }; + +template +inline void set_messaging_handler(T t, messaging_handler* mh) { context::type::get(factory::unwrap(t)).handler = mh; } + template -inline pn_record_t* get_attachments(T*); +inline messaging_handler* get_messaging_handler(T* t) { return context::type>::type::get(t).handler; } -template <> inline pn_record_t* get_attachments(pn_session_t* s) { return pn_session_attachments(s); } -template <> inline pn_record_t* get_attachments(pn_link_t* l) { return pn_link_attachments(l); } } template http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/proton_event.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/proton_event.hpp b/proton-c/bindings/cpp/src/include/proton_event.hpp index 374da85..be324e7 100644 --- a/proton-c/bindings/cpp/src/include/proton_event.hpp +++ b/proton-c/bindings/cpp/src/include/proton_event.hpp @@ -266,23 +266,12 @@ class proton_event }; ///@} - proton_event(pn_event_t *ce, class container* cont) : - pn_event_(ce), - container_(cont) + proton_event(pn_event_t *ce) : + pn_event_(ce) {} pn_event_t* pn_event() const { return pn_event_; } - /** Return a reference to the container, throws proton::error if there is none. */ - class container& container() const { - if (!container_) - throw proton::error("event does not have a container"); - return *container_; - } - - /** Return a pointer to the container if there is one, NULL otherwise. */ - class container* container_ptr() const { return container_; } - /// Get type of event event_type type() const { return event_type(pn_event_type(pn_event_)); } @@ -290,7 +279,6 @@ class proton_event private: pn_event_t *pn_event_; - class container* container_; }; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/include/test_dummy_container.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/test_dummy_container.hpp b/proton-c/bindings/cpp/src/include/test_dummy_container.hpp deleted file mode 100644 index daed435..0000000 --- a/proton-c/bindings/cpp/src/include/test_dummy_container.hpp +++ /dev/null @@ -1,82 +0,0 @@ -#ifndef TEST_DUMMY_CONTAINER_HPP -#define TEST_DUMMY_CONTAINER_HPP - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "proton/container.hpp" -#include "proton/event_loop.hpp" -#include "proton/thread_safe.hpp" - -namespace test { - -using namespace proton; - -class dummy_container : public standard_container { - public: - dummy_container(const std::string cid="") : - id_(cid), fail("not implemented for dummy_container") {} - - // Pull in base class functions here so that name search finds all the overloads - using standard_container::stop; - using standard_container::connect; - using standard_container::listen; - using standard_container::open_receiver; - using standard_container::open_sender; - - returned connect(const std::string&, const connection_options&) { throw fail; } - listener listen(const std::string& , listen_handler& ) { throw fail; } - void stop_listening(const std::string&) { throw fail; } - void run() { throw fail; } - void auto_stop(bool) { throw fail; } - void stop(const proton::error_condition& ) { throw fail; } - returned open_sender(const std::string &, const proton::sender_options &, const connection_options&) { throw fail; } - returned open_receiver( const std::string &, const proton::receiver_options &, const connection_options &) { throw fail; } - std::string id() const { return id_; } - void client_connection_options(const connection_options &o) { ccopts_ = o; } - connection_options client_connection_options() const { return ccopts_; } - void server_connection_options(const connection_options &o) { scopts_ = o; } - connection_options server_connection_options() const { return scopts_; } - void sender_options(const class sender_options &o) { sopts_ = o; } - class sender_options sender_options() const { return sopts_; } - void receiver_options(const class receiver_options &o) { ropts_ = o; } - class receiver_options receiver_options() const { return ropts_; } -#if PN_CPP_HAS_STD_FUNCTION - void schedule(duration, std::function) { throw fail; } -#endif - void schedule(duration, void_function0&) { throw fail; } - - private: - std::string id_; - connection_options ccopts_, scopts_; - class sender_options sopts_; - class receiver_options ropts_; - std::runtime_error fail; -}; - -class dummy_event_loop : public event_loop { -#if PN_CPP_HAS_CPP11 - bool inject(std::function f) PN_CPP_OVERRIDE { f(); return true; } -#endif - bool inject(proton::void_function0& h) PN_CPP_OVERRIDE { h(); return true; } -}; - -} - -#endif // TEST_DUMMY_CONTAINER_HPP http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/io/connection_driver.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/io/connection_driver.cpp b/proton-c/bindings/cpp/src/io/connection_driver.cpp index da8c2a4..d7c5e5c 100644 --- a/proton-c/bindings/cpp/src/io/connection_driver.cpp +++ b/proton-c/bindings/cpp/src/io/connection_driver.cpp @@ -47,23 +47,12 @@ void connection_driver::init() { } } -connection_driver::connection_driver() : handler_(0), container_(0) { init(); } +connection_driver::connection_driver() : handler_(0) { init(); } -connection_driver::connection_driver(class container& cont) : handler_(0), container_(&cont) { +connection_driver::connection_driver(const std::string& id) : container_id_(id), handler_(0) { init(); - connection_context& ctx = connection_context::get(unwrap(connection())); - ctx.container = container_; } -#if PN_CPP_HAS_RVALUE_REFERENCES -connection_driver::connection_driver(class container& cont, event_loop&& loop) : handler_(0), container_(&cont) { - init(); - connection_context& ctx = connection_context::get(unwrap(connection())); - ctx.container = container_; - ctx.event_loop_ = loop.impl_.get(); -} -#endif - connection_driver::~connection_driver() { pn_connection_driver_destroy(&driver_); } @@ -79,10 +68,7 @@ void connection_driver::configure(const connection_options& opts, bool server) { void connection_driver::connect(const connection_options& opts) { connection_options all; - if (container_) { - all.container_id(container_->id()); - all.update(container_->client_connection_options()); - } + all.container_id(container_id_); all.update(opts); configure(all, false); connection().open(); @@ -90,10 +76,7 @@ void connection_driver::connect(const connection_options& opts) { void connection_driver::accept(const connection_options& opts) { connection_options all; - if (container_) { - all.container_id(container_->id()); - all.update(container_->server_connection_options()); - } + all.container_id(container_id_); all.update(opts); configure(all, true); } @@ -105,7 +88,7 @@ bool connection_driver::has_events() const { bool connection_driver::dispatch() { pn_event_t* c_event; while ((c_event = pn_connection_driver_next_event(&driver_)) != NULL) { - proton_event cpp_event(c_event, container_); + proton_event cpp_event(c_event); try { if (handler_ != 0) { messaging_adapter adapter(*handler_); @@ -163,8 +146,4 @@ proton::transport connection_driver::transport() const { return make_wrapper(driver_.transport); } -proton::container* connection_driver::container() const { - return container_; -} - }} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/listener.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/listener.cpp b/proton-c/bindings/cpp/src/listener.cpp index 2639f5e..a9ca53d 100644 --- a/proton-c/bindings/cpp/src/listener.cpp +++ b/proton-c/bindings/cpp/src/listener.cpp @@ -18,12 +18,15 @@ */ #include "proton/listener.hpp" -#include "proton/container.hpp" + +#include + +#include "contexts.hpp" namespace proton { -listener::listener() : container_(0) {} -listener::listener(container& c, const std::string& u) : url_(u), container_(&c) {} -void listener::stop() { if (container_) container_->stop_listening(url_); } +listener::listener(): listener_(0) {} +listener::listener(pn_listener_t* l) : listener_(l) {} +void listener::stop() { if (listener_) pn_listener_close(listener_); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/messaging_adapter.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp index a70703e..613808b 100644 --- a/proton-c/bindings/cpp/src/messaging_adapter.cpp +++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp @@ -58,16 +58,6 @@ void credit_topup(pn_link_t *link) { } } -void messaging_adapter::on_reactor_init(proton_event &pe) { - container* c = pe.container_ptr(); - if (c) delegate_.on_container_start(*c); -} - -void messaging_adapter::on_reactor_final(proton_event &pe) { - container* c = pe.container_ptr(); - if (c) delegate_.on_container_stop(*c); -} - void messaging_adapter::on_link_flow(proton_event &pe) { pn_event_t *pne = pe.pn_event(); pn_link_t *lnk = pn_event_link(pne); @@ -281,24 +271,17 @@ void messaging_adapter::on_link_local_open(proton_event &pe) { void messaging_adapter::on_link_remote_open(proton_event &pe) { pn_link_t *lnk = pn_event_link(pe.pn_event()); - container* c = pe.container_ptr(); if (pn_link_is_receiver(lnk)) { receiver r(make_wrapper(lnk)); delegate_.on_receiver_open(r); if (is_local_unititialised(pn_link_state(lnk))) { - if (c) - r.open(c->receiver_options()); - else - r.open(); + r.open(r.connection().receiver_options()); } } else { sender s(make_wrapper(lnk)); delegate_.on_sender_open(s); if (is_local_unititialised(pn_link_state(lnk))) { - if (c) - s.open(c->sender_options()); - else - s.open(); + s.open(s.connection().sender_options()); } } credit_topup(lnk); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/proactor_container_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp new file mode 100644 index 0000000..2b6b1de --- /dev/null +++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "proactor_container_impl.hpp" +#include "proactor_event_loop_impl.hpp" + +#include "proton/error_condition.hpp" +#include "proton/function.hpp" +#include "proton/listener.hpp" +#include "proton/listen_handler.hpp" +#include "proton/thread_safe.hpp" +#include "proton/url.hpp" + +#include "proton/connection.h" +#include "proton/listener.h" +#include "proton/proactor.h" +#include "proton/transport.h" + +#include "contexts.hpp" +#include "messaging_adapter.hpp" +#include "proton_bits.hpp" +#include "proton_event.hpp" + +#include + +#include +#include + +namespace proton { + +event_loop::impl::impl(pn_connection_t* c) + : connection_(c), finished_(false) +{} + +void event_loop::impl::finished() { + finished_ = true; +} + +#if PN_CPP_HAS_STD_FUNCTION +bool event_loop::impl::inject(std::function f) { + // Note this is an unbounded work queue. + // A resource-safe implementation should be bounded. + if (finished_) + return false; + jobs_.push_back(f); + pn_connection_wake(connection_); + return true; +} + +bool event_loop::impl::inject(proton::void_function0& f) { + return inject([&f]() { f(); }); +} + +void event_loop::impl::run_all_jobs() { + decltype(jobs_) j; + { + std::swap(j, jobs_); + } + // Run queued work, but ignore any exceptions + for (auto& f : j) try { + f(); + } catch (...) {}; +} +#else +bool event_loop::impl::inject(proton::void_function0& f) { + // Note this is an unbounded work queue. + // A resource-safe implementation should be bounded. + if (finished_) + return false; + jobs_.push_back(&f); + pn_connection_wake(connection_); + return true; +} + +void event_loop::impl::run_all_jobs() { + // Run queued work, but ignore any exceptions + for (event_loop::impl::jobs::iterator f = jobs_.begin(); f != jobs_.end(); ++f) try { + (**f)(); + } catch (...) {}; + jobs_.clear(); + return; +} +#endif +container::impl::impl(container& c, const std::string& id, messaging_handler* mh) + : container_(c), proactor_(pn_proactor()), handler_(mh), id_(id), + auto_stop_(true), stopping_(false) +{} + +container::impl::~impl() { + try { + stop(error_condition("exception", "container shut-down")); + //wait(); + } catch (...) {} + pn_proactor_free(proactor_); +} + +proton::connection container::impl::connect_common( + const std::string& addr, + const proton::connection_options& user_opts) +{ + if (stopping_) + throw proton::error("container is stopping"); + + connection_options opts = client_connection_options_; // Defaults + opts.update(user_opts); + messaging_handler* mh = opts.handler(); + + proton::url url(addr); + pn_connection_t *pnc = pn_connection(); + connection_context& cc(connection_context::get(pnc)); + cc.container = &container_; + cc.handler = mh; + cc.event_loop_ = new event_loop::impl(pnc); + + pn_connection_set_container(pnc, id_.c_str()); + pn_connection_set_hostname(pnc, url.host().c_str()); + if (!url.user().empty()) + pn_connection_set_user(pnc, url.user().c_str()); + if (!url.password().empty()) + pn_connection_set_password(pnc, url.password().c_str()); + + connection conn = make_wrapper(pnc); + conn.open(opts); + // Figure out correct string len then create connection address + int len = pn_proactor_addr(0, 0, url.host().c_str(), url.port().c_str()); + std::vector caddr(len+1); + pn_proactor_addr(&caddr[0], len+1, url.host().c_str(), url.port().c_str()); + pn_proactor_connect(proactor_, pnc, &caddr[0]); + return conn; +} + +proton::returned container::impl::connect( + const std::string& addr, + const proton::connection_options& user_opts) +{ + connection conn = connect_common(addr, user_opts); + return make_thread_safe(conn); +} + +returned container::impl::open_sender(const std::string &url, const proton::sender_options &o1, const connection_options &o2) { + proton::sender_options lopts(sender_options_); + lopts.update(o1); + connection conn = connect_common(url, o2); + + return make_thread_safe(conn.default_session().open_sender(proton::url(url).path(), lopts)); +} + +returned container::impl::open_receiver(const std::string &url, const proton::receiver_options &o1, const connection_options &o2) { + proton::receiver_options lopts(receiver_options_); + lopts.update(o1); + connection conn = connect_common(url, o2); + + return make_thread_safe( + conn.default_session().open_receiver(proton::url(url).path(), lopts)); +} + +pn_listener_t* container::impl::listen_common_lh(const std::string& addr) { + if (stopping_) + throw proton::error("container is stopping"); + + proton::url url(addr); + + // Figure out correct string len then create connection address + int len = pn_proactor_addr(0, 0, url.host().c_str(), url.port().c_str()); + std::vector caddr(len+1); + pn_proactor_addr(&caddr[0], len+1, url.host().c_str(), url.port().c_str()); + + pn_listener_t* listener = pn_listener(); + pn_proactor_listen(proactor_, listener, &caddr[0], 16); + return listener; +} + +proton::listener container::impl::listen(const std::string& addr) { + pn_listener_t* listener = listen_common_lh(addr); + return proton::listener(listener); +} + +proton::listener container::impl::listen(const std::string& addr, const proton::connection_options& opts) { + pn_listener_t* listener = listen_common_lh(addr); + listener_context& lc=listener_context::get(listener); + lc.connection_options_.reset(new connection_options(opts)); + return proton::listener(listener); +} + +proton::listener container::impl::listen(const std::string& addr, proton::listen_handler& lh) { + pn_listener_t* listener = listen_common_lh(addr); + listener_context& lc=listener_context::get(listener); + lc.listen_handler_ = &lh; + return proton::listener(listener); +} + +#if PN_CPP_HAS_STD_FUNCTION +void container::impl::schedule(duration delay, void_function0& f) { + schedule(delay, [&f](){ f(); } ); +} + +void container::impl::schedule(duration delay, std::function f) { + // Set timeout + pn_proactor_set_timeout(proactor_, delay.milliseconds()); + + // Record timeout; Add callback to timeout sorted list + deferred_.emplace_back(scheduled{timestamp::now()+delay, f}); + std::push_heap(deferred_.begin(), deferred_.end()); +} +#else +void container::impl::scheduled::task() {(*task_)();} + +void container::impl::schedule(duration delay, void_function0& f) { + // Set timeout + pn_proactor_set_timeout(proactor_, delay.milliseconds()); + + // Record timeout; Add callback to timeout sorted list + scheduled s={timestamp::now()+delay, &f}; + deferred_.push_back(s); + std::push_heap(deferred_.begin(), deferred_.end()); +} +#endif + +void container::impl::client_connection_options(const connection_options &opts) { + client_connection_options_ = opts; +} + +void container::impl::server_connection_options(const connection_options &opts) { + server_connection_options_ = opts; +} + +void container::impl::sender_options(const proton::sender_options &opts) { + sender_options_ = opts; +} + +void container::impl::receiver_options(const proton::receiver_options &opts) { + receiver_options_ = opts; +} + +void container::impl::run_timer_jobs() { + // Check head of timer queue + timestamp now = timestamp::now(); + scheduled* next = &deferred_.front(); + + // So every scheduled element that has past run and remove head + while ( next->time<=now ) { + next->task(); + std::pop_heap(deferred_.begin(), deferred_.end()); + deferred_.pop_back(); + // If there are no more scheduled items finish now + if ( deferred_.size()==0 ) return; + next = &deferred_.front(); + }; + + // To get here we know we must have at least one more thing scheduled + pn_proactor_set_timeout(proactor_, (next->time-now).milliseconds()); +} + +bool container::impl::handle(pn_event_t* event) { + + // If we have any pending connection work, do it now + pn_connection_t* c = pn_event_connection(event); + if (c) { + event_loop::impl* loop = connection_context::get(c).event_loop_.impl_.get(); + loop->run_all_jobs(); + } + + // Process events that shouldn't be sent to messaging_handler + switch (pn_event_type(event)) { + + case PN_PROACTOR_INACTIVE: /* listener and all connections closed */ + return auto_stop_; + + // We never interrupt the proactor so ignore + case PN_PROACTOR_INTERRUPT: + return false; + + case PN_PROACTOR_TIMEOUT: + // Maybe we got a timeout and have nothing scheduled (not sure if this is possible) + if ( deferred_.size()==0 ) return false; + + run_timer_jobs(); + return false; + + case PN_LISTENER_OPEN: + return false; + + case PN_LISTENER_ACCEPT: { + pn_listener_t* l = pn_event_listener(event); + pn_connection_t* c = pn_connection(); + listener_context &lc(listener_context::get(l)); + pn_connection_set_container(c, id_.c_str()); + connection_options opts = server_connection_options_; + if (lc.listen_handler_) { + listener lstr(l); + opts.update(lc.listen_handler_->on_accept(lstr)); + } + else if (!!lc.connection_options_) opts.update(*lc.connection_options_); + lc.connection_options_.reset(new connection_options(opts)); + // Handler applied separately + connection_context& cc = connection_context::get(c); + cc.container = &container_; + cc.listener_context_ = &lc; + cc.handler = opts.handler(); + cc.event_loop_ = new event_loop::impl(c); + pn_listener_accept(l, c); + return false; + } + case PN_LISTENER_CLOSE: { + pn_listener_t* l = pn_event_listener(event); + listener_context &lc(listener_context::get(l)); + listener lstnr(l); + if (lc.listen_handler_) { + pn_condition_t* c = pn_listener_condition(l); + if (pn_condition_is_set(c)) { + lc.listen_handler_->on_error(lstnr, make_wrapper(c).what()); + } + lc.listen_handler_->on_close(lstnr); + } + return false; + } + // If the event was just connection wake then there isn't anything more to do + case PN_CONNECTION_WAKE: + return false; + + // Connection driver will bind a new transport to the connection at this point + case PN_CONNECTION_INIT: + return false; + + case PN_CONNECTION_BOUND: { + // Need to apply post bind connection options + pn_connection_t* c = pn_event_connection(event); + connection conn = make_wrapper(c); + connection_context& cc = connection_context::get(c); + if (cc.listener_context_) { + cc.listener_context_->connection_options_->apply_bound(conn); + } else { + client_connection_options_.apply_bound(conn); + } + + return false; + } + default: + break; + } + + // Figure out the handler for the primary object for event + messaging_handler* mh = 0; + + // First try for a link (send/receiver) handler + pn_link_t *link = pn_event_link(event); + if (link) mh = get_handler(link); + + // Try for session handler if no link handler + pn_session_t *session = pn_event_session(event); + if (session && !mh) mh = get_handler(session); + + // Try for connection handler if none of the above + pn_connection_t *connection = pn_event_connection(event); + if (connection && !mh) mh = get_handler(connection); + + // Use container handler if nothing more specific (must be a container handler) + if (!mh) mh = handler_; + + // If we still have no handler don't do anything! + // This is pretty unusual, but possible if we use the default constructor for container + if (!mh) return false; + + // TODO: Currently create a throwaway messaging_adapter and proton_event so we can call dispatch, a bit inefficient + messaging_adapter ma(*mh); + proton_event pe(event); + pe.dispatch(ma); + return false; +} + +void container::impl::thread(container::impl& ci) { + bool finished = false; + do { + pn_event_batch_t *events = pn_proactor_wait(ci.proactor_); + pn_event_t *e; + while ((e = pn_event_batch_next(events))) { + finished = ci.handle(e) || finished; + } + pn_proactor_done(ci.proactor_, events); + } while(!finished); +} + +void container::impl::run() { + // Have to "manually" generate container events + if (handler_) handler_->on_container_start(container_); + thread(*this); + if (handler_) handler_->on_container_stop(container_); +} + +void container::impl::auto_stop(bool set) { + auto_stop_ = set; +} + +void container::impl::stop(const proton::error_condition& err) { + auto_stop_ = true; + stopping_ = true; + pn_condition_t* error_condition = pn_condition(); + set_error_condition(err, error_condition); + pn_proactor_disconnect(proactor_, error_condition); + pn_condition_free(error_condition); +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/receiver.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/receiver.cpp b/proton-c/bindings/cpp/src/receiver.cpp index 68d55d0..b7239a5 100644 --- a/proton-c/bindings/cpp/src/receiver.cpp +++ b/proton-c/bindings/cpp/src/receiver.cpp @@ -34,7 +34,6 @@ #include #include #include -#include namespace proton { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/receiver_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/receiver_options.cpp b/proton-c/bindings/cpp/src/receiver_options.cpp index 4a4d80f..2b134bc 100644 --- a/proton-c/bindings/cpp/src/receiver_options.cpp +++ b/proton-c/bindings/cpp/src/receiver_options.cpp @@ -27,7 +27,7 @@ #include #include "contexts.hpp" -#include "container_impl.hpp" +#include "proactor_container_impl.hpp" #include "messaging_adapter.hpp" #include "proton_bits.hpp" http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/reconnect_timer.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/reconnect_timer.cpp b/proton-c/bindings/cpp/src/reconnect_timer.cpp index c63f8a1..a299b0e 100644 --- a/proton-c/bindings/cpp/src/reconnect_timer.cpp +++ b/proton-c/bindings/cpp/src/reconnect_timer.cpp @@ -23,7 +23,6 @@ #include "proton/error.hpp" #include "msg.hpp" #include -#include namespace proton { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/sender_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/sender_options.cpp b/proton-c/bindings/cpp/src/sender_options.cpp index 4f501e6..9305666 100644 --- a/proton-c/bindings/cpp/src/sender_options.cpp +++ b/proton-c/bindings/cpp/src/sender_options.cpp @@ -24,7 +24,7 @@ #include "proton/source_options.hpp" #include "proton/target_options.hpp" -#include "container_impl.hpp" +#include "proactor_container_impl.hpp" #include "contexts.hpp" #include "messaging_adapter.hpp" #include "proton_bits.hpp" http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9fad779c/proton-c/bindings/cpp/src/session_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/session_options.cpp b/proton-c/bindings/cpp/src/session_options.cpp index 2147fd4..fc03ebb 100644 --- a/proton-c/bindings/cpp/src/session_options.cpp +++ b/proton-c/bindings/cpp/src/session_options.cpp @@ -27,7 +27,7 @@ #include #include "messaging_adapter.hpp" -#include "container_impl.hpp" +#include "proactor_container_impl.hpp" #include "proton_bits.hpp" namespace proton { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org