qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject svn commit: r1572744 [1/2] - in /qpid/proton/trunk: proton-c/ proton-c/bindings/python/ proton-c/include/proton/ proton-c/src/ proton-c/src/dispatcher/ proton-c/src/messenger/ proton-c/src/object/ proton-c/src/posix/ proton-c/src/ssl/ proton-c/src/transport/ proton-j/src/main/resources/ tests/python/proton_tests/
Date Thu, 27 Feb 2014 21:25:14 GMT
Author: rhs
Date: Thu Feb 27 21:25:13 2014
New Revision: 1572744

URL: http://svn.apache.org/r1572744
Log:
Refactored messenger's I/O in such a way that it can be driven by an
external poll/epoll/select loop as described in PROTON-525. This
involved factoring a bunch of utility I/O code out of the driver and
into a more basic I/O interface (io.h and selector.h), as well as
introducing the pn_selectable_t interface.

Added:
    qpid/proton/trunk/proton-c/include/proton/io.h
    qpid/proton/trunk/proton-c/include/proton/selectable.h
    qpid/proton/trunk/proton-c/include/proton/selector.h
    qpid/proton/trunk/proton-c/src/posix/io.c
    qpid/proton/trunk/proton-c/src/selectable.c
    qpid/proton/trunk/proton-c/src/selectable.h
    qpid/proton/trunk/proton-c/src/selector.c
Modified:
    qpid/proton/trunk/proton-c/CMakeLists.txt
    qpid/proton/trunk/proton-c/bindings/python/proton.py
    qpid/proton/trunk/proton-c/include/proton/cproton.i
    qpid/proton/trunk/proton-c/include/proton/driver.h
    qpid/proton/trunk/proton-c/include/proton/driver_extras.h
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/include/proton/messenger.h
    qpid/proton/trunk/proton-c/include/proton/object.h
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
    qpid/proton/trunk/proton-c/src/messenger/messenger.c
    qpid/proton/trunk/proton-c/src/object/object.c
    qpid/proton/trunk/proton-c/src/posix/driver.c
    qpid/proton/trunk/proton-c/src/ssl/openssl.c
    qpid/proton/trunk/proton-c/src/transport/transport.c
    qpid/proton/trunk/proton-j/src/main/resources/cmessenger.py
    qpid/proton/trunk/tests/python/proton_tests/messenger.py

