Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 550E8200C24 for ; Thu, 23 Feb 2017 23:51:13 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 53A52160B3E; Thu, 23 Feb 2017 22:51:13 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5B4F0160B7E for ; Thu, 23 Feb 2017 23:51:10 +0100 (CET) Received: (qmail 77267 invoked by uid 500); 23 Feb 2017 22:51:09 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 76349 invoked by uid 99); 23 Feb 2017 22:51:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 23 Feb 2017 22:51:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8E386DFF83; Thu, 23 Feb 2017 22:51:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aconway@apache.org To: commits@qpid.apache.org Date: Thu, 23 Feb 2017 22:51:29 -0000 Message-Id: <1c6014a013e74d85b654f728caa7d366@git.apache.org> In-Reply-To: <18e12ab11c6247358369966c9b6620b7@git.apache.org> References: <18e12ab11c6247358369966c9b6620b7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [22/38] qpid-proton git commit: PROTON-1403: C proactor tests, fixes & additions archived-at: Thu, 23 Feb 2017 22:51:13 -0000 PROTON-1403: C proactor tests, fixes & additions proactor API additions: - PN_LISTENER_OPEN: event when listener is listening and connects will succeed. 
- pn_proactor_grab(): non-blocking version of pn_proactor_wait(), used in tests src/tests/test_tools.h - simple C test framework src/tests/proactor.c - initial tests for basic proactor functionality src/proactor/libuv.c - fixed some assertion bugs and memory leaks - renaming and simplifying the code examples/broker.c: exit with non-0 if broker stops because of an error Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b987a6a7 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b987a6a7 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b987a6a7 Branch: refs/heads/go1 Commit: b987a6a70f30dc593bbea6c98ebae36ec77e4b7d Parents: 9bd99eb Author: Alan Conway Authored: Fri Jan 13 16:41:43 2017 -0500 Committer: Alan Conway Committed: Thu Feb 16 17:58:20 2017 -0500 ---------------------------------------------------------------------- CMakeLists.txt | 2 +- examples/c/proactor/CMakeLists.txt | 6 +- examples/c/proactor/broker.c | 6 +- examples/c/proactor/direct.c | 5 +- examples/c/proactor/send.c | 2 +- proton-c/CMakeLists.txt | 6 - .../cpp/include/proton/io/connection_driver.hpp | 1 + proton-c/include/proton/event.h | 17 +- proton-c/include/proton/listener.h | 2 +- proton-c/include/proton/proactor.h | 34 +- proton-c/src/core/connection_driver.c | 4 +- proton-c/src/core/event.c | 2 + proton-c/src/proactor/libuv.c | 518 ++++++++++--------- proton-c/src/tests/CMakeLists.txt | 9 +- proton-c/src/tests/proactor.c | 217 ++++++++ proton-c/src/tests/test_tools.h | 140 +++++ 16 files changed, 698 insertions(+), 273 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index b538ffd..294fd03 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -71,7 +71,7 @@ if 
(CMAKE_BUILD_TYPE MATCHES "Coverage") make_directory(coverage_results) add_custom_target(coverage WORKING_DIRECTORY ./coverage_results - COMMAND ${CMAKE_SOURCE_DIR}/bin/record-coverage.sh ${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR}) + COMMAND ${CMAKE_SOURCE_DIR}/bin/record-coverage.sh ${CMAKE_SOURCE_DIR} ${CMAKE_BINARY_DIR}) endif() if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin") http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/examples/c/proactor/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt index 4189cf5..153f35f 100644 --- a/examples/c/proactor/CMakeLists.txt +++ b/examples/c/proactor/CMakeLists.txt @@ -29,7 +29,7 @@ set(CMAKE_REQUIRED_LIBRARIES ${CMAKE_REQUIRED_LIBRARIES} ${Proton_LIBRARIES}) check_function_exists(pn_proactor HAS_PROACTOR) cmake_pop_check_state() -if (HAS_PROACTOR) +if(HAS_PROACTOR) add_definitions(${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION}) @@ -48,6 +48,6 @@ foreach(name broker send receive direct) endforeach() set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV}) -add_test(c-proactor ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py -v) +add_test(c-example-proactor ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py -v) -endif() +endif(HAS_PROACTOR) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/examples/c/proactor/broker.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c index ebf4068..2a338e1 100644 --- a/examples/c/proactor/broker.c +++ b/examples/c/proactor/broker.c @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -288,8 +289,11 @@ static void session_unsub(broker_t *b, pn_session_t *ssn) { } } +static int 
exit_code = 0; + static void check_condition(pn_event_t *e, pn_condition_t *cond) { if (pn_condition_is_set(cond)) { + exit_code = 1; const char *ename = e ? pn_event_type_name(pn_event_type(e)) : "UNKNOWN"; fprintf(stderr, "%s: %s: %s\n", ename, pn_condition_get_name(cond), pn_condition_get_description(cond)); @@ -483,5 +487,5 @@ int main(int argc, char **argv) { } pn_proactor_free(b.proactor); free(threads); - return 0; + return exit_code; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/examples/c/proactor/direct.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c index 26f1b33..3d0a7d1 100644 --- a/examples/c/proactor/direct.c +++ b/examples/c/proactor/direct.c @@ -22,9 +22,10 @@ #include #include #include -#include #include +#include #include +#include #include #include #include @@ -59,7 +60,7 @@ typedef struct app_data_t { static const int BATCH = 1000; /* Batch size for unlimited receive */ -int exit_code = 0; +static int exit_code = 0; static void check_condition(pn_event_t *e, pn_condition_t *cond) { if (pn_condition_is_set(cond)) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/examples/c/proactor/send.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c index 48fcecd..bba5d3e 100644 --- a/examples/c/proactor/send.c +++ b/examples/c/proactor/send.c @@ -50,7 +50,7 @@ typedef struct app_data_t { bool finished; } app_data_t; -int exit_code = 0; +static int exit_code = 0; static void check_condition(pn_event_t *e, pn_condition_t *cond) { if (pn_condition_is_set(cond)) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt index 30a77e2..0731b67 100644 --- 
a/proton-c/CMakeLists.txt +++ b/proton-c/CMakeLists.txt @@ -278,12 +278,6 @@ if (CMAKE_C_COMPILER_ID MATCHES "Clang") if (ENABLE_WARNING_ERROR) set (COMPILE_WARNING_FLAGS "-Werror ${COMPILE_WARNING_FLAGS}") endif (ENABLE_WARNING_ERROR) -endif() - -if (CMAKE_CXX_COMPILER_ID MATCHES "Clang") - if (ENABLE_WARNING_ERROR) - set (WERROR "-Werror") - endif (ENABLE_WARNING_ERROR) # TODO aconway 2016-01-06: we should be able to clean up the code and turn on # some of these warnings. set (CXX_WARNING_FLAGS "${COMPILE_WARNING_FLAGS} -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-float-equal -Wno-padded -Wno-sign-conversion -Wno-switch-enum -Wno-weak-vtables -Wno-exit-time-destructors -Wno-global-constructors -Wno-shorten-64-to-32 -Wno-documentation -Wno-documentation-unknown-command -Wno-old-style-cast -Wno-missing-noreturn") http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp index 4a0efe9..759b1fc 100644 --- a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp +++ b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp @@ -113,6 +113,7 @@ PN_CPP_CLASS_EXTERN connection_driver { /// PN_CPP_EXTERN connection_driver(proton::container&); #if PN_CPP_HAS_RVALUE_REFERENCES + /// @copydoc connection_driver() PN_CPP_EXTERN connection_driver(proton::container&, event_loop&& loop); #endif http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/include/proton/event.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h index 4a88368..3cfcc82 100644 --- a/proton-c/include/proton/event.h +++ b/proton-c/include/proton/event.h @@ -321,7 +321,8 @@ typedef enum { 
PN_CONNECTION_WAKE, /** - * Indicates the listener is ready to call pn_listener_accept() + * Indicates the listener has an incoming connection, call pn_listener_accept() + * to accept it. * Events of this type point to the @ref pn_listener_t. */ PN_LISTENER_ACCEPT, @@ -350,7 +351,13 @@ typedef enum { * * Events of this type point to the @ref pn_proactor_t. */ - PN_PROACTOR_INACTIVE + PN_PROACTOR_INACTIVE, + + /** + * Indicates the listener is listening. + * Events of this type point to the @ref pn_listener_t. + */ + PN_LISTENER_OPEN } pn_event_type_t; @@ -537,9 +544,9 @@ PN_EXTERN pn_transport_t *pn_event_transport(pn_event_t *event); PN_EXTERN pn_record_t *pn_event_attachments(pn_event_t *event); /** - * **Experimental** - A batch of events to handle. Call - * pn_event_batch_next() in a loop until it returns NULL to handle - * them. + * **Experimental** - A batch of events that must be handled in sequence. + * Call pn_event_batch_next() in a loop until it returns NULL to extract + * the events. */ typedef struct pn_event_batch_t pn_event_batch_t; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/include/proton/listener.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h index 18feca7..2038c06 100644 --- a/proton-c/include/proton/listener.h +++ b/proton-c/include/proton/listener.h @@ -20,7 +20,7 @@ * under the License. */ -#include +#include #include #ifdef __cplusplus http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/include/proton/proactor.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h index 71a7dda..af3acbc 100644 --- a/proton-c/include/proton/proactor.h +++ b/proton-c/include/proton/proactor.h @@ -20,10 +20,9 @@ * under the License. 
*/ -#include -#include -#include #include +#include +#include #ifdef __cplusplus extern "C" { @@ -95,25 +94,34 @@ PNP_EXTERN int pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listen const char *host, const char *port, int backlog); /** - * Wait for events to handle. + * Wait until there is at least one event to handle. + * Always returns a non-empty batch of events. * - * Handle events in the returned batch by calling - * pn_event_batch_next() until it returns NULL. You must call - * pn_proactor_done() when you are finished with the batch. + * You must call pn_proactor_done() when you are finished with the batch, you + * must not use the batch pointer after calling pn_proactor_done(). * - * If you call pn_proactor_done() before finishing the batch, the - * remaining events will be returned again by another call - * pn_proactor_wait(). This is less efficient, but allows you to - * handle part of a batch and then hand off the rest to another - * thread. + * Normally it is most efficient to handle the entire batch in one thread, but + * you can call pn_proactor_done() on an unfinished batch. The remaining + * events will be returned by another call to pn_proactor_wait(), possibly in a + * different thread. + * + * @note You can generate events to force threads to wake up from + * pn_proactor_wait() using pn_proactor_interrupt(), pn_proactor_set_timeout() + * and pn_connection_wake() * * @note Thread-safe: can be called concurrently. Events in a single * batch must be handled in sequence, but batches returned by separate - * calls to pn_proactor_wait() can be handled concurrently. + * calls can be handled concurrently. */ PNP_EXTERN pn_event_batch_t *pn_proactor_wait(pn_proactor_t *proactor); /** + * Return a batch of events if one is available immediately, otherwise return NULL. 
If it + * does return an event batch, the rules are the same as for pn_proactor_wait() + */ +PNP_EXTERN pn_event_batch_t *pn_proactor_grab(pn_proactor_t *proactor); + +/** * Call when done handling a batch of events. * * Must be called exactly once to match each call to http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/core/connection_driver.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/connection_driver.c b/proton-c/src/core/connection_driver.c index 3393e64..0d1db21 100644 --- a/proton-c/src/core/connection_driver.c +++ b/proton-c/src/core/connection_driver.c @@ -127,7 +127,9 @@ pn_event_t* pn_connection_driver_next_event(pn_connection_driver_t *d) { } bool pn_connection_driver_has_event(pn_connection_driver_t *d) { - return pn_collector_peek(pn_connection_collector(d->connection)); + /* FIXME aconway 2017-02-15: this is ugly */ + pn_collector_t *c = pn_connection_collector(d->connection); + return pn_collector_more(c) || (pn_collector_peek(c) && pn_collector_peek(c) != pn_collector_prev(c)); } bool pn_connection_driver_finished(pn_connection_driver_t *d) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/core/event.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/event.c b/proton-c/src/core/event.c index 41ff6d1..e213c8f 100644 --- a/proton-c/src/core/event.c +++ b/proton-c/src/core/event.c @@ -395,6 +395,8 @@ const char *pn_event_type_name(pn_event_type_t type) return "PN_PROACTOR_TIMEOUT"; case PN_PROACTOR_INACTIVE: return "PN_PROACTOR_INACTIVE"; + case PN_LISTENER_OPEN: + return "PN_LISTENER_OPEN"; default: return "PN_UNKNOWN"; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/proactor/libuv.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c index d17136a..0ccfcda 
100644 --- a/proton-c/src/proactor/libuv.c +++ b/proton-c/src/proactor/libuv.c @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -40,37 +41,34 @@ #include /* - libuv loop functions are thread unsafe. The only exception is uv_async_send() - which is a thread safe "wakeup" that can wake the uv_loop from another thread. + libuv functions are thread unsafe. The exception is uv_async_send(), a thread safe + "wakeup" that can wake the uv_loop from another thread. - To provide concurrency the proactor uses a "leader-worker-follower" model, - threads take turns at the roles: + To provide concurrency proactor uses a "leader-worker-follower" model, threads take + turns at the roles: - - a single "leader" calls libuv functions and runs the uv_loop in short bursts - to generate work. When there is work available it gives up leadership and - becomes a "worker" + - a single "leader" thread uses libuv, it runs the uv_loop in short bursts to + generate work. Once there is work it becomes a "worker" thread, another thread + takes over as leader. - - "workers" handle events concurrently for distinct connections/listeners - They do as much work as they can get, when none is left they become "followers" + - "workers" handle events for separate connections or listeners concurrently. They do as + much work as they can, when none is left they become "followers" - - "followers" wait for the leader to generate work and become workers. - When the leader itself becomes a worker, one of the followers takes over. + - "followers" wait for the leader to generate work. One follower becomes the new leader, + the others become workers or continue to follow till they can get work. - This model is symmetric: any thread can take on any role based on run-time - requirements. It also allows the IO and non-IO work associated with an IO - wake-up to be processed in a single thread with no context switches. 
+ Any thread in a pool can take on any role necessary at run-time. All the work generated + by an IO wake-up for a single connection can be processed in a single worker + thread to minimize context switching. Function naming: - - on_* - called in leader thread by uv_run(). + - on_* - called in leader thread via uv_run(). - leader_* - called in leader thread (either leader_q processing or from an on_ function) - - worker_* - called in worker thread - *_lh - called with the relevant lock held LIFECYCLE: pconnection_t and pn_listener_t objects must not be deleted until all their - UV handles have received an on_close(). Freeing resources is always initiated by - uv_close() of the uv_tcp_t handle, and completed in on_close() handler functions when it - is safe. The only exception is when an error occurs that prevents a pn_connection_t or - pn_listener_t from being associated with a uv handle at all. + UV handles have received a close callback. Freeing resources is initiated by uv_close() + of the uv_tcp_t handle, and executed in an on_close() handler when it is safe. */ const char *COND_NAME = "proactor"; @@ -82,12 +80,12 @@ const char *AMQPS_PORT_NAME = "amqps"; PN_HANDLE(PN_PROACTOR) /* pn_proactor_t and pn_listener_t are plain C structs with normal memory management. - Class definitions are for identification as pn_event_t context only. + CLASSDEF is for identification when used as a pn_event_t context. */ PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor) PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener) -/* A psocket (connection or listener) has the following *mutually exclusive* states. */ +/* A psocket (connection or listener) has the following mutually exclusive states. 
*/ typedef enum { ON_WORKER, /* On worker_q or in use by user code in worker thread */ ON_LEADER, /* On leader_q or in use the leader loop */ @@ -105,12 +103,11 @@ typedef struct psocket_t { void (*action)(struct psocket_t*); /* deferred action for leader */ void (*wakeup)(struct psocket_t*); /* wakeup action for leader */ - /* Only used by leader when it owns the psocket */ + /* Only used by leader thread when it owns the psocket */ uv_tcp_t tcp; char host[NI_MAXHOST]; char port[NI_MAXSERV]; bool is_conn; - } psocket_t; /* Special value for psocket.next pointer when socket is not on any any list. */ @@ -138,6 +135,7 @@ static inline const char* fixstr(const char* str) { return str[0] == '\001' ? NULL : str; } +/* Holds a psocket and a pn_connection_driver */ typedef struct pconnection_t { psocket_t psocket; @@ -150,10 +148,11 @@ typedef struct pconnection_t { uv_write_t write; uv_shutdown_t shutdown; size_t writing; /* size of pending write request, 0 if none pending */ - bool reading; /* true if a read request is pending */ - bool server; /* accept, not connect */ + bool server; /* accepting not connecting */ } pconnection_t; + +/* pn_listener_t with a psocket_t */ struct pn_listener_t { psocket_t psocket; @@ -169,6 +168,8 @@ struct pn_listener_t { /* Only used in leader thread */ size_t connections; /* number of connections waiting to be accepted */ + int err; /* uv error code, 0 = OK, UV_EOF = closed */ + const char *what; /* static description string */ }; typedef struct queue { psocket_t *front, *back; } queue; @@ -198,9 +199,9 @@ struct pn_proactor_t { bool batch_working; /* batch is being processed in a worker thread */ }; -static bool push_lh(queue *q, psocket_t *ps) { - if (ps->next != &UNLISTED) /* Don't move if already listed. */ - return false; +/* Push ps to back of q. 
Must not be on a different queue */ +static void push_lh(queue *q, psocket_t *ps) { + assert(ps->next == &UNLISTED); ps->next = NULL; if (!q->front) { q->front = q->back = ps; @@ -208,9 +209,9 @@ static bool push_lh(queue *q, psocket_t *ps) { q->back->next = ps; q->back = ps; } - return true; } +/* Pop returns front of q or NULL if empty */ static psocket_t* pop_lh(queue *q) { psocket_t *ps = q->front; if (ps) { @@ -220,29 +221,49 @@ static psocket_t* pop_lh(queue *q) { return ps; } -/* Set state and action and push to relevant queue */ -static inline void set_state_lh(psocket_t *ps, psocket_state_t state, void (*action)(psocket_t*)) { - /* Illegal if ps is already listed under a different state */ - assert(ps->next == &UNLISTED || ps->state == state); - ps->state = state; - if (action && !ps->action) { - ps->action = action; +/* Queue an action for the leader thread */ +static void to_leader(psocket_t *ps, void (*action)(psocket_t*)) { + uv_mutex_lock(&ps->proactor->lock); + ps->action = action; + if (ps->next == &UNLISTED) { + ps->state = ON_LEADER; + push_lh(&ps->proactor->leader_q, ps); + } + uv_mutex_unlock(&ps->proactor->lock); + uv_async_send(&ps->proactor->async); /* Wake leader */ +} + +/* Push to the worker thread */ +static void to_worker(psocket_t *ps) { + uv_mutex_lock(&ps->proactor->lock); + /* If already ON_WORKER do nothing */ + if (ps->next == &UNLISTED && ps->state != ON_WORKER) { + ps->state = ON_WORKER; + push_lh(&ps->proactor->worker_q, ps); } - switch(state) { - case ON_LEADER: push_lh(&ps->proactor->leader_q, ps); break; - case ON_WORKER: push_lh(&ps->proactor->worker_q, ps); break; - case ON_UV: - assert(ps->next == &UNLISTED); - break; /* No queue for UV loop */ + uv_mutex_unlock(&ps->proactor->lock); +} + +/* Set state to ON_UV */ +static void to_uv(psocket_t *ps) { + uv_mutex_lock(&ps->proactor->lock); + if (ps->next == &UNLISTED) { + ps->state = ON_UV; } + uv_mutex_unlock(&ps->proactor->lock); } -/* Set state and action, push to queue 
and notify leader. Thread safe. */ -static void set_state(psocket_t *ps, psocket_state_t state, void (*action)(psocket_t*)) { +/* Called in any thread to set a wakeup action */ +static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) { uv_mutex_lock(&ps->proactor->lock); - set_state_lh(ps, state, action); - uv_async_send(&ps->proactor->async); + ps->wakeup = action; + /* If ON_WORKER we'll do the wakeup in pn_proactor_done() */ + if (ps->next == &UNLISTED && ps->state != ON_WORKER) { + push_lh(&ps->proactor->leader_q, ps); + ps->state = ON_LEADER; /* Otherwise notify the leader */ + } uv_mutex_unlock(&ps->proactor->lock); + uv_async_send(&ps->proactor->async); /* Wake leader */ } static inline pconnection_t *as_pconnection(psocket_t* ps) { @@ -308,7 +329,6 @@ static void on_close_pconnection_final(uv_handle_t *h) { /* Close event for uv_tcp_t of a psocket_t */ static void on_close_psocket(uv_handle_t *h) { - /* No assert(ps->state == ON_UV); may be called in other states during shutdown. 
*/ psocket_t *ps = (psocket_t*)h->data; if (ps->is_conn) { leader_count(ps->proactor, -1); @@ -329,29 +349,43 @@ static pconnection_t *get_pconnection(pn_connection_t* c) { return (pconnection_t*) pn_record_get(r, PN_PROACTOR); } -static void leader_unwatch(psocket_t *ps); +static void pconnection_to_worker(pconnection_t *pc); +static void listener_to_worker(pn_listener_t *l); -static void leader_error(psocket_t *ps, int err, const char* what) { - assert(ps->state != ON_WORKER); - if (ps->is_conn) { - pn_connection_driver_t *driver = &as_pconnection(ps)->driver; +int pconnection_error(pconnection_t *pc, int err, const char* what) { + if (err) { + pn_connection_driver_t *driver = &pc->driver; pn_connection_driver_bind(driver); /* Bind so errors will be reported */ pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s", - what, fixstr(ps->host), fixstr(ps->port), + what, fixstr(pc->psocket.host), fixstr(pc->psocket.port), uv_strerror(err)); pn_connection_driver_close(driver); - } else { - pn_listener_t *l = as_listener(ps); - pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s", - what, fixstr(ps->host), fixstr(ps->port), - uv_strerror(err)); - pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE); - l->closing = true; + pconnection_to_worker(pc); + } + return err; +} + +static int listener_error(pn_listener_t *l, int err, const char* what) { + if (err) { + l->err = err; + l->what = what; + listener_to_worker(l); } - leader_unwatch(ps); /* Worker to handle the error */ + return err; } -/* uv-initialization */ +static int psocket_error(psocket_t *ps, int err, const char* what) { + if (err) { + if (ps->is_conn) { + pconnection_error(as_pconnection(ps), err, "initialization"); + } else { + listener_error(as_listener(ps), err, "initialization"); + } + } + return err; +} + +/* psocket uv-initialization */ static int leader_init(psocket_t *ps) { ps->state = ON_LEADER; leader_count(ps->proactor, +1); @@ -365,9 +399,8 @@ static int 
leader_init(psocket_t *ps) { pc->timer.data = ps; } } - } - if (err) { - leader_error(ps, err, "initialization"); + } else { + psocket_error(ps, err, "initialization"); } return err; } @@ -375,33 +408,27 @@ static int leader_init(psocket_t *ps) { /* Outgoing connection */ static void on_connect(uv_connect_t *connect, int err) { pconnection_t *pc = (pconnection_t*)connect->data; - assert(pc->psocket.state == ON_UV); if (!err) { - leader_unwatch(&pc->psocket); + pconnection_to_worker(pc); } else { - leader_error(&pc->psocket, err, "on connect to"); + pconnection_error(pc, err, "on connect to"); } } /* Incoming connection ready to be accepted */ static void on_connection(uv_stream_t* server, int err) { - /* Unlike most on_* functions, this one can be called by the leader thrad when the + /* Unlike most on_* functions, this one can be called by the leader thread when the * listener is ON_WORKER, because there's no way to stop libuv from calling - * on_connection() in leader_unwatch(). Just increase a counter and deal with it in the - * worker thread. + * on_connection(). Just increase a counter and generate events in to_worker. 
*/ pn_listener_t *l = (pn_listener_t*) server->data; - assert(l->psocket.state == ON_UV); - if (!err) { - ++l->connections; - leader_unwatch(&l->psocket); - } else { - leader_error(&l->psocket, err, "on connection from"); - } + l->err = err; + if (!err) ++l->connections; + listener_to_worker(l); /* If already ON_WORKER it will stay there */ } +/* FIXME aconway 2017-02-16: listener events in unwatch*/ static void leader_accept(pn_listener_t * l) { - assert(l->psocket.state == ON_UV); assert(l->accepting); pconnection_t *pc = l->accepting; l->accepting = NULL; @@ -410,10 +437,10 @@ static void leader_accept(pn_listener_t * l) { err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp); } if (!err) { - leader_unwatch(&pc->psocket); + pconnection_to_worker(pc); } else { - leader_error(&pc->psocket, err, "accepting from"); - leader_error(&l->psocket, err, "accepting from"); + pconnection_error(pc, err, "accepting from"); + listener_error(l, err, "accepting from"); } } @@ -440,7 +467,7 @@ static void leader_connect(psocket_t *ps) { if (!err) { ps->state = ON_UV; } else { - leader_error(ps, err, "connecting to"); + psocket_error(ps, err, "connecting to"); } } @@ -457,32 +484,26 @@ static void leader_listen(psocket_t *ps) { err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_connection); } if (!err) { - set_state(ps, ON_UV, NULL); + pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_OPEN); + listener_to_worker(l); /* Let worker see the OPEN event */ } else { - leader_error(ps, err, "listening on"); + listener_error(l, err, "listening on"); } } /* Generate tick events and return millis till next tick or 0 if no tick is required */ static pn_millis_t leader_tick(pconnection_t *pc) { assert(pc->psocket.state != ON_WORKER); - pn_transport_t *t = pc->driver.transport; - if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) { - uint64_t now = uv_now(pc->timer.loop); - uint64_t next = pn_transport_tick(t, 
now); - return next ? next - now : 0; - } - return 0; + uint64_t now = uv_now(pc->timer.loop); + uint64_t next = pn_transport_tick(pc->driver.transport, now); + return next ? next - now : 0; } static void on_tick(uv_timer_t *timer) { - if (!timer->data) return; /* timer closed */ pconnection_t *pc = (pconnection_t*)timer->data; - assert(pc->psocket.state == ON_UV); - uv_timer_stop(&pc->timer); - pn_millis_t next = leader_tick(pc); + pn_millis_t next = leader_tick(pc); /* May generate events */ if (pn_connection_driver_has_event(&pc->driver)) { - leader_unwatch(&pc->psocket); + pconnection_to_worker(pc); } else if (next) { uv_timer_start(&pc->timer, on_tick, next, 0); } @@ -490,31 +511,28 @@ static void on_tick(uv_timer_t *timer) { static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { pconnection_t *pc = (pconnection_t*)stream->data; - assert(pc->psocket.state == ON_UV); if (nread >= 0) { pn_connection_driver_read_done(&pc->driver, nread); - leader_unwatch(&pc->psocket); /* Handle events */ + pconnection_to_worker(pc); } else if (nread == UV_EOF) { /* hangup */ pn_connection_driver_read_close(&pc->driver); - leader_unwatch(&pc->psocket); + pconnection_to_worker(pc); } else { - leader_error(&pc->psocket, nread, "on read from"); + pconnection_error(pc, nread, "on read from"); } } static void on_write(uv_write_t* write, int err) { pconnection_t *pc = (pconnection_t*)write->data; - assert(pc->psocket.state == ON_UV); - size_t writing = pc->writing; - pc->writing = 0; /* This write is done regardless of outcome */ if (err == 0) { - pn_connection_driver_write_done(&pc->driver, writing); - leader_unwatch(&pc->psocket); + pn_connection_driver_write_done(&pc->driver, pc->writing); + pconnection_to_worker(pc); } else if (err == UV_ECANCELED) { - leader_unwatch(&pc->psocket); /* cancelled by leader_unwatch, complete the job */ + pconnection_to_worker(pc); } else { - leader_error(&pc->psocket, err, "on write to"); + pconnection_error(pc, err, "on write 
to"); } + pc->writing = 0; } static void on_timeout(uv_timer_t *timer) { @@ -527,88 +545,111 @@ static void on_timeout(uv_timer_t *timer) { // Read buffer allocation function for uv, just returns the transports read buffer. static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) { pconnection_t *pc = (pconnection_t*)stream->data; - assert(pc->psocket.state == ON_UV); pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); *buf = uv_buf_init(rbuf.start, rbuf.size); } -/* Monitor a socket in the UV loop */ -static void leader_watch(psocket_t *ps) { - assert(ps->state != ON_WORKER); - int err = 0; - set_state(ps, ON_UV, NULL); /* Assume we are going to UV loop unless sent to worker or leader. */ - - if (ps->is_conn) { - pconnection_t *pc = as_pconnection(ps); - if (pn_connection_driver_finished(&pc->driver)) { - uv_close((uv_handle_t*)&ps->tcp, on_close_psocket); - return; +static void pconnection_to_uv(pconnection_t *pc) { + to_uv(&pc->psocket); /* Assume we're going to UV unless sent elsewhere */ + if (pn_connection_driver_finished(&pc->driver)) { + if (!uv_is_closing((uv_handle_t*)&pc->psocket)) { + uv_close((uv_handle_t*)&pc->psocket.tcp, on_close_psocket); } - pn_millis_t next_tick = leader_tick(pc); - pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); - pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); - if (pn_connection_driver_has_event(&pc->driver)) { - /* Ticks and checking buffers have generated events, send back to worker to process */ - set_state(ps, ON_WORKER, NULL); + return; + } + pn_millis_t next_tick = leader_tick(pc); + pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); + pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); + if (pn_connection_driver_has_event(&pc->driver)) { + to_worker(&pc->psocket); /* Ticks/buffer checks generated events */ + return; + } + if (next_tick && + pconnection_error(pc, uv_timer_start(&pc->timer, on_tick, next_tick, 0), 
"timer start")) { + return; + } + if (wbuf.size > 0) { + uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size); + if (pconnection_error( + pc, uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write), "write")) return; + pc->writing = wbuf.size; + } else if (pn_connection_driver_write_closed(&pc->driver)) { + pc->shutdown.data = &pc->psocket; + if (pconnection_error( + pc, uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL), "shutdown write")) + return; + } + if (rbuf.size > 0) { + if (pconnection_error( + pc, uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read), "read")) + return; + } +} + +static void listener_to_uv(pn_listener_t *l) { + to_uv(&l->psocket); /* Assume we're going to UV unless sent elsewhere */ + if (l->err) { + if (!uv_is_closing((uv_handle_t*)&l->psocket.tcp)) { + uv_close((uv_handle_t*)&l->psocket.tcp, on_close_psocket); } - if (next_tick) { - uv_timer_start(&pc->timer, on_tick, next_tick, 0); - } - if (wbuf.size > 0 && !pc->writing) { - pc->writing = wbuf.size; - uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size); - err = uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write); - } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) { - pc->shutdown.data = ps; - err = uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, NULL); + } else { + if (l->accepting) { + leader_accept(l); } - if (rbuf.size > 0 && !pc->reading) { - pc->reading = true; - err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read); + if (l->connections) { + listener_to_worker(l); } + } +} + +/* Monitor a psocket_t in the UV loop */ +static void psocket_to_uv(psocket_t *ps) { + if (ps->is_conn) { + pconnection_to_uv(as_pconnection(ps)); } else { - pn_listener_t *l = as_listener(ps); - if (l->closing && pn_collector_peek(l->collector)) { - uv_close((uv_handle_t*)&ps->tcp, on_close_psocket); - return; - } else { - if (l->accepting) { - leader_accept(l); - 
} - if (l->connections) { - leader_unwatch(ps); - } - } + listener_to_uv(as_listener(ps)); } - if (err) { - leader_error(ps, err, "re-watching"); +} + +/* Detach a connection from IO and put it on the worker queue */ +static void pconnection_to_worker(pconnection_t *pc) { + /* Can't go to worker if a write is outstanding or the batch is empty */ + if (!pc->writing && pn_connection_driver_has_event(&pc->driver)) { + uv_read_stop((uv_stream_t*)&pc->psocket.tcp); + uv_timer_stop(&pc->timer); } + to_worker(&pc->psocket); } -/* Detach a socket from IO and put it on the worker queue */ -static void leader_unwatch(psocket_t *ps) { - assert(ps->state != ON_WORKER); /* From ON_UV or ON_LEADER */ - if (ps->is_conn) { - pconnection_t *pc = as_pconnection(ps); - if (!pn_connection_driver_has_event(&pc->driver)) { - /* Don't return an empty event batch, re-attach to UV loop */ - leader_watch(ps); - return; - } else { - if (pc->writing) { - uv_cancel((uv_req_t*)&pc->write); - } - if (pc->reading) { - pc->reading = false; - uv_read_stop((uv_stream_t*)&pc->psocket.tcp); - } - if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) { - uv_timer_stop(&pc->timer); - } +/* TODO aconway 2017-02-16: simplify collector API*/ +static bool collector_has_next(pn_collector_t *c) { + return pn_collector_more(c) || + (pn_collector_peek(c) && pn_collector_peek(c) != pn_collector_prev(c)); +} + +/* Can't really detach a listener, as on_connection can always be called. + Generate events here safely. 
+*/ +static void listener_to_worker(pn_listener_t *l) { + if (collector_has_next(l->collector)) { /* Already have events */ + to_worker(&l->psocket); + } else if (l->err) { + if (l->err != UV_EOF) { + pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s", + l->what, fixstr(l->psocket.host), fixstr(l->psocket.port), + uv_strerror(l->err)); } + l->err = 0; + pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE); + to_worker(&l->psocket); + } else if (l->connections) { /* Generate accept events one at a time */ + --l->connections; + pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT); + to_worker(&l->psocket); + } else { + listener_to_uv(l); } - set_state(ps, ON_WORKER, NULL); } /* Set the event in the proactor's batch */ @@ -618,7 +659,7 @@ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) return &p->batch; } -/* Return the next event batch or 0 if no events are ready */ +/* Return the next event batch or 0 if no events are available in the worker_q */ static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) { if (!p->batch_working) { /* Can generate proactor events */ if (p->inactive) { @@ -637,33 +678,14 @@ static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) { for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) { assert(ps->state == ON_WORKER); if (ps->is_conn) { - pconnection_t *pc = as_pconnection(ps); - return &pc->driver.batch; + return &as_pconnection(ps)->driver.batch; } else { /* Listener */ - pn_listener_t *l = as_listener(ps); - /* Generate accept events one at a time */ - if (l->connections && !pn_collector_peek(l->collector)) { - --l->connections; - pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT); - } - return &l->batch; + return &as_listener(ps)->batch; } - set_state_lh(ps, ON_LEADER, NULL); /* No event, back to leader */ } return 0; } -/* Called in any thread to set a wakeup action */ -static void wakeup(psocket_t *ps, 
void (*action)(psocket_t*)) { - uv_mutex_lock(&ps->proactor->lock); - if (action && !ps->wakeup) { - ps->wakeup = action; - } - set_state_lh(ps, ON_LEADER, NULL); - uv_async_send(&ps->proactor->async); /* Wake leader */ - uv_mutex_unlock(&ps->proactor->lock); -} - pn_listener_t *pn_event_listener(pn_event_t *e) { return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL; } @@ -689,26 +711,52 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { assert(pc->psocket.state == ON_WORKER); if (pn_connection_driver_has_event(&pc->driver)) { /* Process all events before going back to leader */ - set_state(&pc->psocket, ON_WORKER, NULL); + pconnection_to_worker(pc); } else { - set_state(&pc->psocket, ON_LEADER, leader_watch); + to_leader(&pc->psocket, psocket_to_uv); } return; } pn_listener_t *l = batch_listener(batch); if (l) { assert(l->psocket.state == ON_WORKER); - set_state(&l->psocket, ON_LEADER, leader_watch); + to_leader(&l->psocket, psocket_to_uv); return; } pn_proactor_t *bp = batch_proactor(batch); if (bp == p) { uv_mutex_lock(&p->lock); p->batch_working = false; - uv_async_send(&p->async); /* Wake leader */ uv_mutex_unlock(&p->lock); return; } + uv_async_send(&p->async); /* Wake leader */ +} + +/* Process the leader_q, in the leader thread */ +static void leader_process_lh(pn_proactor_t *p) { + if (p->timeout_request) { + p->timeout_request = false; + if (p->timeout) { + uv_timer_start(&p->timer, on_timeout, p->timeout, 0); + } else { + uv_timer_stop(&p->timer); + } + } + for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) { + assert(ps->state == ON_LEADER); + if (ps->wakeup) { + uv_mutex_unlock(&p->lock); + ps->wakeup(ps); + ps->wakeup = NULL; + uv_mutex_lock(&p->lock); + } else if (ps->action) { + uv_mutex_unlock(&p->lock); + ps->action(ps); + ps->action = NULL; + uv_mutex_lock(&p->lock); + } + } } /* Run follower/leader loop till we can return an event and be a worker */ @@ -724,28 
+772,7 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) { /* Lead till there is work to do. */ p->has_leader = true; while (batch == NULL) { - if (p->timeout_request) { - p->timeout_request = false; - if (p->timeout) { - uv_timer_start(&p->timer, on_timeout, p->timeout, 0); - } else { - uv_timer_stop(&p->timer); - } - } - for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) { - assert(ps->state == ON_LEADER); - if (ps->wakeup) { - uv_mutex_unlock(&p->lock); - ps->wakeup(ps); - ps->wakeup = NULL; - uv_mutex_lock(&p->lock); - } else if (ps->action) { - uv_mutex_unlock(&p->lock); - ps->action(ps); - ps->action = NULL; - uv_mutex_lock(&p->lock); - } - } + leader_process_lh(p); batch = get_batch_lh(p); if (batch == NULL) { uv_mutex_unlock(&p->lock); @@ -753,7 +780,7 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) { uv_mutex_lock(&p->lock); } } - /* Signal the next leader and return to work */ + /* Signal the next leader and go to work */ p->has_leader = false; uv_cond_signal(&p->cond); } @@ -761,19 +788,36 @@ pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) { return batch; } +pn_event_batch_t *pn_proactor_grab(struct pn_proactor_t* p) { + uv_mutex_lock(&p->lock); + pn_event_batch_t *batch = get_batch_lh(p); + if (batch == NULL && !p->has_leader) { + /* If there is no leader, try a non-waiting lead to generate some work */ + p->has_leader = true; + leader_process_lh(p); + uv_mutex_unlock(&p->lock); + uv_run(&p->loop, UV_RUN_NOWAIT); + uv_mutex_lock(&p->lock); + batch = get_batch_lh(p); + p->has_leader = false; + } + uv_mutex_unlock(&p->lock); + return batch; +} + void pn_proactor_interrupt(pn_proactor_t *p) { uv_mutex_lock(&p->lock); ++p->interrupt; - uv_async_send(&p->async); /* Interrupt the UV loop */ uv_mutex_unlock(&p->lock); + uv_async_send(&p->async); /* Interrupt the UV loop */ } void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) { uv_mutex_lock(&p->lock); p->timeout = t; p->timeout_request = 
true; - uv_async_send(&p->async); /* Interrupt the UV loop */ uv_mutex_unlock(&p->lock); + uv_async_send(&p->async); /* Interrupt the UV loop */ } int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) { @@ -781,7 +825,7 @@ int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, if (!pc) { return PN_OUT_OF_MEMORY; } - set_state(&pc->psocket, ON_LEADER, leader_connect); + to_leader(&pc->psocket, leader_connect); return 0; } @@ -789,7 +833,7 @@ int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, con { psocket_init(&l->psocket, p, false, host, port); l->backlog = backlog; - set_state(&l->psocket, ON_LEADER, leader_listen); + to_leader(&l->psocket, leader_listen); return 0; } @@ -803,7 +847,7 @@ void leader_wake_connection(psocket_t *ps) { pconnection_t *pc = as_pconnection(ps); pn_connection_t *c = pc->driver.connection; pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE); - leader_unwatch(ps); + pconnection_to_worker(pc); } void pn_connection_wake(pn_connection_t* c) { @@ -850,11 +894,17 @@ void pn_proactor_free(pn_proactor_t *p) { static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { pn_listener_t *l = batch_listener(batch); assert(l->psocket.state == ON_WORKER); + pn_event_t *prev = pn_collector_prev(l->collector); + if (prev && pn_event_type(prev) == PN_LISTENER_CLOSE) { + l->err = UV_EOF; + } return pn_collector_next(l->collector); } static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { - return pn_collector_next(batch_proactor(batch)->collector); + pn_proactor_t *p = batch_proactor(batch); + assert(p->batch_working); + return pn_collector_next(p->collector); } static void pn_listener_free(pn_listener_t *l) { @@ -867,7 +917,7 @@ static void pn_listener_free(pn_listener_t *l) { } } -pn_listener_t *pn_listener() { +pn_listener_t *pn_listener(void) { pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t)); if (l) 
{ l->batch.next_event = listener_batch_next; @@ -885,8 +935,8 @@ pn_listener_t *pn_listener() { void leader_listener_close(psocket_t *ps) { assert(ps->state = ON_LEADER); pn_listener_t *l = (pn_listener_t*)ps; - l->closing = true; - leader_watch(ps); + l->err = UV_EOF; + listener_to_uv(l); } void pn_listener_close(pn_listener_t* l) { @@ -895,12 +945,10 @@ void pn_listener_close(pn_listener_t* l) { } pn_proactor_t *pn_listener_proactor(pn_listener_t* l) { - assert(l->psocket.state == ON_WORKER); return l ? l->psocket.proactor : NULL; } pn_condition_t* pn_listener_condition(pn_listener_t* l) { - assert(l->psocket.state == ON_WORKER); return l->condition; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/CMakeLists.txt b/proton-c/src/tests/CMakeLists.txt index 59c7665..70b30a0 100644 --- a/proton-c/src/tests/CMakeLists.txt +++ b/proton-c/src/tests/CMakeLists.txt @@ -20,15 +20,15 @@ add_definitions(${COMPILE_WARNING_FLAGS} ${COMPILE_PLATFORM_FLAGS}) if (ENABLE_VALGRIND AND VALGRIND_EXE) - set(memcheck-cmd ${VALGRIND_EXE} --error-exitcode=1 --quiet + set(memcheck-cmd ${VALGRIND_EXE} --error-exitcode=42 --quiet --leak-check=full --trace-children=yes) endif () -macro (pn_add_c_test test file) - add_executable (${test} ${file}) +macro (pn_add_c_test test) + add_executable (${test} ${ARGN}) target_link_libraries (${test} qpid-proton) if (BUILD_WITH_CXX) - set_source_files_properties (${file} PROPERTIES LANGUAGE CXX) + set_source_files_properties (${ARGN} PROPERTIES LANGUAGE CXX) endif (BUILD_WITH_CXX) if (CMAKE_SYSTEM_NAME STREQUAL Windows) add_test (NAME ${test} @@ -49,3 +49,4 @@ pn_add_c_test (c-reactor-tests reactor.c) pn_add_c_test (c-event-tests event.c) pn_add_c_test (c-data-tests data.c) pn_add_c_test (c-condition-tests condition.c) +pn_add_c_test (c-proactor-tests proactor.c) 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/tests/proactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c new file mode 100644 index 0000000..ae5b1f6 --- /dev/null +++ b/proton-c/src/tests/proactor.c @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "test_tools.h" +#include +#include +#include +#include +#include +#include +#include +#include + +static pn_millis_t timeout = 5*1000; /* timeout for hanging tests */ + +static const char *localhost = "127.0.0.1"; /* host for connect/listen */ + +/* Wait for the next single event, return its type */ +static pn_event_type_t wait_next(pn_proactor_t *proactor) { + pn_event_batch_t *events = pn_proactor_wait(proactor); + pn_event_type_t etype = pn_event_type(pn_event_batch_next(events)); + pn_proactor_done(proactor, events); + return etype; +} + +/* Get events until an event of `type` or a PN_TRANSPORT_CLOSED/PN_PROACTOR_TIMEOUT */ +static pn_event_type_t wait_for(pn_proactor_t *proactor, pn_event_type_t etype) { + while (true) { + pn_event_type_t t = wait_next(proactor); + if (t == etype || t == PN_PROACTOR_TIMEOUT) { + return t; + } + } +} + +/* Test that interrupt and timeout events cause pn_proactor_wait() to return. */ +static void test_interrupt_timeout(test_t *t) { + pn_proactor_t *p = pn_proactor(); + pn_proactor_interrupt(p); + pn_event_type_t etype = wait_next(p); + TEST_CHECK(t, PN_PROACTOR_INTERRUPT == etype, pn_event_type_name(etype)); + pn_proactor_set_timeout(p, 1); /* very short timeout */ + etype = wait_next(p); + TEST_CHECK(t, PN_PROACTOR_TIMEOUT == etype, pn_event_type_name(etype)); + pn_proactor_free(p); +} + +/* Test handler return value */ +typedef enum { + H_CONTINUE, /**@<< handler wants more events */ + H_FINISHED, /**@<< handler completed without error */ + H_FAILED /**@<< handler hit an error and cannot continue */ +} handler_state_t; + +typedef handler_state_t (*test_handler_fn)(test_t *, pn_event_t*); + +/* Proactor and handler that take part in a test */ +typedef struct proactor_test_t { + test_t *t; + test_handler_fn handler; + pn_proactor_t *proactor; + handler_state_t state; /* Result of last handler call */ +} proactor_test_t; + + +/* Initialize an array of proactor_test_t */ +static void 
proactor_test_init(proactor_test_t *pts, size_t n) { + for (proactor_test_t *pt = pts; pt < pts + n; ++pt) { + if (!pt->proactor) pt->proactor = pn_proactor(); + pn_proactor_set_timeout(pt->proactor, timeout); + pt->state = H_CONTINUE; + } +} + +/* Iterate over an array of proactors, draining or handling events with the non-blocking + pn_proactor_grab. Continue till all handlers return H_FINISHED (and return 0) or one + returns H_FAILED (and return non-0) +*/ +int proactor_test_run(proactor_test_t *pts, size_t n) { + /* Make sure pts are initialized */ + proactor_test_init(pts, n); + size_t finished = 0; + do { + finished = 0; + for (proactor_test_t *pt = pts; pt < pts + n; ++pt) { + pn_event_batch_t *events = pn_proactor_grab(pt->proactor); + if (events) { + pn_event_t *e; + while ((e = pn_event_batch_next(events))) { + if (pt->state == H_CONTINUE) { + pt->state = pt->handler(pt->t, e); + } + } + pn_proactor_done(pt->proactor, events); + } + switch (pt->state) { + case H_CONTINUE: break; + case H_FINISHED: ++finished; break; + case H_FAILED: return 1; + } + } + } while (finished < n); + return 0; +} + + +/* Simple test of client connect to a listening server */ +handler_state_t listen_connect_server(test_t *t, pn_event_t *e) { + switch (pn_event_type(e)) { + /* Ignore these events */ + case PN_LISTENER_OPEN: + case PN_CONNECTION_LOCAL_OPEN: + case PN_CONNECTION_REMOTE_OPEN: + case PN_CONNECTION_BOUND: + return H_CONTINUE; + + /* Act on these events */ + case PN_LISTENER_ACCEPT: + pn_listener_accept(pn_event_listener(e), pn_connection()); + return H_CONTINUE; + case PN_CONNECTION_INIT: + pn_connection_open(pn_event_connection(e)); + return H_CONTINUE; + case PN_CONNECTION_REMOTE_CLOSE: + return H_FINISHED; + + default: + TEST_CHECK(t, false, "unexpected event %s", pn_event_type_name(pn_event_type(e))); + return H_FAILED; + break; + } +} + +handler_state_t listen_connect_client(test_t *t, pn_event_t *e) { + switch (pn_event_type(e)) { + /* Ignore these events */ + 
case PN_CONNECTION_LOCAL_OPEN: + case PN_CONNECTION_BOUND: + return H_CONTINUE; + + /* Act on these events */ + case PN_CONNECTION_INIT: + pn_connection_open(pn_event_connection(e)); + return H_CONTINUE; + case PN_CONNECTION_REMOTE_OPEN: + pn_connection_close(pn_event_connection(e)); + return H_FINISHED; + + /* Unexpected events */ + default: + TEST_CHECK(t, false, "unexpected event %s", pn_event_type_name(pn_event_type(e))); + return H_FAILED; + break; + } +} + +/* Simplest client/server interaction */ +static void test_listen_connect(test_t *t) { + proactor_test_t pts[] = { { t, listen_connect_client }, { t, listen_connect_server } }; + proactor_test_t *client = &pts[0], *server = &pts[1]; + proactor_test_init(pts, 2); + + int port = pick_port(); + char port_str[16]; + snprintf(port_str, sizeof(port_str), "%d", port); + pn_proactor_listen(server->proactor, pn_listener(), localhost, port_str, 4); + pn_event_type_t etype = wait_for(server->proactor, PN_LISTENER_OPEN); + if (TEST_CHECK(t, PN_LISTENER_OPEN == etype, pn_event_type_name(etype))) { + pn_proactor_connect(client->proactor, pn_connection(), localhost, port_str); + proactor_test_run(pts, 2); + } + pn_proactor_free(client->proactor); + pn_proactor_free(server->proactor); +} + +/* Test error handling */ +static void test_listen_connect_error(test_t *t) { + pn_proactor_t *p = pn_proactor(); + pn_proactor_set_timeout(p, timeout); /* In case of hang */ + pn_connection_t *c = pn_connection(); + pn_proactor_connect(p, c, "nosuchost", "nosuchport"); + pn_event_type_t etype = wait_for(p, PN_TRANSPORT_CLOSED); + TEST_CHECK(t, PN_TRANSPORT_CLOSED == etype, pn_event_type_name(etype)); + TEST_CHECK(t, pn_condition_is_set(pn_transport_condition(pn_connection_transport(c))), ""); + + pn_listener_t *l = pn_listener(); + pn_proactor_listen(p, l, "nosuchost", "nosuchport", 1); + etype = wait_for(p, PN_LISTENER_CLOSE); + TEST_CHECK(t, PN_LISTENER_CLOSE == etype, pn_event_type_name(etype)); + TEST_CHECK(t, 
pn_condition_is_set(pn_listener_condition(l)), ""); + + pn_proactor_free(p); +} + +int main(int argv, char** argc) { + int failed = 0; + RUN_TEST(failed, t, test_interrupt_timeout(&t)); + RUN_TEST(failed, t, test_listen_connect(&t)); + RUN_TEST(failed, t, test_listen_connect_error(&t)); + return failed; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b987a6a7/proton-c/src/tests/test_tools.h ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/test_tools.h b/proton-c/src/tests/test_tools.h new file mode 100644 index 0000000..047ab42 --- /dev/null +++ b/proton-c/src/tests/test_tools.h @@ -0,0 +1,140 @@ +#ifndef TESTS_TEST_TOOLS_H +#define TESTS_TEST_TOOLS_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include +#include +#include + +/* Call via ASSERT macro. */ +static void assert_fail_(const char* cond, const char* file, int line) { + printf("%s:%d: Assertion failed: %s\n", file, line, cond); + abort(); +} + +/* Unconditional assert (does not depend on NDEBUG) for tests. */ +#define ASSERT(expr) \ + ((expr) ? 
(void)0 : assert_fail_(#expr, __FILE__, __LINE__)) + +/* Call via macro ASSERT_PERROR */ +static void assert_perror_fail_(const char* cond, const char* file, int line) { + perror(cond); + printf("%s:%d: Assertion failed (error above): %s\n", file, line, cond); + abort(); +} + +/* Like ASSERT but also calls perror() to print the current errno error. */ +#define ASSERT_PERROR(expr) \ + ((expr) ? (void)0 : assert_perror_fail_(#expr, __FILE__, __LINE__)) + + +/* A struct to collect the results of a test. + * Declare and initialize with TEST_START(t) where t will be declared as a test_t + */ +typedef struct test_t { + const char* name; + int errors; +} test_t; + +/* if !expr print the printf-style error and increment t->errors. Use via macros. Returns expr. */ +static inline bool test_check_(test_t *t, bool expr, const char *sexpr, const char *file, int line, const char* fmt, ...) { + if (!expr) { + va_list ap; + va_start(ap, fmt); + fprintf(stderr, "%s:%d:[%s] check failed: (%s)", file, line, t->name, sexpr); + if (fmt && *fmt) { + fprintf(stderr, " - "); + vfprintf(stderr, fmt, ap); + } + fprintf(stderr, "\n"); + fflush(stderr); + ++t->errors; + } + return expr; +} + +#define TEST_CHECK(TEST, EXPR, ...) test_check_((TEST), (EXPR), #EXPR, __FILE__, __LINE__, __VA_ARGS__) + +/* T is name of a test_t variable, EXPR is the test expression (which should update T) + FAILED is incremented if the test has errors +*/ +#define RUN_TEST(FAILED, T, EXPR) do { \ + printf("TEST: %s\n", #EXPR); \ + fflush(stdout); \ + test_t T = { #EXPR, 0 }; \ + (EXPR); \ + if (T.errors) { \ + printf("FAIL: %s (%d errors)\n", #EXPR, T.errors); \ + ++(FAILED); \ + } \ + } while(0) + +#if defined(WIN32) + +#include +#include +typedef SOCKET sock_t; +static inline void sock_close(sock_t sock) { closesocket(sock); } + +#else + +#include +#include +#include +#include + +static int port_in_use(int port) { + /* Attempt to bind a dummy socket to test if the port is in use. 
*/ + int dummy_socket = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_PERROR(dummy_socket >= 0); + struct sockaddr_in addr = {0}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(port); + int ret = bind(dummy_socket, (struct sockaddr *) &addr, sizeof(addr)); + close(dummy_socket); + return ret < 0; +} + +/* Try to pick an unused port by picking random ports till we find one + that is not in use. This is not foolproof as some other process may + grab it before the caller binds or connects. +*/ +static int pick_port(void) { + srand(time(NULL)); + static int MAX_TRIES = 10; + int port = -1; + int i = 0; + do { + /* Pick a random port. Avoid the standard OS ephemeral port range used by + bind(0) - ports can be allocated and re-allocated very rapidly there. + */ + port = (rand()%10000) + 10000; + } while (i++ < MAX_TRIES && port_in_use(port)); + ASSERT(i < MAX_TRIES && "cannot pick a port"); + return port; +} + +#endif + +#endif // TESTS_TEST_TOOLS_H --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org