Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6B29017655 for ; Fri, 13 Feb 2015 14:33:59 +0000 (UTC) Received: (qmail 77970 invoked by uid 500); 13 Feb 2015 14:33:50 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 77936 invoked by uid 500); 13 Feb 2015 14:33:50 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 77838 invoked by uid 99); 13 Feb 2015 14:33:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Feb 2015 14:33:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 89FFAE028F; Fri, 13 Feb 2015 14:33:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rhs@apache.org To: commits@qpid.apache.org Date: Fri, 13 Feb 2015 14:33:51 -0000 Message-Id: <855a875e3bb84b15b8510ab26def0e43@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/3] qpid-proton git commit: removed driver API and bindings removed driver API and bindings Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4b53bfca Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4b53bfca Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4b53bfca Branch: refs/heads/master Commit: 4b53bfca06432e440c95d60648b5e7be54ae4296 Parents: 5f50cb7 Author: Rafael Schloming Authored: Fri Feb 13 08:27:23 2015 -0500 Committer: Rafael Schloming Committed: Fri Feb 13 09:33:16 2015 -0500 ---------------------------------------------------------------------- examples/messenger/c/send-async.c | 1 - proton-c/CMakeLists.txt | 13 +- proton-c/bindings/perl/perl.i | 2 - proton-c/bindings/php/php.i | 98 +-- proton-c/bindings/python/cproton.i | 2 - proton-c/bindings/python/proton/__init__.py | 194 ------ proton-c/bindings/ruby/ruby.i | 2 - proton-c/include/proton/cproton.i | 196 ------ proton-c/include/proton/driver.h | 378 ----------- proton-c/include/proton/driver_extras.h | 68 -- proton-c/src/posix/driver.c | 795 ---------------------- proton-c/src/windows/driver.c | 819 ----------------------- tests/python/proton_tests/common.py | 191 +----- tests/python/proton_tests/engine.py | 149 ++--- tests/python/proton_tests/messenger.py | 15 +- 15 files changed, 98 insertions(+), 2825 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4b53bfca/examples/messenger/c/send-async.c ---------------------------------------------------------------------- diff --git a/examples/messenger/c/send-async.c b/examples/messenger/c/send-async.c index 58cc9d0..de9b023 100644 --- a/examples/messenger/c/send-async.c +++ b/examples/messenger/c/send-async.c @@ -22,7 +22,6 @@ #include "proton/message.h" #include "proton/messenger.h" -#include "proton/driver.h" #include "pncompat/misc_funcs.inc" #include http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4b53bfca/proton-c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt index 4e90d4e..181b254 100644 --- a/proton-c/CMakeLists.txt +++ b/proton-c/CMakeLists.txt @@ -85,28 +85,26 @@ add_custom_command ( DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/src/protocol.h.py ) -# Select driver +# Select IO impl if(PN_WINAPI) set (pn_io_impl src/windows/io.c src/windows/iocp.c src/windows/write_pipeline.c) set (pn_selector_impl src/windows/selector.c) - set (pn_driver_impl src/windows/driver.c) else(PN_WINAPI) set (pn_io_impl src/posix/io.c) set (pn_selector_impl src/posix/selector.c) - set (pn_driver_impl src/posix/driver.c) endif(PN_WINAPI) # Link in openssl if present if (SSL_IMPL STREQUAL openssl) - set (pn_driver_ssl_impl src/ssl/openssl.c) + set (pn_ssl_impl src/ssl/openssl.c) include_directories ("${OPENSSL_INCLUDE_DIR}") set (SSL_LIB ${OPENSSL_LIBRARIES}) else (SSL_IMPL STREQUAL openssl) if (SSL_IMPL STREQUAL schannel) - set (pn_driver_ssl_impl src/windows/schannel.c) + set (pn_ssl_impl src/windows/schannel.c) set (SSL_LIB Crypt32.lib Secur32.lib) else (SSL_IMPL STREQUAL schannel) - set (pn_driver_ssl_impl src/ssl/ssl_stub.c) + set (pn_ssl_impl src/ssl/ssl_stub.c) endif (SSL_IMPL STREQUAL schannel) endif (SSL_IMPL STREQUAL openssl) @@ -268,9 +266,8 @@ add_subdirectory(../tests/tools/apps/c ../tests/tools/apps/c) set (qpid-proton-platform ${pn_io_impl} ${pn_selector_impl} - ${pn_driver_impl} src/platform.c - ${pn_driver_ssl_impl} + ${pn_ssl_impl} ) set (qpid-proton-core http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4b53bfca/proton-c/bindings/perl/perl.i ---------------------------------------------------------------------- diff --git a/proton-c/bindings/perl/perl.i b/proton-c/bindings/perl/perl.i index a8e069a..cb44188 100644 --- a/proton-c/bindings/perl/perl.i +++ b/proton-c/bindings/perl/perl.i @@ -4,10 +4,8 @@ #include #include #include -#include #include #include -#include #include #include #include http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4b53bfca/proton-c/bindings/php/php.i ---------------------------------------------------------------------- diff --git a/proton-c/bindings/php/php.i b/proton-c/bindings/php/php.i index d6d3bc6..163f8ff 100644 --- a/proton-c/bindings/php/php.i +++ b/proton-c/bindings/php/php.i @@ -32,8 +32,6 @@ #include #include #include -#include -#include #include #include #include @@ -41,6 +39,7 @@ #include #include #include +#include #define zend_error_noreturn zend_error %} @@ -268,99 +267,4 @@ ssize_t pn_sasl_send(pn_sasl_t *sasl, char *STRING, size_t LENGTH); zval_copy_ctor($result); } - -// increment reference count of PHP_CONTEXT on input: -pn_listener_t *pn_listener(pn_driver_t *driver, const char *host, const char *port, void *PHP_CONTEXT); -%ignore pn_listener; - -// increment reference count of PHP_CONTEXT on input: -pn_listener_t *pn_listener_fd(pn_driver_t *driver, int fd, void *PHP_CONTEXT); -%ignore pn_listener_fd; - - -%rename(pn_listener_context) wrap_pn_listener_context; -%inline { - void *wrap_pn_listener_context(pn_listener_t *l) { - zval *result = pn_listener_context(l); - if (!result) { // convert to PHP NULL - ALLOC_INIT_ZVAL(result); - ZVAL_NULL(result); - } - return result; - } -} -%ignore pn_listener_context; - -%rename(pn_listener_set_context) wrap_pn_listener_set_context; -%inline { - void wrap_pn_listener_set_context(pn_listener_t *l, void *PHP_CONTEXT) { - zval *old = pn_listener_context(l); - if (old) { - zval_ptr_dtor(&old); // drop the reference taken on input - } - pn_listener_set_context(l, PHP_CONTEXT); - } -} -%ignore pn_listener_set_context; - -%rename(pn_listener_free) wrap_pn_listener_free; -%inline %{ - void wrap_pn_listener_free(pn_listener_t *l) { - zval *obj = pn_listener_context(l); - if (obj) { - zval_ptr_dtor(&obj); // drop the reference taken on input - } - pn_listener_free(l); - } -%} -%ignore pn_listener_free; - - -// increment reference count of PHP_CONTEXT on input: -pn_connector_t *pn_connector(pn_driver_t *driver, const char *host, const char *port, void *PHP_CONTEXT); -%ignore pn_connector; - -// increment reference count of PHP_CONTEXT on input: -pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *PHP_CONTEXT); -%ignore pn_connector_fd; - -%rename(pn_connector_context) wrap_pn_connector_context; -%inline { - void *wrap_pn_connector_context(pn_connector_t *c) - { - zval *result = pn_connector_context(c); - if (!result) { // convert to PHP NULL - ALLOC_INIT_ZVAL(result); - ZVAL_NULL(result); - } - return result; - } -} -%ignore pn_connector_context; - -%rename(pn_connector_set_context) wrap_pn_connector_set_context; -%inline { - void wrap_pn_connector_set_context(pn_connector_t *ctor, void *PHP_CONTEXT) { - zval *old = pn_connector_context(ctor); - if (old) { - zval_ptr_dtor(&old); // drop the reference taken on input - } - pn_connector_set_context(ctor, PHP_CONTEXT); - } -} -%ignore pn_connector_set_context; - -%rename(pn_connector_free) wrap_pn_connector_free; -%inline %{ - void wrap_pn_connector_free(pn_connector_t *c) { - zval *obj = pn_connector_context(c); - if (obj) { - zval_ptr_dtor(&obj); // drop the reference taken on input - } - pn_connector_free(c); - } -%} -%ignore pn_connector_free; - - %include "proton/cproton.i" http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4b53bfca/proton-c/bindings/python/cproton.i ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/cproton.i b/proton-c/bindings/python/cproton.i index 3e22c33..26204c8 100644 --- a/proton-c/bindings/python/cproton.i +++ b/proton-c/bindings/python/cproton.i @@ -26,8 +26,6 @@ #include #include #include -#include -#include #include #include #include http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4b53bfca/proton-c/bindings/python/proton/__init__.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py index 69ab0d6..743eafa 100644 --- a/proton-c/bindings/python/proton/__init__.py +++ b/proton-c/bindings/python/proton/__init__.py @@ -3640,196 +3640,6 @@ def _chandler(obj, on_error=None): else: return pn_pyhandler(_cadapter(obj, on_error)) -### -# Driver -### - -class DriverException(ProtonException): - """ - The DriverException class is the root of the driver exception hierarchy. - """ - pass - -class Connector(object): - - @staticmethod - def _wrap_connector(c_cxtr, py_driver=None): - """Maintain only a single instance of this class for each Connector object that - exists in the C Driver. - """ - if not c_cxtr: return None - py_cxtr = pn_void2py(pn_connector_context(c_cxtr)) - if py_cxtr: return py_cxtr - wrapper = Connector(_cxtr=c_cxtr, _py_driver=py_driver) - return wrapper - - def __init__(self, _cxtr, _py_driver): - self._cxtr = _cxtr - assert(_py_driver) - self._driver = weakref.ref(_py_driver) - pn_connector_set_context(self._cxtr, pn_py2void(self)) - self._connection = None - self._driver()._connectors.add(self) - - def _release(self): - """Release the underlying C Engine resource.""" - if hasattr(self, '_cxtr'): - pn_connector_set_context(self._cxtr, pn_py2void(None)) - pn_connector_free(self._cxtr) - del self._cxtr - - def free(self): - """Release the Connector, freeing its resources. - - Call this when you no longer need the Connector. This will allow the - connector's resources to be reclaimed. Once called, you should no longer - reference this connector. - - """ - self.connection = None - d = self._driver() - if d: d._connectors.remove(self) - self._release() - - def next(self): - return Connector._wrap_connector(pn_connector_next(self._cxtr)) - - def process(self): - pn_connector_process(self._cxtr) - - def listener(self): - return Listener._wrap_listener(pn_connector_listener(self._cxtr)) - - def sasl(self): - ## seems easier just to grab the SASL associated with the transport: - trans = self.transport - if trans: - return SASL(self.transport) - return None - - @property - def transport(self): - trans = pn_connector_transport(self._cxtr) - return Transport.wrap(trans) - - def close(self): - return pn_connector_close(self._cxtr) - - @property - def closed(self): - return pn_connector_closed(self._cxtr) - - def _get_connection(self): - return self._connection - - def _set_connection(self, conn): - if conn: - pn_connector_set_connection(self._cxtr, conn._impl) - else: - pn_connector_set_connection(self._cxtr, None) - self._connection = conn - - - connection = property(_get_connection, _set_connection, - doc=""" -Associate a Connection with this Connector. -""") - -class Listener(object): - - @staticmethod - def _wrap_listener(c_lsnr, py_driver=None): - """Maintain only a single instance of this class for each Listener object that - exists in the C Driver. - """ - if not c_lsnr: return None - py_lsnr = pn_void2py(pn_listener_context(c_lsnr)) - if py_lsnr: return py_lsnr - wrapper = Listener(_lsnr=c_lsnr, _py_driver=py_driver) - return wrapper - - def __init__(self, _lsnr, _py_driver): - self._lsnr = _lsnr - assert(_py_driver) - self._driver = weakref.ref(_py_driver) - pn_listener_set_context(self._lsnr, pn_py2void(self)) - self._driver()._listeners.add(self) - - def _release(self): - """Release the underlying C Engine resource.""" - if hasattr(self, '_lsnr'): - pn_listener_set_context(self._lsnr, pn_py2void(None)); - pn_listener_free(self._lsnr) - del self._lsnr - - def free(self): - """Release the Listener, freeing its resources""" - d = self._driver() - if d: d._listeners.remove(self) - self._release() - - def next(self): - return Listener._wrap_listener(pn_listener_next(self._lsnr)) - - def accept(self): - d = self._driver() - if d: - cxtr = pn_listener_accept(self._lsnr) - c = Connector._wrap_connector(cxtr, d) - return c - return None - - def close(self): - pn_listener_close(self._lsnr) - -class Driver(object): - def __init__(self): - self._driver = pn_driver() - self._listeners = set() - self._connectors = set() - - def __del__(self): - # freeing the driver will release all child objects in the C Engine, so - # clean up their references in the corresponding Python objects - for c in self._connectors: - c._release() - for l in self._listeners: - l._release() - if hasattr(self, "_driver") and self._driver: - pn_driver_free(self._driver) - del self._driver - - def wait(self, timeout_sec): - if timeout_sec is None or timeout_sec < 0.0: - t = -1 - else: - t = secs2millis(timeout_sec) - return pn_driver_wait(self._driver, t) - - def wakeup(self): - return pn_driver_wakeup(self._driver) - - def listener(self, host, port): - """Construct a listener""" - return Listener._wrap_listener(pn_listener(self._driver, unicode2utf8(host), port, None), - self) - - def pending_listener(self): - return Listener._wrap_listener(pn_driver_listener(self._driver)) - - def head_listener(self): - return Listener._wrap_listener(pn_listener_head(self._driver)) - - def connector(self, host, port): - return Connector._wrap_connector(pn_connector(self._driver, unicode2utf8(host), port, None), - self) - - def head_connector(self): - return Connector._wrap_connector(pn_connector_head(self._driver)) - - def pending_connector(self): - return Connector._wrap_connector(pn_driver_connector(self._driver)) - class Url(object): """ Simple URL parser/constructor, handles URLs of the form: @@ -3958,18 +3768,14 @@ __all__ = [ "Collector", "Condition", "Connection", - "Connector", "Data", "Delivery", "Disposition", "Described", - "Driver", - "DriverException", "Endpoint", "Event", "Handler", "Link", - "Listener", "Message", "MessageException", "Messenger", http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4b53bfca/proton-c/bindings/ruby/ruby.i ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/ruby.i b/proton-c/bindings/ruby/ruby.i index 947762c..cd0cef3 100644 --- a/proton-c/bindings/ruby/ruby.i +++ b/proton-c/bindings/ruby/ruby.i @@ -22,10 +22,8 @@ #include #include #include -#include #include #include -#include #include #include #include http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4b53bfca/proton-c/include/proton/cproton.i ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/cproton.i b/proton-c/include/proton/cproton.i index bf800a0..badac05 100644 --- a/proton-c/include/proton/cproton.i +++ b/proton-c/include/proton/cproton.i @@ -1058,202 +1058,6 @@ typedef unsigned long int uintptr_t; %include "proton/sasl.h" -%contract pn_driver(void) -{ - ensure: - pn_driver != NULL; -} - -%contract pn_driver_trace(pn_driver_t *driver, pn_trace_t trace) -{ - require: - driver != NULL; - check_trace(trace); -} - -%contract pn_driver_wakeup(pn_driver_t *driver) -{ - require: - driver != NULL; -} - -%contract pn_driver_wait(pn_driver_t *driver, int timeout) -{ - require: - driver != NULL; - timeout >= -1; -} - -/** Get the next listener with pending data in the driver. - * - * @param[in] driver the driver - * @return NULL if no active listener available - */ -%contract pn_driver_listener(pn_driver_t *driver) -{ - require: - driver != NULL; -} - -%contract pn_driver_connector(pn_driver_t *driver) -{ - require: - driver != NULL; -} - -%contract pn_driver_free(pn_driver_t *driver) -{ - require: - driver != NULL; -} - -%contract pn_listener(pn_driver_t *driver, const char *host, - const char *port, void* context) -{ - require: - driver != NULL; - host != NULL; - port != NULL; -} - -%contract pn_listener_fd(pn_driver_t *driver, int fd, void *context) -{ - require: - driver != NULL; - fd >= 0; -} - -%contract pn_listener_trace(pn_listener_t *listener, pn_trace_t trace) -{ - require: - listener != NULL; - check_trace(trace); -} - -%contract pn_listener_accept(pn_listener_t *listener) -{ - require: - listener != NULL; - ensure: - pn_listener_accept != NULL; -} - -%contract pn_listener_context(pn_listener_t *listener) -{ - require: - listener != NULL; -} - -%contract pn_listener_set_context(pn_listener_t *listener, void *context) -{ - require: - listener != NULL; -} - -%contract pn_listener_close(pn_listener_t *listener) -{ - require: - listener != NULL; -} - -%contract pn_listener_free(pn_listener_t *listener) -{ - require: - listener != NULL; -} - - -%contract pn_connector(pn_driver_t *driver, const char *host, - const char *port, void* context) -{ - require: - driver != NULL; - host != NULL; - port != NULL; - ensure: - pn_connector != NULL; -} - -%contract pn_connector_fd(pn_driver_t *driver, int fd, void *context) -{ - require: - driver != NULL; - fd >= 0; - ensure: - pn_connector_fd != NULL; -} - -%contract pn_connector_trace(pn_connector_t *connector, pn_trace_t trace) -{ - require: - connector != NULL; - check_trace(trace); -} - -%contract pn_connector_process(pn_connector_t *connector) -{ - require: - connector != NULL; -} - -%contract pn_connector_listener(pn_connector_t *connector) -{ - require: - connector != NULL; -} - -%contract pn_connector_sasl(pn_connector_t *connector) -{ - require: - connector != NULL; -} - -%contract pn_connector_connection(pn_connector_t *connector) -{ - require: - connector != NULL; -} - -%contract pn_connector_set_connection(pn_connector_t *ctor, pn_connection_t *connection) -{ - require: - ctor != NULL; -} - -%contract pn_connector_context(pn_connector_t *connector) -{ - require: - connector != NULL; -} - -%contract pn_connector_set_context(pn_connector_t *connector, void *context) -{ - require: - connector != NULL; -} - -%contract pn_connector_close(pn_connector_t *connector) -{ - require: - connector != NULL; -} - -%contract pn_connector_closed(pn_connector_t *connector) -{ - require: - connector != NULL; -} - -%contract pn_connector_free(pn_connector_t *connector) -{ - require: - connector != NULL; -} - - -%include "proton/driver.h" -%include "proton/driver_extras.h" - %contract pn_messenger(const char *name) { ensure: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4b53bfca/proton-c/include/proton/driver.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/driver.h b/proton-c/include/proton/driver.h deleted file mode 100644 index ccf80a0..0000000 --- a/proton-c/include/proton/driver.h +++ /dev/null @@ -1,378 +0,0 @@ -#ifndef PROTON_DRIVER_H -#define PROTON_DRIVER_H 1 - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include -#include -#include -#include -#include -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif - -/** @file - * API for the Driver Layer. - * - * The driver library provides a simple implementation of a driver for - * the proton engine. A driver is responsible for providing input, - * output, and tick events to the bottom half of the engine API. See - * ::pn_transport_input, ::pn_transport_output, and - * ::pn_transport_tick. The driver also provides an interface for the - * application to access the top half of the API when the state of the - * engine may have changed due to I/O or timing events. Additionally - * the driver incorporates the SASL engine as well in order to provide - * a complete network stack: AMQP over SASL over TCP. - * - */ - -typedef struct pn_driver_t pn_driver_t; -typedef struct pn_listener_t pn_listener_t; -typedef struct pn_connector_t pn_connector_t; - -typedef enum { - PN_CONNECTOR_WRITABLE, - PN_CONNECTOR_READABLE -} pn_activate_criteria_t; - -/** Construct a driver - * - * Call pn_driver_free() to release the driver object. - * @return new driver object, NULL if error - */ -PN_EXTERN pn_driver_t *pn_driver(void); - -/** Return the most recent error code. - * - * @param[in] d the driver - * - * @return the most recent error text for d - */ -PN_EXTERN int pn_driver_errno(pn_driver_t *d); - -/** Get additional error information associated with the driver. - * - * Whenever a driver operation fails, additional error information can - * be obtained using this function. The error object that is returned - * may also be used to clear the error condition. - * - * The pointer returned by this operation is valid until the - * driver object is freed. - * - * @param[in] d the driver - * - * @return the driver's error object - */ -PN_EXTERN pn_error_t *pn_driver_error(pn_driver_t *d); - -/** Set the tracing level for the given driver. - * - * @param[in] driver the driver to trace - * @param[in] trace the trace level to use. - * @todo pn_trace_t needs documentation - */ -PN_EXTERN void pn_driver_trace(pn_driver_t *driver, pn_trace_t trace); - -/** Force pn_driver_wait() to return - * - * @param[in] driver the driver to wake up - * - * @return zero on success, an error code on failure - */ -PN_EXTERN int pn_driver_wakeup(pn_driver_t *driver); - -/** Wait for an active connector or listener - * - * @param[in] driver the driver to wait on - * @param[in] timeout maximum time in milliseconds to wait, -1 means - * infinite wait - * - * @return zero on success, an error code on failure - */ -PN_EXTERN int pn_driver_wait(pn_driver_t *driver, int timeout); - -/** Get the next listener with pending data in the driver. - * - * @param[in] driver the driver - * @return NULL if no active listener available - */ -PN_EXTERN pn_listener_t *pn_driver_listener(pn_driver_t *driver); - -/** Get the next active connector in the driver. - * - * Returns the next connector with pending inbound data, available - * capacity for outbound data, or pending tick. - * - * @param[in] driver the driver - * @return NULL if no active connector available - */ -PN_EXTERN pn_connector_t *pn_driver_connector(pn_driver_t *driver); - -/** Free the driver allocated via pn_driver, and all associated - * listeners and connectors. - * - * @param[in] driver the driver to free, no longer valid on - * return - */ -PN_EXTERN void pn_driver_free(pn_driver_t *driver); - - -/** pn_listener - the server API **/ - -/** Construct a listener for the given address. - * - * @param[in] driver driver that will 'own' this listener - * @param[in] host local host address to listen on - * @param[in] port local port to listen on - * @param[in] context application-supplied, can be accessed via - * pn_listener_context() - * @return a new listener on the given host:port, NULL if error - */ -PN_EXTERN pn_listener_t *pn_listener(pn_driver_t *driver, const char *host, - const char *port, void* context); - -/** Access the head listener for a driver. - * - * @param[in] driver the driver whose head listener will be returned - * - * @return the head listener for driver or NULL if there is none - */ -PN_EXTERN pn_listener_t *pn_listener_head(pn_driver_t *driver); - -/** Access the next listener. - * - * @param[in] listener the listener whose next listener will be - * returned - * - * @return the next listener - */ -PN_EXTERN pn_listener_t *pn_listener_next(pn_listener_t *listener); - -/** - * @todo pn_listener_trace needs documentation - */ -PN_EXTERN void pn_listener_trace(pn_listener_t *listener, pn_trace_t trace); - -/** Accept a connection that is pending on the listener. - * - * @param[in] listener the listener to accept the connection on - * @return a new connector for the remote, or NULL on error - */ -PN_EXTERN pn_connector_t *pn_listener_accept(pn_listener_t *listener); - -/** Access the application context that is associated with the listener. - * - * @param[in] listener the listener whose context is to be returned - * @return the application context that was passed to pn_listener() or - * pn_listener_fd() - */ -PN_EXTERN void *pn_listener_context(pn_listener_t *listener); - -PN_EXTERN void pn_listener_set_context(pn_listener_t *listener, void *context); - -/** Close the socket used by the listener. - * - * @param[in] listener the listener whose socket will be closed. - */ -PN_EXTERN void pn_listener_close(pn_listener_t *listener); - -/** Frees the given listener. - * - * Assumes the listener's socket has been closed prior to call. - * - * @param[in] listener the listener object to free, no longer valid - * on return - */ -PN_EXTERN void pn_listener_free(pn_listener_t *listener); - - - - -/** pn_connector - the client API **/ - -/** Construct a connector to the given remote address. - * - * @param[in] driver owner of this connection. - * @param[in] host remote host to connect to. - * @param[in] port remote port to connect to. - * @param[in] context application supplied, can be accessed via - * pn_connector_context() @return a new connector - * to the given remote, or NULL on error. - */ -PN_EXTERN pn_connector_t *pn_connector(pn_driver_t *driver, const char *host, - const char *port, void* context); - -/** Access the head connector for a driver. - * - * @param[in] driver the driver whose head connector will be returned - * - * @return the head connector for driver or NULL if there is none - */ -PN_EXTERN pn_connector_t *pn_connector_head(pn_driver_t *driver); - -/** Access the next connector. - * - * @param[in] connector the connector whose next connector will be - * returned - * - * @return the next connector - */ -PN_EXTERN pn_connector_t *pn_connector_next(pn_connector_t *connector); - -/** Set the tracing level for the given connector. - * - * @param[in] connector the connector to trace - * @param[in] trace the trace level to use. - */ -PN_EXTERN void pn_connector_trace(pn_connector_t *connector, pn_trace_t trace); - -/** Service the given connector. - * - * Handle any inbound data, outbound data, or timing events pending on - * the connector. - * - * @param[in] connector the connector to process. - */ -PN_EXTERN void pn_connector_process(pn_connector_t *connector); - -/** Access the listener which opened this connector. - * - * @param[in] connector connector whose listener will be returned. - * @return the listener which created this connector, or NULL if the - * connector has no listener (e.g. an outbound client - * connection) - */ -PN_EXTERN pn_listener_t *pn_connector_listener(pn_connector_t *connector); - -/** Access the Authentication and Security context of the connector. - * - * @param[in] connector connector whose security context will be - * returned - * @return the Authentication and Security context for the connector, - * or NULL if none - */ -PN_EXTERN pn_sasl_t *pn_connector_sasl(pn_connector_t *connector); - -/** Access the AMQP Connection associated with the connector. - * - * @param[in] connector the connector whose connection will be - * returned - * @return the connection context for the connector, or NULL if none - */ -PN_EXTERN pn_connection_t *pn_connector_connection(pn_connector_t *connector); - -/** Assign the AMQP Connection associated with the connector. - * - * @param[in] connector the connector whose connection will be set. - * @param[in] connection the connection to associate with the - * connector - */ -PN_EXTERN void pn_connector_set_connection(pn_connector_t *connector, pn_connection_t *connection); - -/** Access the application context that is associated with the - * connector. - * - * @param[in] connector the connector whose context is to be returned. - * @return the application context that was passed to pn_connector() - * or pn_connector_fd() - */ -PN_EXTERN void *pn_connector_context(pn_connector_t *connector); - -/** Assign a new application context to the connector. - * - * @param[in] connector the connector which will hold the context. - * @param[in] context new application context to associate with the - * connector - */ -PN_EXTERN void pn_connector_set_context(pn_connector_t *connector, void *context); - -/** Access the name of the connector - * - * @param[in] connector the connector which will hole the name - * @return the name of the connector in the form of a null-terminated character string. - */ -PN_EXTERN const char *pn_connector_name(const pn_connector_t *connector); - -/** Access the transport used by this connector. - * - * @param[in] connector connector whose transport will be returned - * @return the transport, or NULL if none - */ -PN_EXTERN pn_transport_t *pn_connector_transport(pn_connector_t *connector); - -/** Close the socket used by the connector. - * - * @param[in] connector the connector whose socket will be closed - */ -PN_EXTERN void pn_connector_close(pn_connector_t *connector); - -/** Determine if the connector is closed. - * - * @return True if closed, otherwise false - */ -PN_EXTERN bool pn_connector_closed(pn_connector_t *connector); - -/** Destructor for the given connector. - * - * Assumes the connector's socket has been closed prior to call. - * - * @param[in] connector the connector object to free. No longer - * valid on return - */ -PN_EXTERN void pn_connector_free(pn_connector_t *connector); - -/** Activate a connector when a criteria is met - * - * Set a criteria for a connector (i.e. it's transport is writable) that, once met, - * the connector shall be placed in the driver's work queue. - * - * @param[in] connector The connector object to activate - * @param[in] criteria The criteria that must be met prior to activating the connector - */ -PN_EXTERN void pn_connector_activate(pn_connector_t *connector, pn_activate_criteria_t criteria); - -/** Return the activation status of the connector for a criteria - * - * Return the activation status (i.e. readable, writable) for the connector. This function - * has the side-effect of canceling the activation of the criteria. - * - * Please note that this function must not be used for normal AMQP connectors. It is only - * used for connectors created so the driver can track non-AMQP file descriptors. Such - * connectors are never passed into pn_connector_process. - * - * @param[in] connector The connector object to activate - * @param[in] criteria The criteria to test. "Is this the reason the connector appeared - * in the work list?" - * @return true iff the criteria is activated on the connector. - */ -PN_EXTERN bool pn_connector_activated(pn_connector_t *connector, pn_activate_criteria_t criteria); - - -#ifdef __cplusplus -} -#endif - -#endif /* driver.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4b53bfca/proton-c/include/proton/driver_extras.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/driver_extras.h b/proton-c/include/proton/driver_extras.h deleted file mode 100644 index a2323f1..0000000 --- a/proton-c/include/proton/driver_extras.h +++ /dev/null @@ -1,68 +0,0 @@ -#ifndef PROTON_DRIVER_H_EXTRAS -#define PROTON_DRIVER_H_EXTRAS 1 - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include - -/** @file - * Additional API for the Driver Layer. - * - * These additional driver functions allow the application to supply - * a separately created socket to the driver library. - * - */ - -/** Create a listener using the existing file descriptor. - * - * @param[in] driver driver that will 'own' this listener - * @param[in] fd existing socket for listener to listen on - * @param[in] context application-supplied, can be accessed via - * pn_listener_context() - * @return a new listener on the given host:port, NULL if error - */ -PN_EXTERN pn_listener_t *pn_listener_fd(pn_driver_t *driver, pn_socket_t fd, void *context); - -PN_EXTERN pn_socket_t pn_listener_get_fd(pn_listener_t *listener); - -/** Create a connector using the existing file descriptor. - * - * @param[in] driver driver that will 'own' this connector. - * @param[in] fd existing socket to use for this connector. - * @param[in] context application-supplied, can be accessed via - * pn_connector_context() - * @return a new connector to the given host:port, NULL if error. - */ -PN_EXTERN pn_connector_t *pn_connector_fd(pn_driver_t *driver, pn_socket_t fd, void *context); - -PN_EXTERN pn_socket_t pn_connector_get_fd(pn_connector_t *connector); - -#ifdef __cplusplus -} -#endif - -#endif /* driver_extras.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4b53bfca/proton-c/src/posix/driver.c ---------------------------------------------------------------------- diff --git a/proton-c/src/posix/driver.c b/proton-c/src/posix/driver.c deleted file mode 100644 index 5d513a4..0000000 --- a/proton-c/src/posix/driver.c +++ /dev/null @@ -1,795 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include "../log_private.h" -#include "util.h" -#include "platform.h" - -/* Decls */ - -#define PN_SEL_RD (0x0001) -#define PN_SEL_WR (0x0002) - -struct pn_driver_t { - pn_error_t *error; - pn_io_t *io; - pn_listener_t *listener_head; - pn_listener_t *listener_tail; - pn_listener_t *listener_next; - pn_connector_t *connector_head; - pn_connector_t *connector_tail; - pn_connector_t *connector_next; - size_t listener_count; - size_t connector_count; - size_t closed_count; - size_t capacity; - struct pollfd *fds; - size_t nfds; - int ctrl[2]; //pipe for updating selectable status - pn_timestamp_t wakeup; - pn_trace_t trace; -}; - -struct pn_listener_t { - pn_driver_t *driver; - pn_listener_t *listener_next; - pn_listener_t *listener_prev; - void *context; - int idx; - int fd; - bool pending; - bool closed; -}; - -#define PN_NAME_MAX (256) - -struct pn_connector_t { - pn_driver_t *driver; - pn_connector_t *connector_next; - pn_connector_t *connector_prev; - char name[PN_NAME_MAX]; - pn_timestamp_t wakeup; - pn_connection_t *connection; - pn_transport_t *transport; - pn_sasl_t *sasl; - pn_listener_t *listener; - void *context; - int idx; - int fd; - int status; - pn_trace_t trace; - bool pending_tick; - bool pending_read; - bool pending_write; - bool closed; - bool input_done; - bool output_done; -}; - -/* Impls */ - -// listener - -static void pn_driver_add_listener(pn_driver_t *d, pn_listener_t *l) -{ - if (!l->driver) return; - LL_ADD(d, listener, l); - l->driver = d; - d->listener_count++; -} - -static void pn_driver_remove_listener(pn_driver_t *d, pn_listener_t *l) -{ - if (!l->driver) return; - - if (l == d->listener_next) { - d->listener_next = l->listener_next; - } - - LL_REMOVE(d, listener, l); - l->driver = NULL; - d->listener_count--; -} - -pn_listener_t *pn_listener(pn_driver_t *driver, const char *host, - const char *port, void* context) -{ - if (!driver) return NULL; - - pn_socket_t sock = pn_listen(driver->io, host, port); - if (sock == PN_INVALID_SOCKET) { - return NULL; - } else { - pn_listener_t *l = pn_listener_fd(driver, sock, context); - - if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) - pn_logf("Listening on %s:%s", host, port); - - return l; - } -} - -pn_listener_t *pn_listener_fd(pn_driver_t *driver, int fd, void *context) -{ - if (!driver) return NULL; - - pn_listener_t *l = (pn_listener_t *) malloc(sizeof(pn_listener_t)); - if (!l) return NULL; - l->driver = driver; - l->listener_next = NULL; - l->listener_prev = NULL; - l->idx = 0; - l->pending = false; - l->fd = fd; - l->closed = false; - l->context = context; - - pn_driver_add_listener(driver, l); - return l; -} - -pn_socket_t pn_listener_get_fd(pn_listener_t *listener) -{ - assert(listener); - return listener->fd; -} - -pn_listener_t *pn_listener_head(pn_driver_t *driver) -{ - return driver ? driver->listener_head : NULL; -} - -pn_listener_t *pn_listener_next(pn_listener_t *listener) -{ - return listener ? listener->listener_next : NULL; -} - -void pn_listener_trace(pn_listener_t *l, pn_trace_t trace) { - // XXX -} - -void *pn_listener_context(pn_listener_t *l) { - return l ? l->context : NULL; -} - -void pn_listener_set_context(pn_listener_t *listener, void *context) -{ - assert(listener); - listener->context = context; -} - -pn_connector_t *pn_listener_accept(pn_listener_t *l) -{ - if (!l || !l->pending) return NULL; - char name[PN_NAME_MAX]; - - pn_socket_t sock = pn_accept(l->driver->io, l->fd, name, PN_NAME_MAX); - if (sock == PN_INVALID_SOCKET) { - return NULL; - } else { - if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) - pn_logf("Accepted from %s", name); - pn_connector_t *c = pn_connector_fd(l->driver, sock, NULL); - snprintf(c->name, PN_NAME_MAX, "%s", name); - c->listener = l; - c->transport = pn_transport(); - pn_transport_set_server(c->transport); - c->sasl = pn_sasl(c->transport); - return c; - } -} - -void pn_listener_close(pn_listener_t *l) -{ - if (!l) return; - if (l->closed) return; - - if (close(l->fd) == -1) - perror("close"); - l->closed = true; -} - -void pn_listener_free(pn_listener_t *l) -{ - if (!l) return; - - if (l->driver) pn_driver_remove_listener(l->driver, l); - free(l); -} - -// connector - -static void pn_driver_add_connector(pn_driver_t *d, pn_connector_t *c) -{ - if (!c->driver) return; - LL_ADD(d, connector, c); - c->driver = d; - d->connector_count++; -} - -static void pn_driver_remove_connector(pn_driver_t *d, pn_connector_t *c) -{ - if (!c->driver) return; - - if (c == d->connector_next) { - d->connector_next = c->connector_next; - } - - LL_REMOVE(d, connector, c); - c->driver = NULL; - d->connector_count--; - if (c->closed) { - d->closed_count--; - } -} - -pn_connector_t *pn_connector(pn_driver_t *driver, const char *host, - const char *port, void *context) -{ - if (!driver) return NULL; - - pn_socket_t sock = pn_connect(driver->io, host, port); - - pn_connector_t *c = pn_connector_fd(driver, sock, context); - c->transport = pn_transport(); - c->sasl = pn_sasl(c->transport); - snprintf(c->name, PN_NAME_MAX, "%s:%s", host, port); - if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) - pn_logf("Connected to %s", c->name); - return c; -} - -pn_connector_t *pn_connector_fd(pn_driver_t *driver, int fd, void *context) -{ - if (!driver) return NULL; - - pn_connector_t *c = (pn_connector_t *) malloc(sizeof(pn_connector_t)); - if (!c) return NULL; - c->driver = driver; - c->connector_next = NULL; - c->connector_prev = NULL; - c->pending_tick = false; - c->pending_read = false; - c->pending_write = false; - c->name[0] = '\0'; - c->idx = 0; - c->fd = fd; - c->status = PN_SEL_RD | PN_SEL_WR; - c->trace = driver->trace; - c->closed = false; - c->wakeup = 0; - c->connection = NULL; - c->transport = NULL; - c->input_done = false; - c->output_done = false; - c->context = context; - c->listener = NULL; - - pn_connector_trace(c, driver->trace); - - pn_driver_add_connector(driver, c); - return c; -} - -pn_socket_t pn_connector_get_fd(pn_connector_t *connector) -{ - assert(connector); - return connector->fd; -} - -pn_connector_t *pn_connector_head(pn_driver_t *driver) -{ - return driver ? driver->connector_head : NULL; -} - -pn_connector_t *pn_connector_next(pn_connector_t *connector) -{ - return connector ? connector->connector_next : NULL; -} - -void pn_connector_trace(pn_connector_t *ctor, pn_trace_t trace) -{ - if (!ctor) return; - ctor->trace = trace; - if (ctor->transport) pn_transport_trace(ctor->transport, trace); -} - -pn_sasl_t *pn_connector_sasl(pn_connector_t *ctor) -{ - return ctor ? ctor->sasl : NULL; -} - -pn_transport_t *pn_connector_transport(pn_connector_t *ctor) -{ - return ctor ? ctor->transport : NULL; -} - -void pn_connector_set_connection(pn_connector_t *ctor, pn_connection_t *connection) -{ - if (!ctor) return; - if (ctor->connection) { - pn_class_decref(PN_OBJECT, ctor->connection); - pn_transport_unbind(ctor->transport); - } - ctor->connection = connection; - if (ctor->connection) { - pn_class_incref(PN_OBJECT, ctor->connection); - pn_transport_bind(ctor->transport, connection); - } - if (ctor->transport) pn_transport_trace(ctor->transport, ctor->trace); -} - -pn_connection_t *pn_connector_connection(pn_connector_t *ctor) -{ - return ctor ? ctor->connection : NULL; -} - -void *pn_connector_context(pn_connector_t *ctor) -{ - return ctor ? ctor->context : NULL; -} - -void pn_connector_set_context(pn_connector_t *ctor, void *context) -{ - if (!ctor) return; - ctor->context = context; -} - -const char *pn_connector_name(const pn_connector_t *ctor) -{ - if (!ctor) return 0; - return ctor->name; -} - -pn_listener_t *pn_connector_listener(pn_connector_t *ctor) -{ - return ctor ? ctor->listener : NULL; -} - -void pn_connector_close(pn_connector_t *ctor) -{ - // XXX: should probably signal engine and callback here - if (!ctor) return; - - ctor->status = 0; - if (close(ctor->fd) == -1) - perror("close"); - ctor->closed = true; - ctor->driver->closed_count++; -} - -bool pn_connector_closed(pn_connector_t *ctor) -{ - return ctor ? ctor->closed : true; -} - -void pn_connector_free(pn_connector_t *ctor) -{ - if (!ctor) return; - - if (ctor->driver) pn_driver_remove_connector(ctor->driver, ctor); - pn_transport_free(ctor->transport); - ctor->transport = NULL; - if (ctor->connection) pn_class_decref(PN_OBJECT, ctor->connection); - ctor->connection = NULL; - free(ctor); -} - -void pn_connector_activate(pn_connector_t *ctor, pn_activate_criteria_t crit) -{ - switch (crit) { - case PN_CONNECTOR_WRITABLE : - ctor->status |= PN_SEL_WR; - break; - - case PN_CONNECTOR_READABLE : - ctor->status |= PN_SEL_RD; - break; - } -} - - -bool pn_connector_activated(pn_connector_t *ctor, pn_activate_criteria_t crit) -{ - bool result = false; - - switch (crit) { - case PN_CONNECTOR_WRITABLE : - result = ctor->pending_write; - ctor->pending_write = false; - ctor->status &= ~PN_SEL_WR; - break; - - case PN_CONNECTOR_READABLE : - result = ctor->pending_read; - ctor->pending_read = false; - ctor->status &= ~PN_SEL_RD; - break; - } - - return result; -} - -static pn_timestamp_t pn_connector_tick(pn_connector_t *ctor, pn_timestamp_t now) -{ - if (!ctor->transport) return 0; - return pn_transport_tick(ctor->transport, now); -} - -void pn_connector_process(pn_connector_t *c) -{ - if (c) { - if (c->closed) return; - - pn_transport_t *transport = c->transport; - - /// - /// Socket read - /// - if (!c->input_done) { - ssize_t capacity = pn_transport_capacity(transport); - if (capacity > 0) { - c->status |= PN_SEL_RD; - if (c->pending_read) { - c->pending_read = false; - ssize_t n = pn_recv(c->driver->io, c->fd, pn_transport_tail(transport), capacity); - if (n < 0) { - if (errno != EAGAIN) { - perror("read"); - c->status &= ~PN_SEL_RD; - c->input_done = true; - pn_transport_close_tail( transport ); - } - } else if (n == 0) { - c->status &= ~PN_SEL_RD; - c->input_done = true; - pn_transport_close_tail( transport ); - } else { - if (pn_transport_process(transport, (size_t) n) < 0) { - c->status &= ~PN_SEL_RD; - c->input_done = true; - } - } - } - } - - capacity = pn_transport_capacity(transport); - - if (capacity < 0) { - c->status &= ~PN_SEL_RD; - c->input_done = true; - } - } - - /// - /// Event wakeup - /// - c->wakeup = pn_connector_tick(c, pn_i_now()); - - /// - /// Socket write - /// - if (!c->output_done) { - ssize_t pending = pn_transport_pending(transport); - if (pending > 0) { - c->status |= PN_SEL_WR; - if (c->pending_write) { - c->pending_write = false; - ssize_t n = pn_send(c->driver->io, c->fd, pn_transport_head(transport), pending); - if (n < 0) { - // XXX - if (errno != EAGAIN) { - perror("send"); - c->output_done = true; - c->status &= ~PN_SEL_WR; - pn_transport_close_head( transport ); - } - } else if (n) { - pn_transport_pop(transport, (size_t) n); - } - } - } else if (pending == 0) { - c->status &= ~PN_SEL_WR; - } else { - c->output_done = true; - c->status &= ~PN_SEL_WR; - } - } - - // Closed? - - if (c->input_done && c->output_done) { - if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) { - pn_logf("Closed %s", c->name); - } - pn_connector_close(c); - } - } -} - -// driver - -pn_driver_t *pn_driver() -{ - pn_driver_t *d = (pn_driver_t *) malloc(sizeof(pn_driver_t)); - if (!d) return NULL; - d->error = pn_error(); - d->io = pn_io(); - d->listener_head = NULL; - d->listener_tail = NULL; - d->listener_next = NULL; - d->connector_head = NULL; - d->connector_tail = NULL; - d->connector_next = NULL; - d->listener_count = 0; - d->connector_count = 0; - d->closed_count = 0; - d->capacity = 0; - d->fds = NULL; - d->nfds = 0; - d->ctrl[0] = 0; - d->ctrl[1] = 0; - d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) | - (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) | - (pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF)); - d->wakeup = 0; - - // XXX - if (pipe(d->ctrl)) { - perror("Can't create control pipe"); - } - - return d; -} - -int pn_driver_errno(pn_driver_t *d) -{ - assert(d); - return pn_error_code(d->error); -} - -pn_error_t *pn_driver_error(pn_driver_t *d) -{ - assert(d); - return d->error; -} - -void pn_driver_trace(pn_driver_t *d, pn_trace_t trace) -{ - d->trace = trace; -} - -void pn_driver_free(pn_driver_t *d) -{ - if (!d) return; - - close(d->ctrl[0]); - close(d->ctrl[1]); - while (d->connector_head) - pn_connector_free(d->connector_head); - while (d->listener_head) - pn_listener_free(d->listener_head); - free(d->fds); - pn_error_free(d->error); - pn_io_free(d->io); - free(d); -} - -int pn_driver_wakeup(pn_driver_t *d) -{ - if (d) { - ssize_t count = write(d->ctrl[1], "x", 1); - if (count <= 0) { - return count; - } else { - return 0; - } - } else { - return PN_ARG_ERR; - } -} - -static void pn_driver_rebuild(pn_driver_t *d) -{ - size_t size = d->listener_count + d->connector_count; - while (d->capacity < size + 1) { - d->capacity = d->capacity ? 2*d->capacity : 16; - d->fds = (struct pollfd *) realloc(d->fds, d->capacity*sizeof(struct pollfd)); - } - - d->wakeup = 0; - d->nfds = 0; - - d->fds[d->nfds].fd = d->ctrl[0]; - d->fds[d->nfds].events = POLLIN; - d->fds[d->nfds].revents = 0; - d->nfds++; - - pn_listener_t *l = d->listener_head; - for (unsigned i = 0; i < d->listener_count; i++) { - d->fds[d->nfds].fd = l->fd; - d->fds[d->nfds].events = POLLIN; - d->fds[d->nfds].revents = 0; - l->idx = d->nfds; - d->nfds++; - l = l->listener_next; - } - - pn_connector_t *c = d->connector_head; - for (unsigned i = 0; i < d->connector_count; i++) - { - if (!c->closed) { - d->wakeup = pn_timestamp_min(d->wakeup, c->wakeup); - d->fds[d->nfds].fd = c->fd; - d->fds[d->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) | (c->status & PN_SEL_WR ? POLLOUT : 0); - d->fds[d->nfds].revents = 0; - c->idx = d->nfds; - d->nfds++; - } - c = c->connector_next; - } -} - -void pn_driver_wait_1(pn_driver_t *d) -{ - pn_driver_rebuild(d); -} - -int pn_driver_wait_2(pn_driver_t *d, int timeout) -{ - if (d->wakeup) { - pn_timestamp_t now = pn_i_now(); - if (now >= d->wakeup) - timeout = 0; - else - timeout = (timeout < 0) ? d->wakeup-now : pn_min(timeout, d->wakeup - now); - } - int result = poll(d->fds, d->nfds, d->closed_count > 0 ? 0 : timeout); - if (result == -1) - pn_i_error_from_errno(d->error, "poll"); - return result; -} - -int pn_driver_wait_3(pn_driver_t *d) -{ - bool woken = false; - if (d->fds[0].revents & POLLIN) { - woken = true; - //clear the pipe - char buffer[512]; - while (read(d->ctrl[0], buffer, 512) == 512); - } - - pn_listener_t *l = d->listener_head; - while (l) { - l->pending = (l->idx && d->fds[l->idx].revents & POLLIN); - l = l->listener_next; - } - - pn_timestamp_t now = pn_i_now(); - pn_connector_t *c = d->connector_head; - while (c) { - if (c->closed) { - c->pending_read = false; - c->pending_write = false; - c->pending_tick = false; - } else { - int idx = c->idx; - c->pending_read = (idx && d->fds[idx].revents & POLLIN); - c->pending_write = (idx && d->fds[idx].revents & POLLOUT); - c->pending_tick = (c->wakeup && c->wakeup <= now); - if (idx && d->fds[idx].revents & POLLERR) - pn_connector_close(c); - else if (idx && (d->fds[idx].revents & POLLHUP)) { - if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) { - pn_logf("hangup on connector %s", c->name); - } - /* poll() is signalling POLLHUP. to see what happened we need - * to do an actual recv() to get the error code. But we might - * be in a state where we're not interested in input, in that - * case try to get the error code via send() */ - if (d->fds[idx].events & POLLIN) - c->pending_read = true; - else if (d->fds[idx].events & POLLOUT) - c->pending_write = true; - } else if (idx && (d->fds[idx].revents & ~(POLLIN|POLLOUT|POLLERR|POLLHUP))) { - if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) { - pn_logf("Unexpected poll events: %04x on %s", - d->fds[idx].revents, c->name); - } - } - } - c = c->connector_next; - } - - d->listener_next = d->listener_head; - d->connector_next = d->connector_head; - - return woken ? PN_INTR : 0; -} - -// -// XXX - pn_driver_wait has been divided into three internal functions as a -// temporary workaround for a multi-threading problem. A multi-threaded -// application must hold a lock on parts 1 and 3, but not on part 2. -// This temporary change, which is not reflected in the driver's API, allows -// a multi-threaded application to use the three parts separately. -// -// This workaround will eventually be replaced by a more elegant solution -// to the problem. -// -int pn_driver_wait(pn_driver_t *d, int timeout) -{ - pn_driver_wait_1(d); - int result = pn_driver_wait_2(d, timeout); - if (result == -1) - return pn_error_code(d->error); - return pn_driver_wait_3(d); -} - -pn_listener_t *pn_driver_listener(pn_driver_t *d) { - if (!d) return NULL; - - while (d->listener_next) { - pn_listener_t *l = d->listener_next; - d->listener_next = l->listener_next; - - if (l->pending) { - return l; - } - } - - return NULL; -} - -pn_connector_t *pn_driver_connector(pn_driver_t *d) { - if (!d) return NULL; - - while (d->connector_next) { - pn_connector_t *c = d->connector_next; - d->connector_next = c->connector_next; - - if (c->closed || c->pending_read || c->pending_write || c->pending_tick) { - return c; - } - } - - return NULL; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4b53bfca/proton-c/src/windows/driver.c ---------------------------------------------------------------------- diff --git a/proton-c/src/windows/driver.c b/proton-c/src/windows/driver.c deleted file mode 100644 index 996caac..0000000 --- a/proton-c/src/windows/driver.c +++ /dev/null @@ -1,819 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "selectable.h" -#include "util.h" -#include "platform.h" - -/* - * This driver provides limited thread safety for some operations on pn_connector_t objects. - * - * These calls are: pn_connector_process(), pn_connector_activate(), pn_connector_activated(), - * pn_connector_close(), and others that only touch the connection object, i.e. - * pn_connector_context(). These calls provide limited safety in that simultaneous calls are - * not allowed to the same pn_connector_t object. - * - * The application must call pn_driver_wakeup() and resume its wait loop logic if a call to - * pn_wait() may have overlapped with any of the above calls that could affect a pn_wait() - * outcome. - */ - -/* Decls */ - -#define PN_SEL_RD (0x0001) -#define PN_SEL_WR (0x0002) - -struct pn_driver_t { - pn_error_t *error; - pn_io_t *io; - pn_selector_t *selector; - pn_listener_t *listener_head; - pn_listener_t *listener_tail; - pn_listener_t *listener_next; - pn_connector_t *connector_head; - pn_connector_t *connector_tail; - pn_listener_t *ready_listener_head; - pn_listener_t *ready_listener_tail; - pn_connector_t *ready_connector_head; - pn_connector_t *ready_connector_tail; - pn_selectable_t *ctrl_selectable; - size_t listener_count; - size_t connector_count; - pn_socket_t ctrl[2]; //pipe for updating selectable status - pn_trace_t trace; -}; - -typedef enum {LISTENER, CONNECTOR} sel_type_t; - -struct pn_listener_t { - sel_type_t type; - pn_driver_t *driver; - pn_listener_t *listener_next; - pn_listener_t *listener_prev; - pn_listener_t *ready_listener_next; - pn_listener_t *ready_listener_prev; - void *context; - pn_selectable_t *selectable; - bool pending; - bool closed; -}; - -#define PN_NAME_MAX (256) - -struct pn_connector_t { - sel_type_t type; - pn_driver_t *driver; - pn_connector_t *connector_next; - pn_connector_t *connector_prev; - pn_connector_t *ready_connector_next; - pn_connector_t *ready_connector_prev; - char name[PN_NAME_MAX]; - pn_timestamp_t wakeup; - pn_timestamp_t posted_wakeup; - pn_connection_t *connection; - pn_transport_t *transport; - pn_sasl_t *sasl; - pn_listener_t *listener; - void *context; - pn_selectable_t *selectable; - int idx; - int status; - int posted_status; - pn_trace_t trace; - bool pending_tick; - bool pending_read; - bool pending_write; - bool closed; - bool input_done; - bool output_done; -}; - -static void get_new_events(pn_driver_t *); - -/* Impls */ - -// listener - -static void pn_driver_add_listener(pn_driver_t *d, pn_listener_t *l) -{ - if (!l->driver) return; - LL_ADD(d, listener, l); - l->driver = d; - d->listener_count++; - pn_selector_add(d->selector, l->selectable); -} - -static void ready_listener_list_remove(pn_driver_t *d, pn_listener_t *l) -{ - LL_REMOVE(d, ready_listener, l); - l->ready_listener_next = NULL; - l->ready_listener_prev = NULL; -} - -static void pn_driver_remove_listener(pn_driver_t *d, pn_listener_t *l) -{ - if (!l->driver) return; - - pn_selector_remove(d->selector, l->selectable); - if (l == d->ready_listener_head || l->ready_listener_prev) - ready_listener_list_remove(d, l); - - if (l == d->listener_next) { - d->listener_next = l->listener_next; - } - LL_REMOVE(d, listener, l); - l->driver = NULL; - d->listener_count--; -} - -pn_listener_t *pn_listener(pn_driver_t *driver, const char *host, - const char *port, void* context) -{ - if (!driver) return NULL; - - pn_socket_t sock = pn_listen(driver->io, host, port); - - if (sock == PN_INVALID_SOCKET) { - return NULL; - } else { - pn_listener_t *l = pn_listener_fd(driver, sock, context); - - if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) - fprintf(stderr, "Listening on %s:%s\n", host, port); - return l; - } -} - -pn_listener_t *pn_listener_fd(pn_driver_t *driver, pn_socket_t fd, void *context) -{ - if (!driver) return NULL; - - pn_listener_t *l = (pn_listener_t *) malloc(sizeof(pn_listener_t)); - if (!l) return NULL; - l->type = LISTENER; - l->driver = driver; - l->listener_next = NULL; - l->listener_prev = NULL; - l->ready_listener_next = NULL; - l->ready_listener_prev = NULL; - l->pending = false; - l->closed = false; - l->context = context; - l->selectable = pn_selectable(); - pn_selectable_set_reading(l->selectable, true); - pn_selectable_set_fd(l->selectable, fd); - pni_selectable_set_context(l->selectable, l); - pn_driver_add_listener(driver, l); - return l; -} - -pn_socket_t pn_listener_get_fd(pn_listener_t *listener) -{ - assert(listener); - return pn_selectable_get_fd(listener->selectable); -} - -pn_listener_t *pn_listener_head(pn_driver_t *driver) -{ - return driver ? driver->listener_head : NULL; -} - -pn_listener_t *pn_listener_next(pn_listener_t *listener) -{ - return listener ? listener->listener_next : NULL; -} - -void pn_listener_trace(pn_listener_t *l, pn_trace_t trace) { - // XXX -} - -void *pn_listener_context(pn_listener_t *l) { - return l ? l->context : NULL; -} - -void pn_listener_set_context(pn_listener_t *listener, void *context) -{ - assert(listener); - listener->context = context; -} - -pn_connector_t *pn_listener_accept(pn_listener_t *l) -{ - if (!l || !l->pending) return NULL; - char name[PN_NAME_MAX]; - - pn_socket_t sock = pn_accept(l->driver->io, pn_selectable_get_fd(l->selectable), name, PN_NAME_MAX); - if (sock == PN_INVALID_SOCKET) { - return NULL; - } else { - if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) - fprintf(stderr, "Accepted from %s\n", name); - pn_connector_t *c = pn_connector_fd(l->driver, sock, NULL); - snprintf(c->name, PN_NAME_MAX, "%s", name); - c->listener = l; - c->transport = pn_transport(); - pn_transport_set_server(c->transport); - c->sasl = pn_sasl(c->transport); - return c; - } -} - -void pn_listener_close(pn_listener_t *l) -{ - if (!l) return; - if (l->closed) return; - - pn_close(l->driver->io, pn_selectable_get_fd(l->selectable)); - l->closed = true; -} - -void pn_listener_free(pn_listener_t *l) -{ - if (!l) return; - - if (l->driver) pn_driver_remove_listener(l->driver, l); - pn_selectable_free(l->selectable); - free(l); -} - -// connector - -static void pn_driver_add_connector(pn_driver_t *d, pn_connector_t *c) -{ - if (!c->driver) return; - LL_ADD(d, connector, c); - c->driver = d; - d->connector_count++; - pn_selector_add(d->selector, c->selectable); -} - -static void ready_connector_list_remove(pn_driver_t *d, pn_connector_t *c) -{ - LL_REMOVE(d, ready_connector, c); - c->ready_connector_next = NULL; - c->ready_connector_prev = NULL; -} - -static void pn_driver_remove_connector(pn_driver_t *d, pn_connector_t *c) -{ - if (!c->driver) return; - - pn_selector_remove(d->selector, c->selectable); - if (c == d->ready_connector_head || c->ready_connector_prev) - ready_connector_list_remove(d, c); - - LL_REMOVE(d, connector, c); - c->driver = NULL; - d->connector_count--; -} - -pn_connector_t *pn_connector(pn_driver_t *driver, const char *host, - const char *port, void *context) -{ - if (!driver) return NULL; - - pn_socket_t sock = pn_connect(driver->io, host, port); - pn_connector_t *c = pn_connector_fd(driver, sock, context); - c->transport = pn_transport(); - c->sasl = pn_sasl(c->transport); - snprintf(c->name, PN_NAME_MAX, "%s:%s", host, port); - if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) - fprintf(stderr, "Connected to %s\n", c->name); - return c; -} - -pn_connector_t *pn_connector_fd(pn_driver_t *driver, pn_socket_t fd, void *context) -{ - if (!driver) return NULL; - - pn_connector_t *c = (pn_connector_t *) malloc(sizeof(pn_connector_t)); - if (!c) return NULL; - c->type = CONNECTOR; - c->driver = driver; - c->connector_next = NULL; - c->connector_prev = NULL; - c->ready_connector_next = NULL; - c->ready_connector_prev = NULL; - c->pending_tick = false; - c->pending_read = false; - c->pending_write = false; - c->name[0] = '\0'; - c->status = PN_SEL_RD | PN_SEL_WR; - c->posted_status = -1; - c->trace = driver->trace; - c->closed = false; - c->wakeup = 0; - c->posted_wakeup = 0; - c->connection = NULL; - c->transport = NULL; - c->input_done = false; - c->output_done = false; - c->context = context; - c->listener = NULL; - c->selectable = pn_selectable(); - pn_selectable_set_fd(c->selectable, fd); - pni_selectable_set_context(c->selectable, c); - pn_connector_trace(c, driver->trace); - - pn_driver_add_connector(driver, c); - return c; -} - -pn_socket_t pn_connector_get_fd(pn_connector_t *connector) -{ - assert(connector); - return pn_selectable_get_fd(connector->selectable); -} - -pn_connector_t *pn_connector_head(pn_driver_t *driver) -{ - return driver ? driver->connector_head : NULL; -} - -pn_connector_t *pn_connector_next(pn_connector_t *connector) -{ - return connector ? connector->connector_next : NULL; -} - -void pn_connector_trace(pn_connector_t *ctor, pn_trace_t trace) -{ - if (!ctor) return; - ctor->trace = trace; - if (ctor->transport) pn_transport_trace(ctor->transport, trace); -} - -pn_sasl_t *pn_connector_sasl(pn_connector_t *ctor) -{ - return ctor ? ctor->sasl : NULL; -} - -pn_transport_t *pn_connector_transport(pn_connector_t *ctor) -{ - return ctor ? ctor->transport : NULL; -} - -void pn_connector_set_connection(pn_connector_t *ctor, pn_connection_t *connection) -{ - if (!ctor) return; - if (ctor->connection) { - pn_decref(ctor->connection); - pn_transport_unbind(ctor->transport); - } - ctor->connection = connection; - if (ctor->connection) { - pn_incref(ctor->connection); - pn_transport_bind(ctor->transport, connection); - } - if (ctor->transport) pn_transport_trace(ctor->transport, ctor->trace); -} - -pn_connection_t *pn_connector_connection(pn_connector_t *ctor) -{ - return ctor ? ctor->connection : NULL; -} - -void *pn_connector_context(pn_connector_t *ctor) -{ - return ctor ? ctor->context : NULL; -} - -void pn_connector_set_context(pn_connector_t *ctor, void *context) -{ - if (!ctor) return; - ctor->context = context; -} - -const char *pn_connector_name(const pn_connector_t *ctor) -{ - if (!ctor) return 0; - return ctor->name; -} - -pn_listener_t *pn_connector_listener(pn_connector_t *ctor) -{ - return ctor ? ctor->listener : NULL; -} - -void pn_connector_close(pn_connector_t *ctor) -{ - // XXX: should probably signal engine and callback here - if (!ctor) return; - - ctor->status = 0; - pn_close(ctor->driver->io, pn_selectable_get_fd(ctor->selectable)); - ctor->closed = true; -} - -bool pn_connector_closed(pn_connector_t *ctor) -{ - return ctor ? ctor->closed : true; -} - -void pn_connector_free(pn_connector_t *ctor) -{ - if (!ctor) return; - - if (ctor->driver) pn_driver_remove_connector(ctor->driver, ctor); - pn_transport_free(ctor->transport); - ctor->transport = NULL; - if (ctor->connection) pn_decref(ctor->connection); - ctor->connection = NULL; - pn_selectable_free(ctor->selectable); - free(ctor); -} - -void pn_connector_activate(pn_connector_t *ctor, pn_activate_criteria_t crit) -{ - switch (crit) { - case PN_CONNECTOR_WRITABLE : - ctor->status |= PN_SEL_WR; - break; - - case PN_CONNECTOR_READABLE : - ctor->status |= PN_SEL_RD; - break; - } -} - - -bool pn_connector_activated(pn_connector_t *ctor, pn_activate_criteria_t crit) -{ - bool result = false; - - switch (crit) { - case PN_CONNECTOR_WRITABLE : - result = ctor->pending_write; - ctor->pending_write = false; - ctor->status &= ~PN_SEL_WR; - break; - - case PN_CONNECTOR_READABLE : - result = ctor->pending_read; - ctor->pending_read = false; - ctor->status &= ~PN_SEL_RD; - break; - } - - return result; -} - -static pn_timestamp_t pn_connector_tick(pn_connector_t *ctor, pn_timestamp_t now) -{ - if (!ctor->transport) return 0; - return pn_transport_tick(ctor->transport, now); -} - -void pn_connector_process(pn_connector_t *c) -{ - if (c) { - if (c->closed) return; - - pn_transport_t *transport = c->transport; - pn_socket_t sock = pn_selectable_get_fd(c->selectable); - - /// - /// Socket read - /// - if (!c->input_done) { - ssize_t capacity = pn_transport_capacity(transport); - if (capacity > 0) { - c->status |= PN_SEL_RD; - if (c->pending_read) { - c->pending_read = false; - ssize_t n = pn_recv(c->driver->io, sock, pn_transport_tail(transport), capacity); - if (n < 0) { - if (errno != EAGAIN) { - perror("read"); - c->status &= ~PN_SEL_RD; - c->input_done = true; - pn_transport_close_tail( transport ); - } - } else if (n == 0) { - c->status &= ~PN_SEL_RD; - c->input_done = true; - pn_transport_close_tail( transport ); - } else { - if (pn_transport_process(transport, (size_t) n) < 0) { - c->status &= ~PN_SEL_RD; - c->input_done = true; - } - } - } - } - - capacity = pn_transport_capacity(transport); - - if (capacity < 0) { - c->status &= ~PN_SEL_RD; - c->input_done = true; - } - } - - /// - /// Event wakeup - /// - c->wakeup = pn_connector_tick(c, pn_i_now()); - - /// - /// Socket write - /// - if (!c->output_done) { - ssize_t pending = pn_transport_pending(transport); - if (pending > 0) { - c->status |= PN_SEL_WR; - if (c->pending_write) { - c->pending_write = false; - ssize_t n = pn_send(c->driver->io, sock, pn_transport_head(transport), pending); - if (n < 0) { - // XXX - if (errno != EAGAIN) { - perror("send"); - c->output_done = true; - c->status &= ~PN_SEL_WR; - pn_transport_close_head( transport ); - } - } else if (n) { - pn_transport_pop(transport, (size_t) n); - } - } - } else if (pending == 0) { - c->status &= ~PN_SEL_WR; - } else { - c->output_done = true; - c->status &= ~PN_SEL_WR; - } - } - - // Closed? - - if (c->input_done && c->output_done) { - if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) { - fprintf(stderr, "Closed %s\n", c->name); - } - pn_connector_close(c); - } - } -} - -// driver - -static pn_selectable_t *create_ctrl_selectable(pn_socket_t fd); - -pn_driver_t *pn_driver() -{ - pn_driver_t *d = (pn_driver_t *) malloc(sizeof(pn_driver_t)); - if (!d) return NULL; - - d->error = pn_error(); - d->io = pn_io(); - d->selector = pn_io_selector(d->io); - d->listener_head = NULL; - d->listener_tail = NULL; - d->listener_next = NULL; - d->ready_listener_head = NULL; - d->ready_listener_tail = NULL; - d->connector_head = NULL; - d->connector_tail = NULL; - d->ready_connector_head = NULL; - d->ready_connector_tail = NULL; - d->listener_count = 0; - d->connector_count = 0; - d->ctrl[0] = 0; - d->ctrl[1] = 0; - d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) | - (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) | - (pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF)); - - // XXX - if (pn_pipe(d->io, d->ctrl)) { - perror("Can't create control pipe"); - free(d); - return NULL; - } - d->ctrl_selectable = create_ctrl_selectable(d->ctrl[0]); - pn_selector_add(d->selector, d->ctrl_selectable); - - return d; -} - -int pn_driver_errno(pn_driver_t *d) -{ - assert(d); - return pn_error_code(d->error); -} - -pn_error_t *pn_driver_error(pn_driver_t *d) -{ - assert(d); - return d->error; -} - -void pn_driver_trace(pn_driver_t *d, pn_trace_t trace) -{ - d->trace = trace; -} - -void pn_driver_free(pn_driver_t *d) -{ - if (!d) return; - - pn_selectable_free(d->ctrl_selectable); - pn_close(d->io, d->ctrl[0]); - pn_close(d->io, d->ctrl[1]); - while (d->connector_head) - pn_connector_free(d->connector_head); - while (d->listener_head) - pn_listener_free(d->listener_head); - pn_error_free(d->error); - pn_io_free(d->io); - free(d); -} - -int pn_driver_wakeup(pn_driver_t *d) -{ - if (d) { - ssize_t count = pn_write(d->io, d->ctrl[1], "x", 1); - if (count <= 0) { - return count; - } else { - return 0; - } - } else { - return PN_ARG_ERR; - } -} - -void pn_driver_wait_1(pn_driver_t *d) -{ -} - -int pn_driver_wait_2(pn_driver_t *d, int timeout) -{ - // These lists will normally be empty - while (d->ready_listener_head) - ready_listener_list_remove(d, d->ready_listener_head); - while (d->ready_connector_head) - ready_connector_list_remove(d, d->ready_connector_head); - pn_connector_t *c = d->connector_head; - for (unsigned i = 0; i < d->connector_count; i++) - { - // Optimistically use a snapshot of the non-threadsafe vars. - // If they are in flux, the app will guarantee progress with a pn_driver_wakeup(). - int current_status = c->status; - pn_timestamp_t current_wakeup = c->wakeup; - if (c->posted_status != current_status || c->posted_wakeup != current_wakeup) { - c->posted_status = current_status; - c->posted_wakeup = current_wakeup; - pn_selectable_t *sel = c->selectable; - pn_selectable_set_reading(sel, c->posted_status & PN_SEL_RD ? 1 : 0); - pn_selectable_set_writing(sel, c->posted_status & PN_SEL_WR ? 1 : 0); - pn_selectable_set_deadline(sel, c->posted_wakeup); - pn_selector_update(c->driver->selector, c->selectable); - } - if (c->closed) { - c->pending_read = false; - c->pending_write = false; - c->pending_tick = false; - LL_ADD(d, ready_connector, c); - pn_selectable_terminate(c->selectable); - } - c = c->connector_next; - } - - if (d->ready_connector_head) - timeout = 0; // We found closed connections - - int code = pn_selector_select(d->selector, timeout); - if (code) { - pn_error_set(d->error, code, "select"); - return -1; - } - get_new_events(d); - return 0; -} - -int pn_driver_wait_3(pn_driver_t *d) -{ - // no-op with new selector/selectables - return 0; -} - - -// XXX - pn_driver_wait has been divided into three internal functions as a -// temporary workaround for a multi-threading problem. A multi-threaded -// application must hold a lock on parts 1 and 3, but not on part 2. -// This temporary change, which is not reflected in the driver's API, allows -// a multi-threaded application to use the three parts separately. -// -// This workaround will eventually be replaced by a more elegant solution -// to the problem. -// -int pn_driver_wait(pn_driver_t *d, int timeout) -{ - pn_driver_wait_1(d); - int result = pn_driver_wait_2(d, timeout); - if (result == -1) - return pn_error_code(d->error); - return pn_driver_wait_3(d); -} - -static void get_new_events(pn_driver_t *d) -{ - bool woken = false; - int events; - pn_selectable_t *sel; - while ((sel = pn_selector_next(d->selector, &events)) != NULL) { - if (sel == d->ctrl_selectable) { - woken = true; - //clear the pipe - char buffer[512]; - while (pn_read(d->io, d->ctrl[0], buffer, 512) == 512); - continue; - } - - void *ctx = pni_selectable_get_context(sel); - sel_type_t *type = (sel_type_t *) ctx; - if (*type == CONNECTOR) { - pn_connector_t *c = (pn_connector_t *) ctx; - if (!c->closed) { - LL_ADD(d, ready_connector, c); - c->pending_read = events & PN_READABLE; - c->pending_write = events & PN_WRITABLE; - c->pending_tick = events & PN_EXPIRED; - } - } else { - pn_listener_t *l = (pn_listener_t *) ctx; - LL_ADD(d, ready_listener, l); - l->pending = events & PN_READABLE; - } - } -} - -pn_listener_t *pn_driver_listener(pn_driver_t *d) { - if (!d) return NULL; - - pn_listener_t *l = d->ready_listener_head; - while (l) { - ready_listener_list_remove(d, l); - if (l->pending) - return l; - l = d->ready_listener_head; - } - return NULL; -} - -pn_connector_t *pn_driver_connector(pn_driver_t *d) { - if (!d) return NULL; - - pn_connector_t *c = d->ready_connector_head; - while (c) { - ready_connector_list_remove(d, c); - if (c->closed || c->pending_read || c->pending_write || c->pending_tick) { - return c; - } - c = d->ready_connector_head; - } - return NULL; -} - -static pn_selectable_t *create_ctrl_selectable(pn_socket_t fd) -{ - // ctrl input only needs to know about read events, just like a listener. - pn_selectable_t *sel = pn_selectable(); - pn_selectable_set_reading(sel, true); - pn_selectable_set_fd(sel, fd); - return sel; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4b53bfca/tests/python/proton_tests/common.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/common.py b/tests/python/proton_tests/common.py index e164cc3..4303572 100644 --- a/tests/python/proton_tests/common.py +++ b/tests/python/proton_tests/common.py @@ -23,7 +23,9 @@ from threading import Thread from socket import socket, AF_INET, SOCK_STREAM from subprocess import Popen,PIPE,STDOUT import sys, os, string -from proton import Driver, Connection, Transport, SASL, Endpoint, Delivery, SSL +from proton import Connection, Transport, SASL, Endpoint, Delivery, SSL +from proton.reactors import Container +from proton.handlers import CHandshaker, CFlowController def free_tcp_ports(count=1): @@ -122,194 +124,59 @@ class TestServer(object): """ def __init__(self, **kwargs): self.args = kwargs - self.driver = Driver() + self.reactor = Container(self) self.host = "127.0.0.1" self.port = 0 if "host" in kwargs: self.host = kwargs["host"] if "port" in kwargs: self.port = kwargs["port"] - self.driver_timeout = -1 - self.credit_batch = 10 + self.handlers = [CFlowController(10), CHandshaker()] self.thread = Thread(name="server-thread", target=self.run) self.thread.daemon = True self.running = True + self.conditions = [] def start(self): + self.reactor.start() retry = 0 if self.port == 0: self.port = str(randint(49152, 65535)) retry = 10 - self.listener = self.driver.listener(self.host, self.port) - while not self.listener and retry > 0: - retry -= 1 - self.port = str(randint(49152, 65535)) - self.listener = self.driver.listener(self.host, self.port) - assert self.listener, "No free port for server to listen on!" + while retry > 0: + try: + self.acceptor = self.reactor.acceptor(self.host, self.port) + break + except IOError: + self.port = str(randint(49152, 65535)) + retry -= 1 + assert retry > 0, "No free port for server to listen on!" self.thread.start() def stop(self): self.running = False - self.driver.wakeup() + self.reactor.wakeup() self.thread.join() - if self.listener: - self.listener.close() - cxtr = self.driver.head_connector() - while cxtr: - if not cxtr.closed: - cxtr.close() - cxtr = cxtr.next() # Note: all following methods all run under the thread: def run(self): - while self.running: - self.driver.wait(self.driver_timeout) - self.process_listeners() - self.process_connectors() - - - def process_listeners(self): - """ Service each pending listener - """ - l = self.driver.pending_listener() - while l: - cxtr = l.accept() - assert(cxtr) - self.init_connector(cxtr) - l = self.driver.pending_listener() - - def init_connector(self, cxtr): - """ Initialize a newly accepted connector - """ - sasl = cxtr.sasl() - sasl.mechanisms("ANONYMOUS") - cxtr.connection = Connection() + self.reactor.timeout = 3.14159265359 + while self.reactor.process(): + if not self.running: + self.acceptor.close() + self.reactor.stop() + break + + def on_connection_bound(self, event): if "idle_timeout" in self.args: - cxtr.transport.idle_timeout = self.args["idle_timeout"] - - def process_connectors(self): - """ Service each pending connector - """ - cxtr = self.driver.pending_connector() - while cxtr: - self.process_connector(cxtr) - cxtr = self.driver.pending_connector() - - def process_connector(self, cxtr): - """ Process a pending connector - """ - if not cxtr.closed: - cxtr.process() - sasl = cxtr.sasl() - if sasl.state != SASL.STATE_PASS: - self.authenticate_connector(cxtr) - else: - conn = cxtr.connection - if conn: - self.service_connection(conn) - cxtr.process() - - def authenticate_connector(self, cxtr): - """ Deal with a connector that has not passed SASL - """ - # by default, just permit anyone - sasl = cxtr.sasl() - if sasl.state == SASL.STATE_STEP: - sasl.done(SASL.OK) - - def service_connection(self, conn): - """ Process a Connection - """ - if conn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_UNINIT: - conn.open() - - # open all pending sessions - ssn = conn.session_head(Endpoint.LOCAL_UNINIT) - while ssn: - self.init_session(ssn) - ssn.open() - ssn = ssn.next(Endpoint.LOCAL_UNINIT) - - # configure and open any pending links - link = conn.link_head(Endpoint.LOCAL_UNINIT) - while link: - self.init_link(link) - link.open() - link = link.next(Endpoint.LOCAL_UNINIT); - - ## Step 2: Now drain all the pending deliveries from the connection's - ## work queue and process them - - delivery = conn.work_head - while delivery: - self.process_delivery(delivery) - delivery = conn.work_head - - ## Step 3: Clean up any links or sessions that have been closed by the - ## remote. If the connection has been closed remotely, clean that up - ## also. - - # teardown any terminating links - link = conn.link_head(Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED) - while link: - link.close() - link = link.next(Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED) - - # teardown any terminating sessions - ssn = conn.session_head(Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED) - while ssn: - ssn.close(ssn) - ssn = ssn.next(Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED) - - if conn.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_CLOSED: - conn.close() - - def init_session(self, ssn): - """ Test-specific Session initialization - """ - pass + event.transport.idle_timeout = self.args["idle_timeout"] - def init_link(self, link): - """ Test-specific Link initialization - """ - pass - - def process_delivery(self, delivery): - """ Test-specific Delivery processing. - """ - pass - - -class TestServerDrain(TestServer): - """ A primitive test server that accepts connections and simply discards any - messages sent to it. - """ - def __init__(self, **kwargs): - TestServer.__init__(self, **kwargs) - - def init_link(self, link): - """ Test-specific Link initialization - """ - if link.is_receiver: - link.flow(self.credit_batch) - - def process_delivery(self, delivery): - """ Just drop any incomming messages - """ - link = delivery.link - if delivery.readable: # inbound data available - m = link.recv(1024) - while m: - #print("Dropping msg...%s" % str(m)) - m = link.recv(1024) - delivery.update(Delivery.ACCEPTED) - delivery.settle() - else: - link.advance() - if link.credit == 0: - link.flow(self.credit_batch) + def on_connection_local_close(self, event): + self.conditions.append(event.connection.condition) + def on_delivery(self, event): + event.delivery.settle() # # Classes that wrap the messenger applications msgr-send and msgr-recv. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org