Modified: qpid/proton/trunk/proton-c/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/CMakeLists.txt?rev=1572744&r1=1572743&r2=1572744&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/CMakeLists.txt (original)
+++ qpid/proton/trunk/proton-c/CMakeLists.txt Thu Feb 27 21:25:13 2014
@@ -74,8 +74,10 @@ add_custom_command (
 
 # Select driver
 if(PN_WINAPI)
+  set (pn_io_impl src/windows/io.c)
   set (pn_driver_impl src/windows/driver.c)
 else(PN_WINAPI)
+  set (pn_io_impl src/posix/io.c)
   set (pn_driver_impl src/posix/driver.c)
 endif(PN_WINAPI)
 
@@ -227,6 +229,7 @@ add_subdirectory(../examples/messenger/c
 add_subdirectory(../tests/tools/apps/c ../tests/tools/apps/c)
 
 set (qpid-proton-platform
+  ${pn_io_impl}
   ${pn_driver_impl}
   src/platform.c
   ${pn_driver_ssl_impl}
@@ -259,6 +262,8 @@ set (qpid-proton-core
   src/messenger/subscription.c
   src/messenger/store.c
   src/messenger/transform.c
+  src/selectable.c
+  src/selector.c
 
   ${CMAKE_CURRENT_BINARY_DIR}/encodings.h
   ${CMAKE_CURRENT_BINARY_DIR}/protocol.h

Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1572744&r1=1572743&r2=1572744&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Thu Feb 27 21:25:13 2014
@@ -249,6 +249,7 @@ class Messenger(object):
     @param name: the name of the messenger or None
     """
     self._mng = pn_messenger(name)
+    self._selectables = {}
 
   def __del__(self):
     """
@@ -367,6 +368,20 @@ exception of L{work}.  Currently, the af
 L{send}, L{recv}, and L{stop}.
 """)
 
+  def _is_passive(self):
+    return pn_messenger_is_passive(self._mng)
+
+  def _set_passive(self, b):
+    self._check(pn_messenger_set_passive(self._mng, b))
+
+  passive = property(_is_passive, _set_passive,
+                      doc="""
+When passive is set to true, Messenger will not attempt to perform I/O
+internally. In this mode it is necessary to use the selectables API to
+drive any I/O needed to perform requested actions. In this mode
+Messenger will never block.
+""")
+
   def _get_incoming_window(self):
     return pn_messenger_get_incoming_window(self._mng)
 
@@ -447,7 +462,7 @@ first message.
     """
     sub_impl = pn_messenger_subscribe(self._mng, source)
     if not sub_impl:
-      self._check(PN_ERR)
+      self._check(pn_error_code(pn_messenger_error(self._mng)))
     return Subscription(sub_impl)
 
   def put(self, message):
@@ -723,6 +738,18 @@ first message.
     """
     self._check(pn_messenger_rewrite(self._mng, pattern, address))
 
+  def selectable(self):
+    impl = pn_messenger_selectable(self._mng)
+    if impl:
+      fd = pn_selectable_fd(impl)
+      sel = self._selectables.get(fd, None)
+      if sel is None:
+        sel = Selectable(self, impl)
+        self._selectables[fd] = sel
+      return sel
+    else:
+      return None
+
 class Message(object):
   """
   The L{Message} class is a mutable holder of message content.
@@ -1100,6 +1127,70 @@ class Subscription(object):
   def address(self):
     return pn_subscription_address(self._impl)
 
+class Selectable(object):
+
+  def __init__(self, messenger, impl):
+    self.messenger = messenger
+    self._impl = impl
+
+  def fileno(self):
+    if not self._impl: raise ValueError("selectable freed")
+    return pn_selectable_fd(self._impl)
+
+  @property
+  def capacity(self):
+    if not self._impl: raise ValueError("selectable freed")
+    return pn_selectable_capacity(self._impl)
+
+  @property
+  def pending(self):
+    if not self._impl: raise ValueError("selectable freed")
+    return pn_selectable_pending(self._impl)
+
+  @property
+  def deadline(self):
+    if not self._impl: raise ValueError("selectable freed")
+
+  def readable(self):
+    if not self._impl: raise ValueError("selectable freed")
+    pn_selectable_readable(self._impl)
+
+  def writable(self):
+    if not self._impl: raise ValueError("selectable freed")
+    pn_selectable_writable(self._impl)
+
+  def expired(self):
+    if not self._impl: raise ValueError("selectable freed")
+    pn_selectable_expired(self._impl)
+
+  def _is_registered(self):
+    if not self._impl: raise ValueError("selectable freed")
+    return pn_selectable_is_registered(self._impl)
+
+  def _set_registered(self, registered):
+    if not self._impl: raise ValueError("selectable freed")
+    pn_selectable_set_registered(self._impl, registered)
+
+  registered = property(_is_registered, _set_registered,
+                    doc="""
+The registered property may be get/set by an I/O polling system to
+indicate whether the fd has been registered or not.
+""")
+
+  @property
+  def is_terminal(self):
+    if not self._impl: return True
+    return pn_selectable_is_terminal(self._impl)
+
+  def free(self):
+    if self._impl:
+      del self.messenger._selectables[self.fileno()]
+      pn_selectable_free(self._impl)
+      self._impl = None
+
+  def __del__(self):
+    self.free()
+
 class DataException(ProtonException):
   """
   The DataException class is the root of the Data exception hierarchy.

Modified: qpid/proton/trunk/proton-c/include/proton/cproton.i
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/cproton.i?rev=1572744&r1=1572743&r2=1572744&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/cproton.i (original)
+++ qpid/proton/trunk/proton-c/include/proton/cproton.i Thu Feb 27 21:25:13 2014
@@ -1322,6 +1322,8 @@ typedef long long int int64_t;
 
 %include "proton/messenger.h"
 
+%include "proton/selectable.h"
+
 %include "proton/ssl.h"
 
 %ignore pn_decode_atoms;

Modified: qpid/proton/trunk/proton-c/include/proton/driver.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/driver.h?rev=1572744&r1=1572743&r2=1572744&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/driver.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/driver.h Thu Feb 27 21:25:13 2014
@@ -26,6 +26,7 @@
 #include <proton/error.h>
 #include <proton/engine.h>
 #include <proton/sasl.h>
+#include <proton/selectable.h>
 #include <proton/ssl.h>
 
 #ifdef __cplusplus

Modified: qpid/proton/trunk/proton-c/include/proton/driver_extras.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/driver_extras.h?rev=1572744&r1=1572743&r2=1572744&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/driver_extras.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/driver_extras.h Thu Feb 27 21:25:13 2014
@@ -27,6 +27,7 @@ extern "C" {
 #endif
 
 #include <proton/import_export.h>
+#include <proton/io.h>
 
 /** @file
  * Additional API for the Driver Layer.
@@ -36,12 +37,6 @@ extern "C" {
  *
  */
 
-#if defined(_WIN32) && ! defined(__CYGWIN__)
-typedef SOCKET pn_socket_t;
-#else
-typedef int pn_socket_t;
-#endif
-
 /** Create a listener using the existing file descriptor.
  *
  * @param[in] driver driver that will 'own' this listener
@@ -52,6 +47,8 @@ typedef int pn_socket_t;
  */
 PN_EXTERN pn_listener_t *pn_listener_fd(pn_driver_t *driver, pn_socket_t fd, void *context);
 
+PN_EXTERN pn_socket_t pn_listener_get_fd(pn_listener_t *listener);
+
 /** Create a connector using the existing file descriptor.
  *
  * @param[in] driver driver that will 'own' this connector.
@@ -62,6 +59,7 @@ PN_EXTERN pn_listener_t *pn_listener_fd(
  */
 PN_EXTERN pn_connector_t *pn_connector_fd(pn_driver_t *driver, pn_socket_t fd, void *context);
 
+PN_EXTERN pn_socket_t pn_connector_get_fd(pn_connector_t *connector);
 
 #ifdef __cplusplus
 }

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1572744&r1=1572743&r2=1572744&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Thu Feb 27 21:25:13 2014
@@ -341,6 +341,8 @@ PN_EXTERN void *pn_connection_get_contex
  */
 PN_EXTERN void pn_connection_set_context(pn_connection_t *connection, void *context);
 
+PN_EXTERN pn_transport_t *pn_connection_transport(pn_connection_t *connection);
+
 
 // transport
 PN_EXTERN pn_error_t *pn_transport_error(pn_transport_t *transport);
@@ -496,6 +498,7 @@ PN_EXTERN pn_millis_t pn_transport_get_r
 PN_EXTERN uint64_t pn_transport_get_frames_output(const pn_transport_t *transport);
 PN_EXTERN uint64_t pn_transport_get_frames_input(const pn_transport_t *transport);
 PN_EXTERN bool pn_transport_quiesced(pn_transport_t *transport);
+PN_EXTERN bool pn_transport_closed(pn_transport_t *transport);
 PN_EXTERN void pn_transport_free(pn_transport_t *transport);
 
 // session

Added: qpid/proton/trunk/proton-c/include/proton/io.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/io.h?rev=1572744&view=auto
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/io.h (added)
+++ qpid/proton/trunk/proton-c/include/proton/io.h Thu Feb 27 21:25:13 2014
@@ -0,0 +1,62 @@
+#ifndef PROTON_IO_H
+#define PROTON_IO_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/import_export.h>
+#include <proton/error.h>
+#include <sys/types.h>
+#ifndef __cplusplus
+#include <stdbool.h>
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#if defined(_WIN32) && ! defined(__CYGWIN__)
+typedef SOCKET pn_socket_t;
+#else
+typedef int pn_socket_t;
+#define INVALID_SOCKET (-1)
+#endif
+
+typedef struct pn_io_t pn_io_t;
+
+PN_EXTERN pn_io_t *pn_io(void);
+PN_EXTERN void pn_io_free(pn_io_t *io);
+PN_EXTERN pn_error_t *pn_io_error(pn_io_t *io);
+PN_EXTERN pn_socket_t pn_connect(pn_io_t *io, const char *host, const char *port);
+PN_EXTERN pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port);
+PN_EXTERN pn_socket_t pn_accept(pn_io_t *io, pn_socket_t socket, char *name, size_t size);
+PN_EXTERN void pn_close(pn_io_t *io, pn_socket_t socket);
+PN_EXTERN ssize_t pn_send(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size);
+PN_EXTERN ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size);
+PN_EXTERN int pn_pipe(pn_io_t *io, pn_socket_t *dest);
+PN_EXTERN ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size);
+PN_EXTERN ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* io.h */

Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1572744&r1=1572743&r2=1572744&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Thu Feb 27 21:25:13 2014
@@ -24,6 +24,7 @@
 
 #include <proton/import_export.h>
 #include <proton/message.h>
+#include <proton/selectable.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -175,6 +176,10 @@ PN_EXTERN bool pn_messenger_is_blocking(
  */
 PN_EXTERN int pn_messenger_set_blocking(pn_messenger_t *messenger, bool blocking);
 
+PN_EXTERN bool pn_messenger_is_passive(pn_messenger_t *messenger);
+
+PN_EXTERN int pn_messenger_set_passive(pn_messenger_t *messenger, bool passive);
+
 /** Frees a Messenger.
  *
  * @param[in] messenger the messenger to free, no longer valid on
@@ -611,6 +616,8 @@ PN_EXTERN int pn_messenger_route(pn_mess
 PN_EXTERN int pn_messenger_rewrite(pn_messenger_t *messenger, const char *pattern,
                                    const char *address);
 
+PN_EXTERN pn_selectable_t *pn_messenger_selectable(pn_messenger_t *messenger);
+
 #ifdef __cplusplus
 }
 #endif

Modified: qpid/proton/trunk/proton-c/include/proton/object.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/object.h?rev=1572744&r1=1572743&r2=1572744&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/object.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/object.h Thu Feb 27 21:25:13 2014
@@ -44,6 +44,8 @@ typedef struct pn_list_t pn_list_t;
 typedef struct pn_map_t pn_map_t;
 typedef struct pn_hash_t pn_hash_t;
 typedef struct pn_string_t pn_string_t;
+typedef void *(*pn_iterator_next_t)(void *state);
+typedef struct pn_iterator_t pn_iterator_t;
 
 typedef struct {
   void (*initialize)(void *);
@@ -84,6 +86,8 @@ PN_EXTERN int pn_list_add(pn_list_t *lis
 PN_EXTERN ssize_t pn_list_index(pn_list_t *list, void *value);
 PN_EXTERN bool pn_list_remove(pn_list_t *list, void *value);
 PN_EXTERN void pn_list_del(pn_list_t *list, int index, int n);
+PN_EXTERN void pn_list_clear(pn_list_t *list);
+PN_EXTERN void pn_list_iterator(pn_list_t *list, pn_iterator_t *iter);
 
 #define PN_REFCOUNT_KEY (0x2)
 #define PN_REFCOUNT_VALUE (0x4)
@@ -134,9 +138,6 @@ PN_EXTERN size_t pn_string_capacity(pn_s
 PN_EXTERN int pn_string_resize(pn_string_t *string, size_t size);
 PN_EXTERN int pn_string_copy(pn_string_t *string, pn_string_t *src);
 
-typedef void *(*pn_iterator_next_t)(void *state);
-typedef struct pn_iterator_t pn_iterator_t;
-
 PN_EXTERN pn_iterator_t *pn_iterator(void);
 PN_EXTERN void *pn_iterator_start(pn_iterator_t *iterator,
                                   pn_iterator_next_t next, size_t size);

Added: qpid/proton/trunk/proton-c/include/proton/selectable.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/selectable.h?rev=1572744&view=auto
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/selectable.h (added)
+++ qpid/proton/trunk/proton-c/include/proton/selectable.h Thu Feb 27 21:25:13 2014
@@ -0,0 +1,58 @@
+#ifndef PROTON_SELECTABLE_H
+#define PROTON_SELECTABLE_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/import_export.h>
+#include <proton/object.h>
+#ifndef __cplusplus
+#include <stdbool.h>
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef pn_iterator_t pn_selectables_t;
+typedef struct pn_selectable_t pn_selectable_t;
+
+PN_EXTERN pn_selectables_t *pn_selectables(void);
+PN_EXTERN pn_selectable_t *pn_selectables_next(pn_selectables_t *selectables);
+PN_EXTERN void pn_selectables_free(pn_selectables_t *selectables);
+
+PN_EXTERN int pn_selectable_fd(pn_selectable_t *selectable);
+PN_EXTERN ssize_t pn_selectable_capacity(pn_selectable_t *selectable);
+PN_EXTERN ssize_t pn_selectable_pending(pn_selectable_t *selectable);
+PN_EXTERN pn_timestamp_t pn_selectable_deadline(pn_selectable_t *selectable);
+PN_EXTERN void pn_selectable_readable(pn_selectable_t *selectable);
+PN_EXTERN void pn_selectable_writable(pn_selectable_t *selectable);
+PN_EXTERN void pn_selectable_expired(pn_selectable_t *selectable);
+PN_EXTERN bool pn_selectable_is_registered(pn_selectable_t *selectable);
+PN_EXTERN void pn_selectable_set_registered(pn_selectable_t *selectable, bool registered);
+PN_EXTERN bool pn_selectable_is_terminal(pn_selectable_t *selectable);
+PN_EXTERN void pn_selectable_free(pn_selectable_t *selectable);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* selectable.h */

Added: qpid/proton/trunk/proton-c/include/proton/selector.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/selector.h?rev=1572744&view=auto
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/selector.h (added)
+++ qpid/proton/trunk/proton-c/include/proton/selector.h Thu Feb 27 21:25:13 2014
@@ -0,0 +1,53 @@
+#ifndef PROTON_SELECTOR_H
+#define PROTON_SELECTOR_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/import_export.h>
+#include <proton/selectable.h>
+#ifndef __cplusplus
+#include <stdbool.h>
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define PN_READABLE (1)
+#define PN_WRITABLE (2)
+#define PN_EXPIRED (4)
+
+typedef struct pn_selector_t pn_selector_t;
+
+PN_EXTERN pn_selector_t *pn_selector(void);
+PN_EXTERN void pn_selector_free(pn_selector_t *selector);
+PN_EXTERN void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable);
+PN_EXTERN void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable);
+PN_EXTERN void pn_selector_remove(pn_selector_t *selector, pn_selectable_t *selectable);
+PN_EXTERN int pn_selector_select(pn_selector_t *select, int timeout);
+PN_EXTERN pn_selectable_t *pn_selector_next(pn_selector_t *select, int *events);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* selector.h */

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1572744&r1=1572743&r2=1572744&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Thu Feb 27 21:25:13 2014
@@ -36,7 +36,9 @@ pn_dispatcher_t *pn_dispatcher(uint8_t f
 
   disp->frame_type = frame_type;
   disp->transport = transport;
-  disp->trace = PN_TRACE_OFF;
+  disp->trace = (pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
+    (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) |
+    (pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF);
 
   disp->input = pn_buffer(1024);
   disp->fragment = 0;

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1572744&r1=1572743&r2=1572744&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Thu Feb 27 21:25:13 2014
@@ -20,10 +20,13 @@
  */
 
 #include <proton/messenger.h>
-#include <proton/driver.h>
-#include <proton/util.h>
+#include <proton/io.h>
+#include <proton/sasl.h>
 #include <proton/ssl.h>
+#include <proton/util.h>
 #include <proton/object.h>
+#include <proton/selector.h>
+
 #include <assert.h>
 #include <ctype.h>
 #include <stdlib.h>
@@ -35,6 +38,7 @@
 #include "store.h"
 #include "transform.h"
 #include "subscription.h"
+#include "../selectable.h"
 
 typedef struct pn_link_ctx_t pn_link_ctx_t;
 
@@ -64,7 +68,15 @@ struct pn_messenger_t {
   char *trusted_certificates;
   int timeout;
   bool blocking;
-  pn_driver_t *driver;
+  bool passive;
+  pn_io_t *io;
+  pn_list_t *pending; // pending selectables
+  pn_selectable_t *interruptor;
+  bool interrupted;
+  pn_socket_t ctrl[2];
+  pn_list_t *listeners;
+  pn_list_t *connections;
+  pn_selector_t *selector;
   pn_collector_t *collector;
   int send_threshold;
   pn_link_credit_mode_t credit_mode;
@@ -93,22 +105,277 @@ struct pn_messenger_t {
   int connection_error;
 };
 
+#define CTX_HEAD                                \
+  pn_messenger_t *messenger;                    \
+  pn_selectable_t *selectable;                  \
+  bool pending;
+
+typedef struct pn_ctx_t {
+  CTX_HEAD
+} pn_ctx_t;
+
 typedef struct {
+  CTX_HEAD
   char *host;
   char *port;
   pn_subscription_t *subscription;
   pn_ssl_domain_t *domain;
 } pn_listener_ctx_t;
 
-static pn_listener_ctx_t *pn_listener_ctx(pn_listener_t *lnr,
-                                          pn_messenger_t *messenger,
+typedef struct {
+  CTX_HEAD
+  pn_connection_t *connection;
+  char *address;
+  char *scheme;
+  char *user;
+  char *pass;
+  char *host;
+  char *port;
+  pn_listener_ctx_t *listener;
+} pn_connection_ctx_t;
+
+static pn_connection_ctx_t *pni_context(pn_selectable_t *sel)
+{
+  assert(sel);
+  pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pni_selectable_get_context(sel);
+  assert(ctx);
+  return ctx;
+}
+
+static pn_transport_t *pni_transport(pn_selectable_t *sel)
+{
+  return pn_connection_transport(pni_context(sel)->connection);
+}
+
+static ssize_t pni_connection_capacity(pn_selectable_t *sel)
+{
+  pn_transport_t *transport = pni_transport(sel);
+  ssize_t capacity = pn_transport_capacity(transport);
+  if (capacity < 0) {
+    if (pn_transport_closed(transport)) {
+      pni_selectable_set_terminal(sel, true);
+    }
+  }
+  return capacity;
+}
+
+bool pn_messenger_flow(pn_messenger_t *messenger);
+
+static ssize_t pni_connection_pending(pn_selectable_t *sel)
+{
+  pn_connection_ctx_t *ctx = pni_context(sel);
+  pn_messenger_flow(ctx->messenger);
+  pn_transport_t *transport = pni_transport(sel);
+  ssize_t pending = pn_transport_pending(transport);
+  if (pending < 0) {
+    if (pn_transport_closed(transport)) {
+      pni_selectable_set_terminal(sel, true);
+    }
+  }
+  return pending;
+}
+
+static pn_timestamp_t pni_connection_deadline(pn_selectable_t *sel)
+{
+  pn_connection_ctx_t *ctx = pni_context(sel);
+  return ctx->messenger->next_drain;
+}
+
+#include <errno.h>
+
+static void pn_error_report(const char *pfx, const char *error)
+{
+  fprintf(stderr, "%s ERROR %s\n", pfx, error);
+}
+
+void pni_modified(pn_ctx_t *ctx)
+{
+  pn_messenger_t *m = ctx->messenger;
+  pn_selectable_t *sel = ctx->selectable;
+  if (pn_selectable_is_registered(sel) && !ctx->pending) {
+    pn_list_add(m->pending, sel);
+    ctx->pending = true;
+  }
+}
+
+void pni_conn_modified(pn_connection_ctx_t *ctx)
+{
+  pni_modified((pn_ctx_t *) ctx);
+}
+
+void pni_lnr_modified(pn_listener_ctx_t *lnr)
+{
+  pni_modified((pn_ctx_t *) lnr);
+}
+
+int pn_messenger_process_events(pn_messenger_t *messenger);
+
+static void pni_connection_readable(pn_selectable_t *sel)
+{
+  pn_connection_ctx_t *context = pni_context(sel);
+  pn_messenger_t *messenger = context->messenger;
+  pn_connection_t *connection = context->connection;
+  pn_transport_t *transport = pni_transport(sel);
+  ssize_t capacity = pn_transport_capacity(transport);
+  if (capacity > 0) {
+    ssize_t n = pn_recv(messenger->io, pn_selectable_fd(sel),
+                        pn_transport_tail(transport), capacity);
+    if (n <= 0) {
+      if (errno != EAGAIN && errno != EWOULDBLOCK) {
+        if (n < 0) perror("recv");
+        pn_transport_close_tail(transport);
+        if (!(pn_connection_state(connection) & PN_REMOTE_CLOSED)) {
+          pn_error_report("CONNECTION", "connection aborted (remote)");
+        }
+      }
+    } else {
+      pn_transport_process(transport, (size_t) n);
+    }
+  }
+
+  pn_messenger_process_events(messenger);
+  pn_messenger_flow(messenger);
+  messenger->worked = true;
+  pni_conn_modified(context);
+}
+
+static void pni_connection_writable(pn_selectable_t *sel)
+{
+  pn_connection_ctx_t *context = pni_context(sel);
+  pn_messenger_t *messenger = context->messenger;
+  pn_transport_t *transport = pni_transport(sel);
+  ssize_t pending = pn_transport_pending(transport);
+  if (pending > 0) {
+    ssize_t n = pn_send(messenger->io, pn_selectable_fd(sel),
+                        pn_transport_head(transport), pending);
+    if (n < 0) {
+      if (errno != EAGAIN && errno != EWOULDBLOCK) {
+        perror("send");
+        pn_transport_close_head(transport);
+      }
+    } else {
+      pn_transport_pop(transport, n);
+    }
+  }
+
+  pn_messenger_process_events(messenger);
+  pn_messenger_flow(messenger);
+  messenger->worked = true;
+  pni_conn_modified(context);
+}
+
+static void pni_connection_expired(pn_selectable_t *sel)
+{
+  pn_connection_ctx_t *ctx = pni_context(sel);
+  pn_messenger_flow(ctx->messenger);
+  ctx->messenger->worked = true;
+  pni_conn_modified(ctx);
+}
+
+static void pni_messenger_reclaim(pn_messenger_t *messenger, pn_connection_t *conn);
+
+static void pni_connection_finalize(pn_selectable_t *sel)
+{
+  pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pni_selectable_get_context(sel);
+  pn_socket_t fd = pn_selectable_fd(sel);
+  pn_close(ctx->messenger->io, fd);
+  pn_list_remove(ctx->messenger->pending, sel);
+  pni_messenger_reclaim(ctx->messenger, ctx->connection);
+}
+
+static ssize_t pni_listener_capacity(pn_selectable_t *sel)
+{
+  return 1;
+}
+
+static ssize_t pni_listener_pending(pn_selectable_t *sel)
+{
+  return 0;
+}
+
+static pn_timestamp_t pni_listener_deadline(pn_selectable_t *sel)
+{
+  return 0;
+}
+
+pn_connection_t *pn_messenger_connection(pn_messenger_t *messenger,
+                                         pn_socket_t sock,
+                                         const char *scheme,
+                                         char *user,
+                                         char *pass,
+                                         char *host,
+                                         char *port,
+                                         pn_listener_ctx_t *lnr);
+
+static void pni_listener_readable(pn_selectable_t *sel)
+{
+  pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pni_selectable_get_context(sel);
+  pn_subscription_t *sub = ctx->subscription;
+  const char *scheme = pn_subscription_scheme(sub);
+  char name[1024];
+  pn_socket_t sock = pn_accept(ctx->messenger->io, pn_selectable_fd(sel), name, 1024);
+
+  pn_transport_t *t = pn_transport();
+
+  pn_ssl_t *ssl = pn_ssl(t);
+  pn_ssl_init(ssl, ctx->domain, NULL);
+  pn_sasl_t *sasl = pn_sasl(t);
+
+  pn_sasl_mechanisms(sasl, "ANONYMOUS");
+  pn_sasl_server(sasl);
+  pn_sasl_done(sasl, PN_SASL_OK);
+
+  pn_connection_t *conn = pn_messenger_connection(ctx->messenger, sock, scheme, NULL, NULL, NULL, NULL, ctx);
+  pn_transport_bind(t, conn);
+}
+
+static void pni_listener_writable(pn_selectable_t *sel)
+{
+  // do nothing
+}
+
+static void pni_listener_expired(pn_selectable_t *sel)
+{
+  // do nothing
+}
+
+static void pn_listener_ctx_free(pn_messenger_t *messenger, pn_listener_ctx_t *ctx);
+
+static void pni_listener_finalize(pn_selectable_t *sel)
+{
+  pn_listener_ctx_t *lnr = (pn_listener_ctx_t *) pni_selectable_get_context(sel);
+  pn_messenger_t *messenger = lnr->messenger;
+  pn_close(messenger->io, pn_selectable_fd(sel));
+  pn_list_remove(messenger->pending, sel);
+  pn_listener_ctx_free(messenger, lnr);
+}
+
+static bool pn_streq(const char *a, const char *b)
+{
+  return a == b || (a && b && !strcmp(a, b));
+}
+
+static const char *default_port(const char *scheme)
+{
+  if (scheme && pn_streq(scheme, "amqps"))
+    return "5671";
+  else
+    return "5672";
+}
+
+static pn_listener_ctx_t *pn_listener_ctx(pn_messenger_t *messenger,
                                           const char *scheme,
                                           const char *host,
                                           const char *port)
 {
-  pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_listener_context(lnr);
-  assert(!ctx);
-  ctx = (pn_listener_ctx_t *) malloc(sizeof(pn_listener_ctx_t));
+  pn_socket_t socket = pn_listen(messenger->io, host, port ? port : default_port(scheme));
+  if (socket == INVALID_SOCKET) {
+    pn_error_copy(messenger->error, pn_io_error(messenger->io));
+    return NULL;
+  }
+
+  pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) malloc(sizeof(pn_listener_ctx_t));
+  ctx->messenger = messenger;
   ctx->domain = pn_ssl_domain(PN_SSL_MODE_SERVER);
   if (messenger->certificate) {
     int err = pn_ssl_domain_set_credentials(ctx->domain, messenger->certificate,
@@ -119,6 +386,7 @@ static pn_listener_ctx_t *pn_listener_ct
       pn_error_format(messenger->error, PN_ERR, "invalid credentials");
       pn_ssl_domain_free(ctx->domain);
       free(ctx);
+      pn_close(messenger->io, socket);
       return NULL;
     }
   }
@@ -131,49 +399,68 @@ static pn_listener_ctx_t *pn_listener_ct
   ctx->subscription = sub;
   ctx->host = pn_strdup(host);
   ctx->port = pn_strdup(port);
-  pn_listener_set_context(lnr, ctx);
+
+  pn_selectable_t *selectable = pni_selectable(pni_listener_capacity,
+                                               pni_listener_pending,
+                                               pni_listener_deadline,
+                                               pni_listener_readable,
+                                               pni_listener_writable,
+                                               pni_listener_expired,
+                                               pni_listener_finalize);
+  pni_selectable_set_fd(selectable, socket);
+  pni_selectable_set_context(selectable, ctx);
+  pn_list_add(messenger->pending, selectable);
+  ctx->selectable = selectable;
+  ctx->pending = true;
+
+  pn_list_add(messenger->listeners, ctx);
   return ctx;
 }
 
-static void pn_listener_ctx_free(pn_listener_t *lnr)
+static void pn_listener_ctx_free(pn_messenger_t *messenger, pn_listener_ctx_t *ctx)
 {
-  pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_listener_context(lnr);
+  pn_list_remove(messenger->listeners, ctx);
   // XXX: subscriptions are freed when the messenger is freed pn_subscription_free(ctx->subscription);
   free(ctx->host);
   free(ctx->port);
   pn_ssl_domain_free(ctx->domain);
   free(ctx);
-  pn_listener_set_context(lnr, NULL);
 }
 
-typedef struct {
-  char *address;
-  char *scheme;
-  char *user;
-  char *pass;
-  char *host;
-  char *port;
-  pn_connector_t *connector;
-} pn_connection_ctx_t;
-
-static pn_connection_ctx_t *pn_connection_ctx(pn_connection_t *conn,
-                                              pn_connector_t *connector,
+static pn_connection_ctx_t *pn_connection_ctx(pn_messenger_t *messenger,
+                                              pn_connection_t *conn,
+                                              pn_socket_t sock,
                                               const char *scheme,
                                               const char *user,
                                               const char *pass,
                                               const char *host,
-                                              const char *port)
+                                              const char *port,
+                                              pn_listener_ctx_t *lnr)
 {
   pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(conn);
   assert(!ctx);
   ctx = (pn_connection_ctx_t *) malloc(sizeof(pn_connection_ctx_t));
+  ctx->messenger = messenger;
+  ctx->connection = conn;
+  ctx->selectable = pni_selectable(pni_connection_capacity,
+                                   pni_connection_pending,
+                                   pni_connection_deadline,
+                                   pni_connection_readable,
+                                   pni_connection_writable,
+                                   pni_connection_expired,
+                                   pni_connection_finalize);
+  pni_selectable_set_fd(ctx->selectable, sock);
+  pni_selectable_set_context(ctx->selectable, ctx);
+  pn_list_add(messenger->pending, ctx->selectable);
+  ctx->pending = true;
   ctx->scheme = pn_strdup(scheme);
   ctx->user = pn_strdup(user);
   ctx->pass = pn_strdup(pass);
   ctx->host = pn_strdup(host);
   ctx->port = pn_strdup(port);
-  ctx->connector = connector;
+  ctx->listener = lnr;
   pn_connection_set_context(conn, ctx);
+
   return ctx;
 }
 
@@ -181,6 +468,7 @@ static void pn_connection_ctx_free(pn_co
 {
   pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(conn);
   if (ctx) {
+    pni_selectable_set_context(ctx->selectable, NULL);
     free(ctx->scheme);
     free(ctx->user);
     free(ctx->pass);
@@ -238,10 +526,10 @@ static void link_ctx_setup( pn_messenger
 static void link_ctx_release( pn_messenger_t *messenger, pn_link_t *link )
 {
   if (pn_link_is_receiver(link)) {
+    pn_link_ctx_t *ctx = (pn_link_ctx_t *) pn_link_get_context( link );
+    if (!ctx) return;
     assert( messenger->receivers > 0 );
     messenger->receivers--;
-    pn_link_ctx_t *ctx = (pn_link_ctx_t *) pn_link_get_context( link );
-    assert( ctx );
     if (pn_link_get_drain(link)) {
       pn_link_set_drain(link, false);
       assert( messenger->draining > 0 );
@@ -254,6 +542,45 @@ static void link_ctx_release( pn_messeng
   }
 }
 
+static ssize_t pni_interruptor_capacity(pn_selectable_t *sel)
+{
+  return 1024;
+}
+
+static ssize_t pni_interruptor_pending(pn_selectable_t *sel)
+{
+  return 0;
+}
+
+static pn_timestamp_t pni_interruptor_deadline(pn_selectable_t *sel)
+{
+  return 0;
+}
+
+static void pni_interruptor_readable(pn_selectable_t *sel)
+{
+  pn_messenger_t *messenger = (pn_messenger_t *) pni_selectable_get_context(sel);
+  char buf[1024];
+  pn_read(messenger->io, pn_selectable_fd(sel), buf, 1024);
+  messenger->interrupted = true;
+}
+
+static void pni_interruptor_writable(pn_selectable_t *sel)
+{
+  // do nothing
+}
+
+static void pni_interruptor_expired(pn_selectable_t *sel)
+{
+  // do nothing
+}
+
+static void pni_interruptor_finalize(pn_selectable_t *sel)
+{
+  pn_messenger_t *messenger = (pn_messenger_t *) pni_selectable_get_context(sel);
+  messenger->interruptor = NULL;
+}
+
 pn_messenger_t *pn_messenger(const char *name)
 {
   pn_messenger_t *m = (pn_messenger_t *) malloc(sizeof(pn_messenger_t));
@@ -266,7 +593,22 @@ pn_messenger_t *pn_messenger(const char 
     m->trusted_certificates = NULL;
     m->timeout = -1;
     m->blocking = true;
-    m->driver = pn_driver();
+    m->passive = false;
+    m->io = pn_io();
+    m->pending = pn_list(0, 0);
+    m->interruptor = pni_selectable
+      (pni_interruptor_capacity, pni_interruptor_pending,
+       pni_interruptor_deadline, pni_interruptor_readable,
+       pni_interruptor_writable, pni_interruptor_expired,
+       pni_interruptor_finalize);
+    pn_list_add(m->pending, m->interruptor);
+    m->interrupted = false;
+    pn_pipe(m->io, m->ctrl);
+    pni_selectable_set_fd(m->interruptor, m->ctrl[0]);
+    pni_selectable_set_context(m->interruptor, m);
+    m->listeners = pn_list(0, 0);
+    m->connections = pn_list(0, 0);
+    m->selector = pn_selector();
     m->collector = pn_collector();
     m->credit_mode = LINK_CREDIT_EXPLICIT;
     m->credit_batch = 1024;
@@ -379,21 +721,51 @@ int pn_messenger_set_blocking(pn_messeng
   return 0;
 }
 
-static void pni_messenger_reclaim(pn_messenger_t *messenger, pn_connection_t *conn);
+bool pn_messenger_is_passive(pn_messenger_t *messenger)
+{
+  assert(messenger);
+  return messenger->passive;
+}
+
+int pn_messenger_set_passive(pn_messenger_t *messenger, bool passive)
+{
+  messenger->passive = passive;
+  return 0;
+}
+
+pn_selectable_t *pn_messenger_selectable(pn_messenger_t *messenger)
+{
+  assert(messenger);
+  pn_messenger_process_events(messenger);
+  pn_list_t *p = messenger->pending;
+  size_t n = pn_list_size(p);
+  if (n) {
+    pn_selectable_t *s = (pn_selectable_t *) pn_list_get(p, n - 1);
+    pn_list_del(p, n-1, 1);
+    // this is a total hack, messenger has selectables whose context
+    // are the messenger itself and whose context share a common
+    // prefix that is described by pn_ctx_t
+    void *c = pni_selectable_get_context(s);
+    if (c != messenger) {
+      pn_ctx_t *ctx = (pn_ctx_t *) c;
+      ctx->pending = false;
+    }
+    return s;
+  } else {
+    return NULL;
+  }
+}
 
-static void pni_driver_reclaim(pn_messenger_t *messenger, pn_driver_t *driver)
+static void pni_reclaim(pn_messenger_t *messenger)
 {
-  pn_listener_t *l = pn_listener_head(driver);
-  while (l) {
-    pn_listener_ctx_free(l);
-    l = pn_listener_next(l);
+  while (pn_list_size(messenger->listeners)) {
+    pn_listener_ctx_t *l = (pn_listener_ctx_t *) pn_list_get(messenger->listeners, 0);
+    pn_listener_ctx_free(messenger, l);
   }
 
-  pn_connector_t *c = pn_connector_head(driver);
-  while (c) {
-    pn_connection_t *conn = pn_connector_connection(c);
-    pni_messenger_reclaim(messenger, conn);
-    c = pn_connector_next(c);
+  while (pn_list_size(messenger->connections)) {
+    pn_connection_t *c = (pn_connection_t *) pn_list_get(messenger->connections, 0);
+    pni_messenger_reclaim(messenger, c);
   }
 }
 
@@ -408,8 +780,14 @@ void pn_messenger_free(pn_messenger_t *m
     free(messenger->private_key);
     free(messenger->password);
     free(messenger->trusted_certificates);
-    pni_driver_reclaim(messenger, messenger->driver);
-    pn_driver_free(messenger->driver);
+    pni_reclaim(messenger);
+    pn_free(messenger->pending);
+    pn_selectable_free(messenger->interruptor);
+    pn_close(messenger->io, messenger->ctrl[0]);
+    pn_close(messenger->io, messenger->ctrl[1]);
+    pn_free(messenger->listeners);
+    pn_free(messenger->connections);
+    pn_selector_free(messenger->selector);
     pn_collector_free(messenger->collector);
     pn_error_free(messenger->error);
     pni_store_free(messenger->incoming);
@@ -419,6 +797,7 @@ void pn_messenger_free(pn_messenger_t *m
     pn_free(messenger->routes);
     pn_free(messenger->credited);
     pn_free(messenger->blocked);
+    pn_free(messenger->io);
     free(messenger);
   }
 }
@@ -443,7 +822,10 @@ pn_error_t *pn_messenger_error(pn_messen
 bool pn_messenger_flow(pn_messenger_t *messenger)
 {
   bool updated = false;
-  if (messenger->receivers == 0) return updated;
+  if (messenger->receivers == 0) {
+    messenger->next_drain = 0;
+    return updated;
+  }
 
   if (messenger->credit_mode == LINK_CREDIT_AUTO) {
     // replenish, but limit the max total messages buffered
@@ -502,17 +884,11 @@ bool pn_messenger_flow(pn_messenger_t *m
   return updated;
 }
 
-static void pn_error_report(const char *pfx, const char *error)
-{
-  fprintf(stderr, "%s ERROR %s\n", pfx, error);
-}
-
 static int pn_transport_config(pn_messenger_t *messenger,
-                               pn_connector_t *connector,
                                pn_connection_t *connection)
 {
   pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection);
-  pn_transport_t *transport = pn_connector_transport(connector);
+  pn_transport_t *transport = pn_connection_transport(connection);
   if (ctx->scheme && !strcmp(ctx->scheme, "amqps")) {
     pn_ssl_domain_t *d = pn_ssl_domain(PN_SSL_MODE_CLIENT);
     if (messenger->certificate && messenger->private_key) {
@@ -608,6 +984,7 @@ int pni_pump_in(pn_messenger_t *messenge
   messenger->distributed--;
 
   pn_link_t *link = receiver;
+
   // replenish if low (< 20% maximum batch) and credit available
   if (!pn_link_get_drain(link) && pn_list_size(messenger->blocked) == 0 && messenger->credit > 0) {
     const int max = per_link_credit(messenger);
@@ -671,25 +1048,31 @@ void pni_messenger_reclaim(pn_messenger_
     link = pn_link_next(link, 0);
   }
 
+  pn_list_remove(messenger->connections, conn);
   pn_connection_ctx_free(conn);
+  pn_transport_free(pn_connection_transport(conn));
   pn_connection_free(conn);
 }
 
 pn_connection_t *pn_messenger_connection(pn_messenger_t *messenger,
-                                         pn_connector_t *connector,
+                                         pn_socket_t sock,
                                          const char *scheme,
                                          char *user,
                                          char *pass,
                                          char *host,
-                                         char *port)
+                                         char *port,
+                                         pn_listener_ctx_t *lnr)
 {
   pn_connection_t *connection = pn_connection();
   if (!connection) return NULL;
   pn_connection_collect(connection, messenger->collector);
-  pn_connection_ctx(connection, connector, scheme, user, pass, host, port);
+  pn_connection_ctx(messenger, connection, sock, scheme, user, pass, host, port, lnr);
 
   pn_connection_set_container(connection, messenger->name);
   pn_connection_set_hostname(connection, host);
+
+  pn_list_add(messenger->connections, connection);
+
   return connection;
 }
 
@@ -697,13 +1080,12 @@ void pn_messenger_process_connection(pn_
 {
   pn_connection_t *conn = pn_event_connection(event);
   pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(conn);
-  pn_connector_t *ctor = ctx ? ctx->connector : NULL;
 
   if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
     pn_connection_open(conn);
   }
 
-  if (ctor && pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
+  if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
     pn_condition_t *condition = pn_connection_remote_condition(conn);
     pn_condition_report("CONNECTION", condition);
     pn_connection_close(conn);
@@ -712,17 +1094,15 @@ void pn_messenger_process_connection(pn_
       char buf[1024];
       sprintf(buf, "%i", pn_condition_redirect_port(condition));
 
-      pn_connector_set_connection(ctor, NULL);
-      pn_driver_t *driver = messenger->driver;
-      pn_connector_t *connector = pn_connector(driver, host, buf, NULL);
-      pn_transport_unbind(pn_connector_transport(ctor));
+      pn_close(messenger->io, pn_selectable_fd(ctx->selectable));
+      pn_socket_t sock = pn_connect(messenger->io, host, buf);
+      pni_selectable_set_fd(ctx->selectable, sock);
+      pn_transport_unbind(pn_connection_transport(conn));
       pn_connection_reset(conn);
-      pn_transport_config(messenger, connector, conn);
-      pn_connector_set_connection(connector, conn);
+      pn_transport_t *t = pn_transport();
+      pn_transport_bind(t, conn);
+      pn_transport_config(messenger, conn);
     }
-  } else if ((ctor == NULL || pn_connector_closed(ctor))
-             && !(pn_connection_state(conn) & PN_REMOTE_CLOSED)) {
-    pn_error_report("CONNECTION", "connection aborted");
   }
 }
 
@@ -744,17 +1124,15 @@ void pn_messenger_process_link(pn_messen
   pn_link_t *link = pn_event_link(event);
   pn_connection_t *conn = pn_event_connection(event);
   pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(conn);
-  pn_connector_t *ctor = ctx ? ctx->connector : NULL;
 
   if (pn_link_state(link) & PN_LOCAL_UNINIT) {
     pn_terminus_copy(pn_link_source(link), pn_link_remote_source(link));
     pn_terminus_copy(pn_link_target(link), pn_link_remote_target(link));
     link_ctx_setup( messenger, conn, link );
     pn_link_open(link);
-    if (pn_link_is_receiver(link) && ctor) {
-      pn_listener_t *listener = pn_connector_listener(ctor);
-      pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_listener_context(listener);
-      ((pn_link_ctx_t *)pn_link_get_context(link))->subscription = ctx ? ctx->subscription : NULL;
+    if (pn_link_is_receiver(link)) {
+      pn_listener_ctx_t *lnr = ctx->listener;
+      ((pn_link_ctx_t *)pn_link_get_context(link))->subscription = lnr ? lnr->subscription : NULL;
     }
   }
 
@@ -829,10 +1207,7 @@ void pn_messenger_process_transport(pn_m
   pn_connection_t *conn = pn_event_connection(event);
   pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(conn);
   if (ctx) {
-    pn_connector_t *ctor = ctx->connector;
-    if (ctor) {
-      pn_connector_process(ctor);
-    }
+    pni_conn_modified(ctx);
   }
 }
 
@@ -870,57 +1245,28 @@ int pn_messenger_process_events(pn_messe
   return processed;
 }
 
-void pn_messenger_process_listeners(pn_messenger_t *messenger)
+int pn_messenger_process(pn_messenger_t *messenger)
 {
-  pn_listener_t *l;
-  while ((l = pn_driver_listener(messenger->driver))) {
-    messenger->worked = true;
-    pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_listener_context(l);
-    pn_subscription_t *sub = ctx->subscription;
-    const char *scheme = pn_subscription_scheme(sub);
-    pn_connector_t *c = pn_listener_accept(l);
-    pn_transport_t *t = pn_connector_transport(c);
-
-    pn_ssl_t *ssl = pn_ssl(t);
-    pn_ssl_init(ssl, ctx->domain, NULL);
-
-    pn_sasl_t *sasl = pn_sasl(t);
-    pn_sasl_mechanisms(sasl, "ANONYMOUS");
-    pn_sasl_server(sasl);
-    pn_sasl_done(sasl, PN_SASL_OK);
-    pn_connection_t *conn =
-      pn_messenger_connection(messenger, c, scheme, NULL, NULL, NULL, NULL);
-    pn_connector_set_connection(c, conn);
-  }
-}
-
-void pn_messenger_process_connectors(pn_messenger_t *messenger)
-{
-  pn_connector_t *c;
-  while ((c = pn_driver_connector(messenger->driver))) {
-    messenger->worked = true;
-    pn_connector_process(c);
-    pn_connection_t *conn = pn_connector_connection(c);
-    if (pn_connector_closed(c)) {
-      pn_connector_free(c);
-      if (conn) {
-        pni_messenger_reclaim(messenger, conn);
-      }
+  pn_selectable_t *sel;
+  int events;
+  while ((sel = pn_selector_next(messenger->selector, &events))) {
+    if (events & PN_READABLE) {
+      pn_selectable_readable(sel);
+    }
+    if (events & PN_WRITABLE) {
+      pn_selectable_writable(sel);
+    }
+    if (events & PN_EXPIRED) {
+      pn_selectable_expired(sel);
     }
   }
-}
 
-int pn_messenger_process(pn_messenger_t *messenger)
-{
-  pn_messenger_process_listeners(messenger);
-  pn_messenger_process_connectors(messenger);
-  int first = pn_messenger_process_events(messenger);
-  if (first < 0) return first;
-  // Update the credit scheduler.
-  pn_messenger_flow(messenger);
-  int second = pn_messenger_process_events(messenger);
-  if (second < 0) return second;
-  return first + second;
+  if (messenger->interrupted) {
+    messenger->interrupted = false;
+    return PN_INTR;
+  } else {
+    return 0;
+  }
 }
 
 pn_timestamp_t pn_messenger_deadline(pn_messenger_t *messenger)
@@ -930,15 +1276,50 @@ pn_timestamp_t pn_messenger_deadline(pn_
   return messenger->next_drain;
 }
 
+int pni_wait(pn_messenger_t *messenger, int timeout)
+{
+  bool wake = false;
+  pn_selectable_t *sel;
+  while ((sel = pn_messenger_selectable(messenger))) {
+    if (pn_selectable_is_terminal(sel)) {
+      if (pn_selectable_is_registered(sel)) {
+        pn_selector_remove(messenger->selector, sel);
+      }
+      pn_selectable_free(sel);
+      // we can't wait if we end up freeing anything because we could
+      // be waiting on the stopped predicate which might become true
+      // as a result of the free
+      wake = true;
+    } else if (pn_selectable_is_registered(sel)) {
+      pn_selector_update(messenger->selector, sel);
+    } else {
+      pn_selector_add(messenger->selector, sel);
+      pn_selectable_set_registered(sel, true);
+    }
+  }
+
+  if (wake) return 0;
+
+  return pn_selector_select(messenger->selector, timeout);
+}
+
 int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
 {
+  if (messenger->passive) {
+    bool pred = predicate(messenger);
+    return pred ? 0 : PN_INPROGRESS;
+  }
+
   pn_timestamp_t now = pn_i_now();
   long int deadline = now + timeout;
   bool pred;
 
   while (true) {
-    pn_messenger_process(messenger);
+    int error = pn_messenger_process(messenger);
     pred = predicate(messenger);
+    if (error == PN_INTR) {
+      return pred ? 0 : PN_INTR;
+    }
     int remaining = deadline - now;
     if (pred || (timeout >= 0 && remaining < 0)) break;
 
@@ -951,16 +1332,12 @@ int pn_messenger_tsync(pn_messenger_t *m
         remaining = (remaining < 0) ? delay : pn_min( remaining, delay );
       }
     }
-    int error = pn_driver_wait(messenger->driver, remaining);
-    if (error && error != PN_INTR) return error;
+    error = pni_wait(messenger, remaining);
+    if (error) return error;
 
     if (timeout >= 0) {
       now = pn_i_now();
     }
-
-    if (error == PN_INTR) {
-      return pred ? 0 : PN_INTR;
-    }
   }
 
   return pred ? 0 : PN_TIMEOUT;
@@ -989,51 +1366,32 @@ int pn_messenger_start(pn_messenger_t *m
 
 bool pn_messenger_stopped(pn_messenger_t *messenger)
 {
-  return pn_connector_head(messenger->driver) == NULL;
+  return pn_list_size(messenger->connections) == 0 && pn_list_size(messenger->listeners) == 0;
 }
 
 int pn_messenger_stop(pn_messenger_t *messenger)
 {
   if (!messenger) return PN_ARG_ERR;
 
-  pn_connector_t *ctor = pn_connector_head(messenger->driver);
-  while (ctor) {
-    pn_connection_t *conn = pn_connector_connection(ctor);
+  for (size_t i = 0; i < pn_list_size(messenger->connections); i++) {
+    pn_connection_t *conn = (pn_connection_t *) pn_list_get(messenger->connections, i);
     pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
     while (link) {
       pn_link_close(link);
       link = pn_link_next(link, PN_LOCAL_ACTIVE);
     }
     pn_connection_close(conn);
-    ctor = pn_connector_next(ctor);
   }
 
-  pn_listener_t *l = pn_listener_head(messenger->driver);
-  while (l) {
-    pn_listener_close(l);
-    pn_listener_t *prev = l;
-    l = pn_listener_next(l);
-    pn_listener_ctx_free(prev);
-    pn_listener_close(prev);
-    pn_listener_free(prev);
+  for (size_t i = 0; i < pn_list_size(messenger->listeners); i++) {
+    pn_listener_ctx_t *lnr = (pn_listener_ctx_t *) pn_list_get(messenger->listeners, i);
+    pni_selectable_set_terminal(lnr->selectable, true);
+    pni_lnr_modified(lnr);
   }
 
   return pn_messenger_sync(messenger, pn_messenger_stopped);
 }
 
-static bool pn_streq(const char *a, const char *b)
-{
-  return a == b || (a && b && !strcmp(a, b));
-}
-
-static const char *default_port(const char *scheme)
-{
-  if (scheme && pn_streq(scheme, "amqps"))
-    return "5671";
-  else
-    return "5672";
-}
-
 static void pni_parse(pn_address_t *address)
 {
   address->passive = false;
@@ -1084,28 +1442,14 @@ pn_connection_t *pn_messenger_resolve(pn
   *name = messenger->address.name;
 
   if (passive) {
-    pn_listener_t *lnr = pn_listener_head(messenger->driver);
-    while (lnr) {
-      pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_listener_context(lnr);
+    for (size_t i = 0; i < pn_list_size(messenger->listeners); i++) {
+      pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_list_get(messenger->listeners, i);
       if (pn_streq(host, ctx->host) && pn_streq(port, ctx->port)) {
         return NULL;
       }
-      lnr = pn_listener_next(lnr);
-    }
-
-    lnr = pn_listener(messenger->driver, host, port ? port : default_port(scheme), NULL);
-    if (lnr) {
-      pn_listener_ctx_t *ctx = pn_listener_ctx(lnr, messenger, scheme, host, port);
-      if (!ctx) {
-        pn_listener_close(lnr);
-        pn_listener_free(lnr);
-      }
-    } else {
-      pn_error_format(messenger->error, PN_ERR,
-                      "unable to bind to address %s: %s:%s", address, host, port,
-                      pn_driver_error(messenger->driver));
     }
 
+    pn_listener_ctx(messenger, scheme, host, port);
     return NULL;
   }
 
@@ -1121,9 +1465,8 @@ pn_connection_t *pn_messenger_resolve(pn
     strcat(domain, port);
   }
 
-  pn_connector_t *ctor = pn_connector_head(messenger->driver);
-  while (ctor) {
-    pn_connection_t *connection = pn_connector_connection(ctor);
+  for (size_t i = 0; i < pn_list_size(messenger->connections); i++) {
+    pn_connection_t *connection = (pn_connection_t *) pn_list_get(messenger->connections, i);
     pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection);
     if (pn_streq(scheme, ctx->scheme) && pn_streq(user, ctx->user) &&
         pn_streq(pass, ctx->pass) && pn_streq(host, ctx->host) &&
@@ -1134,32 +1477,27 @@ pn_connection_t *pn_messenger_resolve(pn
     if (pn_streq(container, domain)) {
       return connection;
     }
-    ctor = pn_connector_next(ctor);
   }
 
-  pn_connector_t *connector = pn_connector(messenger->driver, host,
-                                           port ? port : default_port(scheme),
-                                           NULL);
-  if (!connector) {
-    pn_error_format(messenger->error, PN_ERR,
-                    "unable to connect to %s: %s", address,
-                    pn_driver_error(messenger->driver));
+  pn_socket_t sock = pn_connect(messenger->io, host, port ? port : default_port(scheme));
+  if (sock == INVALID_SOCKET) {
     return NULL;
   }
 
   pn_connection_t *connection =
-    pn_messenger_connection(messenger, connector, scheme, user, pass, host, port);
-  err = pn_transport_config(messenger, connector, connection);
+    pn_messenger_connection(messenger, sock, scheme, user, pass, host, port, NULL);
+  pn_transport_t *transport = pn_transport();
+  pn_transport_bind(transport, connection);
+  err = pn_transport_config(messenger, connection);
   if (err) {
-    pni_messenger_reclaim(messenger, connection);
-    pn_connector_close(connector);
-    pn_connector_free(connector);
+    pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection);
+    pn_selectable_t *sel = ctx->selectable;
+    pn_selectable_free(sel);
     messenger->connection_error = err;
     return NULL;
   }
 
   pn_connection_open(connection);
-  pn_connector_set_connection(connector, connection);
 
   return connection;
 }
@@ -1235,21 +1573,10 @@ pn_subscription_t *pn_messenger_subscrib
   char *port = messenger->address.port;
 
   if (passive) {
-    pn_listener_t *lnr = pn_listener(messenger->driver, host,
-                                     port ? port : default_port(scheme), NULL);
-    if (lnr) {
-      pn_listener_ctx_t *ctx = pn_listener_ctx(lnr, messenger, scheme, host, port);
-      if (ctx) {
-        return ctx->subscription;
-      } else {
-        pn_listener_close(lnr);
-        pn_listener_free(lnr);
-        return NULL;
-      }
+    pn_listener_ctx_t *ctx = pn_listener_ctx(messenger, scheme, host, port);
+    if (ctx) {
+      return ctx->subscription;
     } else {
-      pn_error_format(messenger->error, PN_ERR,
-                      "unable to subscribe to address %s: %s", source,
-                      pn_driver_error(messenger->driver));
       return NULL;
     }
   } else {
@@ -1499,20 +1826,17 @@ bool pn_messenger_sent(pn_messenger_t *m
 {
   int total = pni_store_size(messenger->outgoing);
 
-  pn_connector_t *ctor = pn_connector_head(messenger->driver);
-  while (ctor) {
-
+  for (size_t i = 0; i < pn_list_size(messenger->connections); i++)
+  {
+    pn_connection_t *conn = (pn_connection_t *) pn_list_get(messenger->connections, i);
     // check if transport is done generating output
-    pn_transport_t *transport = pn_connector_transport(ctor);
+    pn_transport_t *transport = pn_connection_transport(conn);
     if (transport) {
       if (!pn_transport_quiesced(transport)) {
-        pn_connector_process(ctor);
         return false;
       }
     }
 
-    pn_connection_t *conn = pn_connector_connection(ctor);
-
     pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
     while (link) {
       if (pn_link_is_sender(link)) {
@@ -1528,8 +1852,6 @@ bool pn_messenger_sent(pn_messenger_t *m
       }
       link = pn_link_next(link, PN_LOCAL_ACTIVE);
     }
-
-    ctor = pn_connector_next(ctor);
   }
 
   return total <= messenger->send_threshold;
@@ -1539,9 +1861,9 @@ bool pn_messenger_rcvd(pn_messenger_t *m
 {
   if (pni_store_size(messenger->incoming) > 0) return true;
 
-  pn_connector_t *ctor = pn_connector_head(messenger->driver);
-  while (ctor) {
-    pn_connection_t *conn = pn_connector_connection(ctor);
+  for (size_t i = 0; i < pn_list_size(messenger->connections); i++)
+  {
+    pn_connection_t *conn = (pn_connection_t *) pn_list_get(messenger->connections, i);
 
     pn_delivery_t *d = pn_work_head(conn);
     while (d) {
@@ -1550,10 +1872,9 @@ bool pn_messenger_rcvd(pn_messenger_t *m
       }
       d = pn_work_next(d);
     }
-    ctor = pn_connector_next(ctor);
   }
 
-  if (!pn_connector_head(messenger->driver) && !pn_listener_head(messenger->driver)) {
+  if (!pn_list_size(messenger->connections) && !pn_list_size(messenger->listeners)) {
     return true;
   } else {
     return false;
@@ -1568,7 +1889,9 @@ int pn_messenger_work(pn_messenger_t *me
 {
   messenger->worked = false;
   int err = pn_messenger_tsync(messenger, work_pred, timeout);
-  if (err) return err;
+  if (err) {
+    return err;
+  }
   return (int) (messenger->worked ? 1 : 0);
 }
 
@@ -1589,8 +1912,9 @@ int pni_messenger_work(pn_messenger_t *m
 int pn_messenger_interrupt(pn_messenger_t *messenger)
 {
   assert(messenger);
-  if (messenger->driver) {
-    return pn_driver_wakeup(messenger->driver);
+  ssize_t n = pn_write(messenger->io, messenger->ctrl[1], "x", 1);
+  if (n <= 0) {
+    return n;
   } else {
     return 0;
   }
@@ -1611,8 +1935,8 @@ int pn_messenger_send(pn_messenger_t *me
 int pn_messenger_recv(pn_messenger_t *messenger, int n)
 {
   if (!messenger) return PN_ARG_ERR;
-  if (messenger->blocking && !pn_listener_head(messenger->driver)
-      && !pn_connector_head(messenger->driver))
+  if (messenger->blocking && !pn_list_size(messenger->listeners)
+      && !pn_list_size(messenger->connections))
     return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
 
   // re-compute credit, and update credit scheduler
@@ -1630,8 +1954,8 @@ int pn_messenger_recv(pn_messenger_t *me
   if (err) return err;
   if (!pn_messenger_incoming(messenger) &&
       messenger->blocking &&
-      !pn_listener_head(messenger->driver) &&
-      !pn_connector_head(messenger->driver)) {
+      !pn_list_size(messenger->listeners) &&
+      !pn_list_size(messenger->connections)) {
     return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources");
   } else {
     return 0;
@@ -1715,9 +2039,8 @@ int pn_messenger_queued(pn_messenger_t *
 
   int result = 0;
 
-  pn_connector_t *ctor = pn_connector_head(messenger->driver);
-  while (ctor) {
-    pn_connection_t *conn = pn_connector_connection(ctor);
+  for (size_t i = 0; i < pn_list_size(messenger->connections); i++) {
+    pn_connection_t *conn = (pn_connection_t *) pn_list_get(messenger->connections, i);
 
     pn_link_t *link = pn_link_head(conn, PN_LOCAL_ACTIVE);
     while (link) {
@@ -1730,7 +2053,6 @@ int pn_messenger_queued(pn_messenger_t *
       }
       link = pn_link_next(link, PN_LOCAL_ACTIVE);
     }
-    ctor = pn_connector_next(ctor);
   }
 
   return result;

Modified: qpid/proton/trunk/proton-c/src/object/object.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/object/object.c?rev=1572744&r1=1572743&r2=1572744&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/object/object.c (original)
+++ qpid/proton/trunk/proton-c/src/object/object.c Thu Feb 27 21:25:13 2014
@@ -66,9 +66,9 @@ void pn_decref(void *object)
 {
   if (object) {
     pni_head_t *head = pni_head(object);
-    if (head->refcount > 1) {
-      head->refcount--;
-    } else if (head->refcount == 1) {
+    assert(head->refcount > 0);
+    head->refcount--;
+    if (!head->refcount) {
       pn_finalize(object);
       free(head);
     }
@@ -79,7 +79,7 @@ void pn_finalize(void *object)
 {
   if (object) {
     pni_head_t *head = pni_head(object);
-    assert(head->refcount == 1);
+    assert(head->refcount == 0);
     if (head->clazz && head->clazz->finalize) {
       head->clazz->finalize(object);
     }
@@ -254,6 +254,12 @@ void pn_list_del(pn_list_t *list, int in
   list->size -= n;
 }
 
+void pn_list_clear(pn_list_t *list)
+{
+  assert(list);
+  pn_list_del(list, 0, list->size);
+}
+
 void pn_list_fill(pn_list_t *list, void *value, int n)
 {
   for (int i = 0; i < n; i++) {
@@ -261,6 +267,28 @@ void pn_list_fill(pn_list_t *list, void 
   }
 }
 
+typedef struct {
+  pn_list_t *list;
+  size_t index;
+} pni_list_iter_t;
+
+static void *pni_list_next(void *ctx)
+{
+  pni_list_iter_t *iter = (pni_list_iter_t *) ctx;
+  if (iter->index < pn_list_size(iter->list)) {
+    return pn_list_get(iter->list, iter->index++);
+  } else {
+    return NULL;
+  }
+}
+
+void pn_list_iterator(pn_list_t *list, pn_iterator_t *iter)
+{
+  pni_list_iter_t *liter = (pni_list_iter_t *) pn_iterator_start(iter, pni_list_next, sizeof(pni_list_iter_t));
+  liter->list = list;
+  liter->index = 0;
+}
+
 static void pn_list_finalize(void *object)
 {
   assert(object);
@@ -971,5 +999,11 @@ void  *pn_iterator_start(pn_iterator_t *
 
 void *pn_iterator_next(pn_iterator_t *iterator) {
   assert(iterator);
-  return iterator->next(iterator->state);
+  if (iterator->next) {
+    void *result = iterator->next(iterator->state);
+    if (!result) iterator->next = NULL;
+    return result;
+  } else {
+    return NULL;
+  }
 }

Modified: qpid/proton/trunk/proton-c/src/posix/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/posix/driver.c?rev=1572744&r1=1572743&r2=1572744&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/posix/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/posix/driver.c Thu Feb 27 21:25:13 2014
@@ -34,6 +34,7 @@
 #include <proton/driver.h>
 #include <proton/driver_extras.h>
 #include <proton/error.h>
+#include <proton/io.h>
 #include <proton/sasl.h>
 #include <proton/ssl.h>
 #include <proton/util.h>
@@ -47,37 +48,9 @@
 #define PN_SEL_RD (0x0001)
 #define PN_SEL_WR (0x0002)
 
-/* Abstract away turning off SIGPIPE */
-#ifdef MSG_NOSIGNAL
-static inline ssize_t pn_send(int sockfd, const void *buf, size_t len) {
-    return send(sockfd, buf, len, MSG_NOSIGNAL);
-}
-
-static inline int pn_create_socket(void) {
-    return socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
-}
-#elif defined(SO_NOSIGPIPE)
-static inline ssize_t pn_send(int sockfd, const void *buf, size_t len) {
-    return send(sockfd, buf, len, 0);
-}
-
-static inline int pn_create_socket(void) {
-    int sock = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
-    if (sock == -1) return sock;
-
-    int optval = 1;
-    if (setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE, &optval, sizeof(optval)) == -1) {
-        close(sock);
-        return -1;
-    }
-    return sock;
-}
-#else
-#error "Don't know how to turn off SIGPIPE on this platform"
-#endif
-
 struct pn_driver_t {
   pn_error_t *error;
+  pn_io_t *io;
   pn_listener_t *listener_head;
   pn_listener_t *listener_tail;
   pn_listener_t *listener_next;
@@ -161,46 +134,17 @@ pn_listener_t *pn_listener(pn_driver_t *
 {
   if (!driver) return NULL;
 
-  struct addrinfo *addr;
-  int code = getaddrinfo(host, port, NULL, &addr);
-  if (code) {
-    pn_error_format(driver->error, PN_ERR, "getaddrinfo(%s, %s): %s\n", host, port, gai_strerror(code));
+  pn_socket_t sock = pn_listen(driver->io, host, port);
+  if (sock == INVALID_SOCKET) {
     return NULL;
-  }
-
-  int sock = pn_create_socket();
-  if (sock == -1) {
-    pn_i_error_from_errno(driver->error, "pn_create_socket");
-    return NULL;
-  }
-
-  int optval = 1;
-  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == -1) {
-    pn_i_error_from_errno(driver->error, "setsockopt");
-    close(sock);
-    return NULL;
-  }
-
-  if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
-    pn_i_error_from_errno(driver->error, "bind");
-    freeaddrinfo(addr);
-    close(sock);
-    return NULL;
-  }
+  } else {
+    pn_listener_t *l = pn_listener_fd(driver, sock, context);
 
-  freeaddrinfo(addr);
+    if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+      fprintf(stderr, "Listening on %s:%s\n", host, port);
 
-  if (listen(sock, 50) == -1) {
-    pn_i_error_from_errno(driver->error, "listen");
-    close(sock);
-    return NULL;
+    return l;
   }
-
-  pn_listener_t *l = pn_listener_fd(driver, sock, context);
-
-  if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
-    fprintf(stderr, "Listening on %s:%s\n", host, port);
-  return l;
 }
 
 pn_listener_t *pn_listener_fd(pn_driver_t *driver, int fd, void *context)
@@ -222,6 +166,12 @@ pn_listener_t *pn_listener_fd(pn_driver_
   return l;
 }
 
+pn_socket_t pn_listener_get_fd(pn_listener_t *listener)
+{
+  assert(listener);
+  return listener->fd;
+}
+
 pn_listener_t *pn_listener_head(pn_driver_t *driver)
 {
   return driver ? driver->listener_head : NULL;
@@ -246,62 +196,21 @@ void pn_listener_set_context(pn_listener
   listener->context = context;
 }
 
-static void pn_configure_sock(int sock) {
-  // this would be nice, but doesn't appear to exist on linux
-  /*
-  int set = 1;
-  if (!setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int))) {
-    perror("setsockopt");
-  };
-  */
-
-    int flags = fcntl(sock, F_GETFL);
-    flags |= O_NONBLOCK;
-
-    if (fcntl(sock, F_SETFL, flags) < 0) {
-        perror("fcntl");
-    }
-
-    //
-    // Disable the Nagle algorithm on TCP connections.
-    //
-    // Note:  It would be more correct for the "level" argument to be SOL_TCP.  However, there
-    //        are portability issues with this macro so we use IPPROTO_TCP instead.
-    //
-    int tcp_nodelay = 1;
-    if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void*) &tcp_nodelay, sizeof(tcp_nodelay)) < 0) {
-        perror("setsockopt");
-    }
-}
-
 pn_connector_t *pn_listener_accept(pn_listener_t *l)
 {
   if (!l || !l->pending) return NULL;
+  char name[PN_NAME_MAX];
 
-  struct sockaddr_in addr = {0};
-  addr.sin_family = AF_INET;
-  socklen_t addrlen = sizeof(addr);
-  int sock = accept(l->fd, (struct sockaddr *) &addr, &addrlen);
-  if (sock == -1) {
-    perror("accept");
+  pn_socket_t sock = pn_accept(l->driver->io, l->fd, name, PN_NAME_MAX);
+  if (sock == INVALID_SOCKET) {
     return NULL;
   } else {
-    char host[1024], serv[64];
-    int code;
-    if ((code = getnameinfo((struct sockaddr *) &addr, addrlen, host, 1024, serv, 64, 0))) {
-      fprintf(stderr, "getnameinfo: %s\n", gai_strerror(code));
-      if (close(sock) == -1)
-        perror("close");
-      return NULL;
-    } else {
-      pn_configure_sock(sock);
-      if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
-        fprintf(stderr, "Accepted from %s:%s\n", host, serv);
-      pn_connector_t *c = pn_connector_fd(l->driver, sock, NULL);
-      snprintf(c->name, PN_NAME_MAX, "%s:%s", host, serv);
-      c->listener = l;
-      return c;
-    }
+    if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+      fprintf(stderr, "Accepted from %s\n", name);
+    pn_connector_t *c = pn_connector_fd(l->driver, sock, NULL);
+    snprintf(c->name, PN_NAME_MAX, "%s", name);
+    c->listener = l;
+    return c;
   }
 }
 
@@ -354,31 +263,7 @@ pn_connector_t *pn_connector(pn_driver_t
 {
   if (!driver) return NULL;
 
-  struct addrinfo *addr;
-  int code = getaddrinfo(host, port, NULL, &addr);
-  if (code) {
-    pn_error_format(driver->error, PN_ERR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(code));
-    return NULL;
-  }
-
-  int sock = pn_create_socket();
-  if (sock == -1) {
-    pn_i_error_from_errno(driver->error, "pn_create_socket");
-    return NULL;
-  }
-
-  pn_configure_sock(sock);
-
-  if (connect(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
-    if (errno != EINPROGRESS) {
-      pn_i_error_from_errno(driver->error, "connect");
-      freeaddrinfo(addr);
-      close(sock);
-      return NULL;
-    }
-  }
-
-  freeaddrinfo(addr);
+  pn_socket_t sock = pn_connect(driver->io, host, port);
 
   pn_connector_t *c = pn_connector_fd(driver, sock, context);
   snprintf(c->name, PN_NAME_MAX, "%s:%s", host, port);
@@ -420,6 +305,12 @@ pn_connector_t *pn_connector_fd(pn_drive
   return c;
 }
 
+pn_socket_t pn_connector_get_fd(pn_connector_t *connector)
+{
+  assert(connector);
+  return connector->fd;
+}
+
 pn_connector_t *pn_connector_head(pn_driver_t *driver)
 {
   return driver ? driver->connector_head : NULL;
@@ -575,7 +466,7 @@ void pn_connector_process(pn_connector_t
         c->status |= PN_SEL_RD;
         if (c->pending_read) {
           c->pending_read = false;
-          ssize_t n =  recv(c->fd, pn_transport_tail(transport), capacity, 0);
+          ssize_t n =  pn_recv(c->driver->io, c->fd, pn_transport_tail(transport), capacity);
           if (n < 0) {
             if (errno != EAGAIN) {
               perror("read");
@@ -618,7 +509,7 @@ void pn_connector_process(pn_connector_t
         c->status |= PN_SEL_WR;
         if (c->pending_write) {
           c->pending_write = false;
-          ssize_t n = pn_send(c->fd, pn_transport_head(transport), pending);
+          ssize_t n = pn_send(c->driver->io, c->fd, pn_transport_head(transport), pending);
           if (n < 0) {
             // XXX
             if (errno != EAGAIN) {
@@ -657,6 +548,7 @@ pn_driver_t *pn_driver()
   pn_driver_t *d = (pn_driver_t *) malloc(sizeof(pn_driver_t));
   if (!d) return NULL;
   d->error = pn_error();
+  d->io = pn_io();
   d->listener_head = NULL;
   d->listener_tail = NULL;
   d->listener_next = NULL;
@@ -711,6 +603,7 @@ void pn_driver_free(pn_driver_t *d)
     pn_listener_free(d->listener_head);
   free(d->fds);
   pn_error_free(d->error);
+  pn_io_free(d->io);
   free(d);
 }
 

Added: qpid/proton/trunk/proton-c/src/posix/io.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/posix/io.c?rev=1572744&view=auto
==============================================================================
--- qpid/proton/trunk/proton-c/src/posix/io.c (added)
+++ qpid/proton/trunk/proton-c/src/posix/io.c Thu Feb 27 21:25:13 2014
@@ -0,0 +1,264 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/io.h>
+#include <proton/object.h>
+
+#include <ctype.h>
+#include <errno.h>
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <assert.h>
+
+#include "../platform.h"
+
+#define MAX_HOST (1024)
+#define MAX_SERV (64)
+
+struct pn_io_t {
+  char host[MAX_HOST];
+  char serv[MAX_SERV];
+  pn_error_t *error;
+};
+
+void pn_io_initialize(void *obj)
+{
+  pn_io_t *io = (pn_io_t *) obj;
+  io->error = pn_error();
+}
+
+void pn_io_finalize(void *obj)
+{
+  pn_io_t *io = (pn_io_t *) obj;
+  pn_error_free(io->error);
+}
+
+#define pn_io_hashcode NULL
+#define pn_io_compare NULL
+#define pn_io_inspect NULL /* explicit empty slot, like hashcode/compare above */
+
+pn_io_t *pn_io(void)
+{
+  static pn_class_t clazz = PN_CLASS(pn_io);
+  pn_io_t *io = (pn_io_t *) pn_new(sizeof(pn_io_t), &clazz);
+  return io;
+}
+
+void pn_io_free(pn_io_t *io)
+{
+  pn_free(io);
+}
+
+pn_error_t *pn_io_error(pn_io_t *io)
+{
+  assert(io);
+  return io->error;
+}
+
+int pn_pipe(pn_io_t *io, pn_socket_t *dest)
+{
+  int n = pipe(dest);
+  if (n) {
+    pn_i_error_from_errno(io->error, "pipe");
+  }
+
+  return n;
+}
+
+static void pn_configure_sock(pn_io_t *io, pn_socket_t sock) {
+  // this would be nice, but doesn't appear to exist on linux
+  /*
+  int set = 1;
+  if (!setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE, (void *)&set, sizeof(int))) {
+    pn_i_error_from_errno(io->error, "setsockopt");
+  };
+  */
+
+  int flags = fcntl(sock, F_GETFL);
+  flags |= O_NONBLOCK;
+
+  if (fcntl(sock, F_SETFL, flags) < 0) {
+    pn_i_error_from_errno(io->error, "fcntl");
+  }
+
+  //
+  // Disable the Nagle algorithm on TCP connections.
+  //
+  // Note:  It would be more correct for the "level" argument to be SOL_TCP.  However, there
+  //        are portability issues with this macro so we use IPPROTO_TCP instead.
+  //
+  int tcp_nodelay = 1;
+  if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void*) &tcp_nodelay, sizeof(tcp_nodelay)) < 0) {
+    pn_i_error_from_errno(io->error, "setsockopt");
+  }
+}
+
+static inline int pn_create_socket(void);
+
+pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port)
+{
+  struct addrinfo *addr;
+  int code = getaddrinfo(host, port, NULL, &addr);
+  if (code) {
+    pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s\n", host, port, gai_strerror(code));
+    return INVALID_SOCKET;
+  }
+
+  pn_socket_t sock = pn_create_socket();
+  if (sock == INVALID_SOCKET) {
+    pn_i_error_from_errno(io->error, "pn_create_socket");
+    freeaddrinfo(addr);  // record errno first, then release addr on this exit path too
+    return INVALID_SOCKET;
+  }
+
+  int optval = 1;
+  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) == -1) {
+    pn_i_error_from_errno(io->error, "setsockopt");
+    freeaddrinfo(addr);
+    close(sock);
+    return INVALID_SOCKET;
+  }
+
+  if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
+    pn_i_error_from_errno(io->error, "bind");
+    freeaddrinfo(addr);
+    close(sock);
+    return INVALID_SOCKET;
+  }
+  freeaddrinfo(addr);  // addr is not needed once the socket is bound
+  if (listen(sock, 50) == -1) {
+    pn_i_error_from_errno(io->error, "listen");
+    close(sock);
+    return INVALID_SOCKET;
+  }
+
+  return sock;
+}
+
+pn_socket_t pn_connect(pn_io_t *io, const char *host, const char *port)
+{
+  struct addrinfo *addr;
+  int code = getaddrinfo(host, port, NULL, &addr);
+  if (code) {
+    pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(code));
+    return INVALID_SOCKET;
+  }
+
+  pn_socket_t sock = pn_create_socket();
+  if (sock == INVALID_SOCKET) {
+    pn_i_error_from_errno(io->error, "pn_create_socket");
+    freeaddrinfo(addr);  // record errno first, then release addr on this exit path too
+    return INVALID_SOCKET;
+  }
+
+  pn_configure_sock(io, sock);
+
+  if (connect(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
+    if (errno != EINPROGRESS) {  // EINPROGRESS is not treated as a failure
+      pn_i_error_from_errno(io->error, "connect");
+      freeaddrinfo(addr);
+      close(sock);
+      return INVALID_SOCKET;
+    }
+  }
+  freeaddrinfo(addr);
+
+  return sock;
+}
+
+pn_socket_t pn_accept(pn_io_t *io, pn_socket_t socket, char *name, size_t size)
+{
+  struct sockaddr_in addr = {0};
+  addr.sin_family = AF_INET;
+  socklen_t addrlen = sizeof(addr);
+  pn_socket_t sock = accept(socket, (struct sockaddr *) &addr, &addrlen);
+  if (sock == INVALID_SOCKET) {
+    pn_i_error_from_errno(io->error, "accept");
+    return sock;
+  } else {
+    int code;
+    if ((code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, MAX_HOST, io->serv, MAX_SERV, 0))) {
+      pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code));
+      if (close(sock) == -1)
+        pn_i_error_from_errno(io->error, "close");
+      return INVALID_SOCKET;
+    } else {
+      pn_configure_sock(io, sock);
+      snprintf(name, size, "%s:%s", io->host, io->serv);
+      return sock;
+    }
+  }
+}
+
+/* Abstract away turning off SIGPIPE */
+#ifdef MSG_NOSIGNAL
+ssize_t pn_send(pn_io_t *io, pn_socket_t socket, const void *buf, size_t len) {
+  return send(socket, buf, len, MSG_NOSIGNAL);
+}
+
+static inline int pn_create_socket(void) {
+  return socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
+}
+#elif defined(SO_NOSIGPIPE)
+ssize_t pn_send(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size) {
+  return send(socket, buf, size, 0);  // no flag here: pn_create_socket below sets SO_NOSIGPIPE
+}
+
+static inline int pn_create_socket(void) {
+  int sock = socket(AF_INET, SOCK_STREAM, getprotobyname("tcp")->p_proto);
+  if (sock == -1) return sock;
+
+  int optval = 1;
+  if (setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE, &optval, sizeof(optval)) == -1) {
+    close(sock);
+    return -1;
+  }
+  return sock;
+}
+#else
+#error "Don't know how to turn off SIGPIPE on this platform"
+#endif
+
+ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
+{
+  return recv(socket, buf, size, 0);
+}
+
+ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size)
+{
+  return write(socket, buf, size);
+}
+
+ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
+{
+  return read(socket, buf, size);
+}
+
+void pn_close(pn_io_t *io, pn_socket_t socket)
+{
+  close(socket);
+}

Added: qpid/proton/trunk/proton-c/src/selectable.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/selectable.c?rev=1572744&view=auto
==============================================================================
--- qpid/proton/trunk/proton-c/src/selectable.c (added)
+++ qpid/proton/trunk/proton-c/src/selectable.c Thu Feb 27 21:25:13 2014
@@ -0,0 +1,207 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/error.h>
+#include "selectable.h"
+#include <stdlib.h>
+#include <assert.h>
+
+pn_selectables_t *pn_selectables(void)
+{
+  return pn_iterator();
+}
+
+pn_selectable_t *pn_selectables_next(pn_selectables_t *selectables)
+{
+  return (pn_selectable_t *) pn_iterator_next(selectables);
+}
+
+void pn_selectables_free(pn_selectables_t *selectables)
+{
+  pn_free(selectables);
+}
+
+struct pn_selectable_t {
+  int fd;
+  int index;
+  void *context;
+  ssize_t (*capacity)(pn_selectable_t *);
+  ssize_t (*pending)(pn_selectable_t *);
+  pn_timestamp_t (*deadline)(pn_selectable_t *);
+  void (*readable)(pn_selectable_t *);
+  void (*writable)(pn_selectable_t *);
+  void (*expired)(pn_selectable_t *);
+  void (*finalize)(pn_selectable_t *);
+  bool registered;
+  bool terminal;
+};
+
+void pn_selectable_initialize(void *obj)
+{
+  pn_selectable_t *sel = (pn_selectable_t *) obj;
+  sel->fd = -1;
+  sel->index = -1;
+  sel->context = NULL;
+  sel->capacity = NULL;
+  sel->deadline = NULL;
+  sel->pending = NULL;
+  sel->readable = NULL;
+  sel->writable = NULL;
+  sel->expired = NULL;
+  sel->finalize = NULL;
+  sel->registered = false;
+  sel->terminal = false;
+}
+
+void pn_selectable_finalize(void *obj)
+{
+  pn_selectable_t *sel = (pn_selectable_t *) obj;
+  sel->finalize(sel);
+}
+
+#define pn_selectable_hashcode NULL
+#define pn_selectable_inspect NULL
+#define pn_selectable_compare NULL
+
+pn_selectable_t *pni_selectable(ssize_t (*capacity)(pn_selectable_t *),
+                                ssize_t (*pending)(pn_selectable_t *),
+                                pn_timestamp_t (*deadline)(pn_selectable_t *),
+                                void (*readable)(pn_selectable_t *),
+                                void (*writable)(pn_selectable_t *),
+                                void (*expired)(pn_selectable_t *),
+                                void (*finalize)(pn_selectable_t *))
+{
+  static pn_class_t clazz = PN_CLASS(pn_selectable);
+  pn_selectable_t *selectable = (pn_selectable_t *) pn_new(sizeof(pn_selectable_t), &clazz);
+  selectable->capacity = capacity;
+  selectable->pending = pending;
+  selectable->readable = readable;
+  selectable->deadline = deadline;
+  selectable->writable = writable;
+  selectable->expired = expired;
+  selectable->finalize = finalize;
+  return selectable;
+}
+
+void *pni_selectable_get_context(pn_selectable_t *selectable)
+{
+  assert(selectable);
+  return selectable->context;
+}
+
+void pni_selectable_set_context(pn_selectable_t *selectable, void *context)
+{
+  assert(selectable);
+  selectable->context = context;
+}
+
+int pni_selectable_get_index(pn_selectable_t *selectable)
+{
+  assert(selectable);
+  return selectable->index;
+}
+
+void pni_selectable_set_index(pn_selectable_t *selectable, int index)
+{
+  assert(selectable);
+  selectable->index = index;
+}
+
+int pn_selectable_fd(pn_selectable_t *selectable)
+{
+  assert(selectable);
+  return selectable->fd;
+}
+
+void pni_selectable_set_fd(pn_selectable_t *selectable, int fd)
+{
+  assert(selectable);
+  selectable->fd = fd;
+}
+
+ssize_t pn_selectable_capacity(pn_selectable_t *selectable)
+{
+  assert(selectable);
+  return selectable->capacity(selectable);
+}
+
+ssize_t pn_selectable_pending(pn_selectable_t *selectable)
+{
+  assert(selectable);
+  return selectable->pending(selectable);
+}
+
+pn_timestamp_t pn_selectable_deadline(pn_selectable_t *selectable)
+{
+  assert(selectable);
+  return selectable->deadline(selectable);
+}
+
+void pn_selectable_readable(pn_selectable_t *selectable)
+{
+  assert(selectable);
+  selectable->readable(selectable);
+}
+
+void pn_selectable_writable(pn_selectable_t *selectable)
+{
+  assert(selectable);
+  selectable->writable(selectable);
+}
+
+void pn_selectable_expired(pn_selectable_t *selectable)
+{
+  assert(selectable);
+  selectable->expired(selectable);  /* dispatch to the expired callback, not writable */
+}
+
+bool pn_selectable_is_registered(pn_selectable_t *selectable)
+{
+  assert(selectable);
+  return selectable->registered;
+}
+
+void pn_selectable_set_registered(pn_selectable_t *selectable, bool registered)
+{
+  assert(selectable);
+  selectable->registered = registered;
+}
+
+bool pn_selectable_is_terminal(pn_selectable_t *selectable)
+{
+  assert(selectable);
+  if (!selectable->terminal) {
+    selectable->terminal = (pn_selectable_capacity(selectable) < 0 &&
+                            pn_selectable_pending(selectable) < 0);
+  }
+  return selectable->terminal;
+}
+
+void pni_selectable_set_terminal(pn_selectable_t *selectable, bool terminal)
+{
+  assert(selectable);
+  selectable->terminal = terminal;
+}
+
+void pn_selectable_free(pn_selectable_t *selectable)
+{
+  pn_free(selectable);
+}

Added: qpid/proton/trunk/proton-c/src/selectable.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/selectable.h?rev=1572744&view=auto
==============================================================================
--- qpid/proton/trunk/proton-c/src/selectable.h (added)
+++ qpid/proton/trunk/proton-c/src/selectable.h Thu Feb 27 21:25:13 2014
@@ -0,0 +1,45 @@
+#ifndef _PROTON_SRC_SELECTABLE_H
+#define _PROTON_SRC_SELECTABLE_H 1
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef __cplusplus
+#include <stdbool.h>
+#endif
+
+#include <proton/selectable.h>
+
+pn_selectable_t *pni_selectable(ssize_t (*capacity)(pn_selectable_t *),
+                                ssize_t (*pending)(pn_selectable_t *),
+                                pn_timestamp_t (*deadline)(pn_selectable_t *),
+                                void (*readable)(pn_selectable_t *),
+                                void (*writable)(pn_selectable_t *),
+                                void (*expired)(pn_selectable_t *),
+                                void (*finalize)(pn_selectable_t *));
+void *pni_selectable_get_context(pn_selectable_t *selectable);
+void pni_selectable_set_context(pn_selectable_t *selectable, void *context);
+void pni_selectable_set_fd(pn_selectable_t *selectable, int fd);
+void pni_selectable_set_terminal(pn_selectable_t *selectable, bool terminal);
+int pni_selectable_get_index(pn_selectable_t *selectable);
+void pni_selectable_set_index(pn_selectable_t *selectable, int index);
+
+#endif /* selectable.h */



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message