qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From astitc...@apache.org
Subject [qpid-proton] 01/06: PROTON-1992: [Python] Remove dependency on Proton Reactor API - Python binding now only uses APIs from Proton Core library. It uses Python APIs to do all IO and uses Proton purely to process the AMQP protocol. - It is very compatible with the existing higher level Python APIs. [In modules proton, proton.reactor, proton.handlers, proton.utils]
Date Mon, 25 Feb 2019 18:40:18 GMT
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 1d6e14f8bb077f584afbe5419aab7ed78d422f9b
Author: Andrew Stitcher <astitcher@apache.org>
AuthorDate: Tue Jan 15 16:41:57 2019 -0500

    PROTON-1992: [Python] Remove dependency on Proton Reactor API
    - Python binding now only uses APIs from Proton Core library.
      It uses Python APIs to do all IO and uses Proton purely to process
      the AMQP protocol.
    - It is very compatible with the existing higher level Python APIs.
      [In modules proton, proton.reactor, proton.handlers, proton.utils]
    
    - Passes the python tests as well as before
    - Works with Python 2 and Python 3
    - Works on Unix and Windows
    - Runs all the python examples
---
 python/CMakeLists.txt                |   3 +-
 python/proton/_endpoints.py          |  56 ++---
 python/proton/_events.py             | 250 ++++++++++++--------
 python/proton/_handlers.py           | 214 +++++++++++++++--
 python/proton/_io.py                 | 138 +++++++++++
 python/proton/_message.py            |  13 +-
 python/proton/_reactor.py            | 438 ++++++++++++++++++++++-------------
 python/proton/_reactor_impl.py       | 217 -----------------
 python/proton/_selectable.py         |  93 ++++++++
 python/proton/_transport.py          |   5 +
 python/proton/_utils.py              |   4 +-
 python/proton/_wrapper.py            |  15 ++
 python/tests/proton_tests/handler.py |   2 +-
 python/tests/proton_tests/reactor.py |   8 +-
 14 files changed, 922 insertions(+), 534 deletions(-)

diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index c9e659e..a02c401 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -72,6 +72,7 @@ set (pysrc
     proton/_endpoints.py
     proton/_events.py
     proton/_exceptions.py
+    proton/_io.py
     proton/_message.py
     proton/_transport.py
     proton/_url.py
@@ -83,7 +84,7 @@ set (pysrc
 
     proton/_handlers.py
     proton/_reactor.py
-    proton/_reactor_impl.py
+    proton/_selectable.py
     proton/_utils.py
     )
 # extra files included in the source distribution
diff --git a/python/proton/_endpoints.py b/python/proton/_endpoints.py
index bf72727..e873710 100644
--- a/python/proton/_endpoints.py
+++ b/python/proton/_endpoints.py
@@ -27,7 +27,6 @@ import weakref
 
 from cproton import PN_LOCAL_UNINIT, PN_REMOTE_UNINIT, PN_LOCAL_ACTIVE, PN_REMOTE_ACTIVE, PN_LOCAL_CLOSED, \
     PN_REMOTE_CLOSED, \
-    pn_object_reactor, pn_record_get_handler, pn_record_set_handler, pn_decref, \
     pn_connection, pn_connection_attachments, pn_connection_transport, pn_connection_error, pn_connection_condition, \
     pn_connection_remote_condition, pn_connection_collect, pn_connection_set_container, pn_connection_get_container, \
     pn_connection_get_hostname, pn_connection_set_hostname, pn_connection_get_user, pn_connection_set_user, \
@@ -81,6 +80,7 @@ class Endpoint(object):
 
     def _init(self):
         self.condition = None
+        self._handler = None
 
     def _update_cond(self):
         obj2cond(self.condition, self._get_cond_impl())
@@ -97,35 +97,21 @@ class Endpoint(object):
         assert False, "Subclass must override this!"
 
     def _get_handler(self):
-        from . import _reactor
-        from . import _reactor_impl
-        ractor = _reactor.Reactor.wrap(pn_object_reactor(self._impl))
-        if ractor:
-            on_error = ractor.on_error_delegate()
-        else:
-            on_error = None
-        record = self._get_attachments()
-        return _reactor_impl.WrappedHandler.wrap(pn_record_get_handler(record), on_error)
+        return self._handler
 
     def _set_handler(self, handler):
-        from . import _reactor
-        from . import _reactor_impl
-        ractor = _reactor.Reactor.wrap(pn_object_reactor(self._impl))
-        if ractor:
-            on_error = ractor.on_error_delegate()
+        # TODO Hack This is here for some very odd (IMO) backwards compat behaviour
+        from ._events import Handler
+        if handler is None:
+          self._handler = None
+        elif issubclass(type(handler), Handler):
+            self._handler = handler
         else:
-            on_error = None
-        impl = _reactor_impl._chandler(handler, on_error)
-        record = self._get_attachments()
-        pn_record_set_handler(record, impl)
-        pn_decref(impl)
+            self._handler = Handler()
+            self._handler.add(handler)
 
     handler = property(_get_handler, _set_handler)
 
-    @property
-    def transport(self):
-        return self.connection.transport
-
 
 class Connection(Wrapper, Endpoint):
     """
@@ -147,6 +133,8 @@ class Connection(Wrapper, Endpoint):
         self.offered_capabilities = None
         self.desired_capabilities = None
         self.properties = None
+        self.url = None
+        self._acceptor = None
 
     def _get_attachments(self):
         return pn_connection_attachments(self._impl)
@@ -183,7 +171,7 @@ class Connection(Wrapper, Endpoint):
         return utf82unicode(pn_connection_get_container(self._impl))
 
     def _set_container(self, name):
-        return pn_connection_set_container(self._impl, unicode2utf8(name))
+        pn_connection_set_container(self._impl, unicode2utf8(name))
 
     container = property(_get_container, _set_container)
 
@@ -191,7 +179,7 @@ class Connection(Wrapper, Endpoint):
         return utf82unicode(pn_connection_get_hostname(self._impl))
 
     def _set_hostname(self, name):
-        return pn_connection_set_hostname(self._impl, unicode2utf8(name))
+        pn_connection_set_hostname(self._impl, unicode2utf8(name))
 
     hostname = property(_get_hostname, _set_hostname,
                         doc="""
@@ -206,7 +194,7 @@ and SASL layers to identify the peer.
         return utf82unicode(pn_connection_get_user(self._impl))
 
     def _set_user(self, name):
-        return pn_connection_set_user(self._impl, unicode2utf8(name))
+        pn_connection_set_user(self._impl, unicode2utf8(name))
 
     user = property(_get_user, _set_user)
 
@@ -214,7 +202,7 @@ and SASL layers to identify the peer.
         return None
 
     def _set_password(self, name):
-        return pn_connection_set_password(self._impl, unicode2utf8(name))
+        pn_connection_set_password(self._impl, unicode2utf8(name))
 
     password = property(_get_password, _set_password)
 
@@ -243,6 +231,10 @@ and SASL layers to identify the peer.
         """The properties specified by the remote peer for this connection."""
         return dat2obj(pn_connection_remote_properties(self._impl))
 
+    @property
+    def connected_address(self):
+        return self.url and str(self.url)
+
     def open(self):
         """
         Opens the connection.
@@ -374,6 +366,10 @@ class Session(Wrapper, Endpoint):
     def connection(self):
         return Connection.wrap(pn_session_connection(self._impl))
 
+    @property
+    def transport(self):
+        return self.connection.transport
+
     def sender(self, name):
         return Sender(pn_sender(self._impl, unicode2utf8(name)))
 
@@ -486,6 +482,10 @@ class Link(Wrapper, Endpoint):
         """The connection on which this link was attached."""
         return self.session.connection
 
+    @property
+    def transport(self):
+        return self.session.transport
+
     def delivery(self, tag):
         return Delivery(pn_delivery(self._impl, tag))
 
diff --git a/python/proton/_events.py b/python/proton/_events.py
index c6d5459..d322b2e 100644
--- a/python/proton/_events.py
+++ b/python/proton/_events.py
@@ -22,26 +22,23 @@ from __future__ import absolute_import
 import threading
 
 from cproton import PN_SESSION_REMOTE_CLOSE, PN_SESSION_FINAL, pn_event_context, pn_collector_put, \
-    PN_SELECTABLE_UPDATED, pn_collector, PN_CONNECTION_REMOTE_OPEN, pn_event_attachments, pn_event_type, \
-    pn_collector_free, pn_handler_dispatch, PN_SELECTABLE_WRITABLE, PN_SELECTABLE_INIT, PN_SESSION_REMOTE_OPEN, \
-    pn_collector_peek, PN_CONNECTION_BOUND, PN_LINK_FLOW, pn_event_connection, PN_LINK_LOCAL_CLOSE, \
+    pn_collector, PN_CONNECTION_REMOTE_OPEN, pn_event_type, \
+    pn_collector_free, pn_collector_release, PN_SESSION_REMOTE_OPEN, \
+    pn_collector_peek, pn_collector_more, PN_CONNECTION_BOUND, PN_LINK_FLOW, pn_event_connection, PN_LINK_LOCAL_CLOSE, \
     PN_TRANSPORT_ERROR, PN_CONNECTION_LOCAL_OPEN, PN_CONNECTION_LOCAL_CLOSE, pn_event_delivery, \
-    PN_LINK_REMOTE_OPEN, PN_TRANSPORT_CLOSED, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT, pn_event_reactor, \
+    PN_LINK_REMOTE_OPEN, PN_TRANSPORT_CLOSED, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT, \
     PN_CONNECTION_REMOTE_CLOSE, pn_collector_pop, PN_LINK_INIT, pn_event_link, PN_CONNECTION_UNBOUND, \
-    pn_event_type_name, pn_event_session, PN_LINK_FINAL, pn_py2void, PN_REACTOR_INIT, PN_REACTOR_QUIESCED, \
-    PN_LINK_LOCAL_DETACH, PN_SESSION_INIT, PN_CONNECTION_FINAL, PN_TIMER_TASK, pn_class_name, PN_SELECTABLE_READABLE, \
-    pn_event_transport, PN_TRANSPORT_TAIL_CLOSED, PN_SELECTABLE_FINAL, PN_SESSION_LOCAL_OPEN, PN_DELIVERY, \
-    PN_SESSION_LOCAL_CLOSE, pn_event_copy, PN_REACTOR_FINAL, PN_LINK_LOCAL_OPEN, PN_SELECTABLE_EXPIRED, \
-    PN_LINK_REMOTE_DETACH, PN_PYREF, PN_LINK_REMOTE_CLOSE, pn_event_root, PN_SELECTABLE_ERROR, \
+    pn_event_type_name, pn_event_session, PN_LINK_FINAL, pn_py2void, \
+    PN_LINK_LOCAL_DETACH, PN_SESSION_INIT, PN_CONNECTION_FINAL, PN_TIMER_TASK, pn_class_name,\
+    pn_event_transport, PN_TRANSPORT_TAIL_CLOSED,PN_SESSION_LOCAL_OPEN, PN_DELIVERY, \
+    PN_SESSION_LOCAL_CLOSE, PN_LINK_LOCAL_OPEN, \
+    PN_LINK_REMOTE_DETACH, PN_PYREF, PN_LINK_REMOTE_CLOSE, \
     PN_CONNECTION_INIT, pn_event_class, pn_void2py, pn_cast_pn_session, pn_cast_pn_link, pn_cast_pn_delivery, \
-    pn_cast_pn_transport, pn_cast_pn_connection, pn_cast_pn_selectable
+    pn_cast_pn_transport, pn_cast_pn_connection
 
-from ._common import Constant
 from ._delivery import Delivery
 from ._endpoints import Connection, Session, Link
-from ._reactor_impl import Selectable, WrappedHandler
 from ._transport import Transport
-from ._wrapper import Wrapper
 
 
 class Collector:
@@ -55,10 +52,16 @@ class Collector:
     def peek(self):
         return Event.wrap(pn_collector_peek(self._impl))
 
+    def more(self):
+        return pn_collector_more(self._impl)
+
     def pop(self):
         ev = self.peek()
         pn_collector_pop(self._impl)
 
+    def release(self):
+        pn_collector_release(self._impl)
+
     def __del__(self):
         pn_collector_free(self._impl)
         del self._impl
@@ -104,38 +107,52 @@ class EventType(object):
             self._lock.release()
 
     def __repr__(self):
+        return "EventType(name=%s, number=%d)" % (self.name, self.number)
+
+    def __str__(self):
         return self.name
 
 
 def _dispatch(handler, method, *args):
     m = getattr(handler, method, None)
     if m:
-        return m(*args)
+        m(*args)
     elif hasattr(handler, "on_unhandled"):
-        return handler.on_unhandled(method, *args)
+        handler.on_unhandled(method, *args)
 
 
 class EventBase(object):
 
-    def __init__(self, clazz, context, type):
-        self.clazz = clazz
-        self.context = context
-        self.type = type
-
-    def dispatch(self, handler):
-        return _dispatch(handler, self.type.method, self)
+    def __init__(self, type):
+        self._type = type
 
+    @property
+    def type(self):
+        return self._type
 
-def _none(x): return None
+    @property
+    def handler(self):
+        return None
 
+    def dispatch(self, handler, type=None):
+        type = type or self._type
+        _dispatch(handler, type.method, self)
+        if hasattr(handler, "handlers"):
+            for h in handler.handlers:
+                self.dispatch(h, type)
 
-DELEGATED = Constant("DELEGATED")
+    def __repr__(self):
+        return "%s(%r)" % (self._type, self.context)
 
 
 def _core(number, method):
     return EventType(number=number, method=method)
 
 
+def _internal(name):
+    return EventType(name=name)
+
+
 wrappers = {
     "pn_void": lambda x: pn_void2py(x),
     "pn_pyref": lambda x: pn_void2py(x),
@@ -143,16 +160,11 @@ wrappers = {
     "pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)),
     "pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)),
     "pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)),
-    "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)),
-    "pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x))
+    "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x))
 }
 
 
-class Event(Wrapper, EventBase):
-    REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init")
-    REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced")
-    REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final")
-
+class Event(EventBase):
     TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task")
 
     CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init")
@@ -189,107 +201,159 @@ class Event(Wrapper, EventBase):
     TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed")
     TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed")
 
-    SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init")
-    SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated")
-    SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable")
-    SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable")
-    SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired")
-    SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error")
-    SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final")
+    # These events are now internal events in the python code
+    REACTOR_INIT = _internal("reactor_init")
+    REACTOR_QUIESCED = _internal("reactor_quiesced")
+    REACTOR_FINAL = _internal("reactor_final")
+
+    SELECTABLE_INIT = _internal("selectable_init")
+    SELECTABLE_UPDATED = _internal("selectable_updated")
+    SELECTABLE_READABLE = _internal("selectable_readable")
+    SELECTABLE_WRITABLE = _internal("selectable_writable")
+    SELECTABLE_EXPIRED = _internal("selectable_expired")
+    SELECTABLE_ERROR = _internal("selectable_error")
+    SELECTABLE_FINAL = _internal("selectable_final")
 
     @staticmethod
-    def wrap(impl, number=None):
+    def wrap(impl):
         if impl is None:
             return None
 
-        if number is None:
-            number = pn_event_type(impl)
+        number = pn_event_type(impl)
+        cls = pn_event_class(impl)
 
-        event = Event(impl, number)
+        if cls:
+            clsname = pn_class_name(cls)
+            context = wrappers[clsname](pn_event_context(impl))
 
-        # check for an application defined ApplicationEvent and return that.  This
-        # avoids an expensive wrap operation invoked by event.context
-        if pn_event_class(impl) == PN_PYREF and \
-                isinstance(event.context, EventBase):
-            return event.context
+            # check for an application defined ApplicationEvent and return that.  This
+            # avoids an expensive wrap operation invoked by event.context
+            if cls == PN_PYREF and isinstance(context, EventBase):
+                return context
         else:
-            return event
+            clsname = None
 
-    def __init__(self, impl, number):
-        Wrapper.__init__(self, impl, pn_event_attachments)
-        self.__dict__["type"] = EventType.TYPES[number]
+        event = Event(impl, number, clsname, context)
+        return event
 
-    def _init(self):
-        pass
+    def __init__(self, impl, number, clsname, context):
+        self._type = EventType.TYPES[number]
+        self._clsname = clsname
+        self._context = context
 
-    def copy(self):
-        copy = pn_event_copy(self._impl)
-        return Event.wrap(copy)
+        # Do all this messing around to avoid duplicate wrappers
+        if issubclass(type(context), Delivery):
+            self._delivery = context
+        else:
+            self._delivery = Delivery.wrap(pn_event_delivery(impl))
+        if self._delivery:
+            self._link = self._delivery.link
+        elif issubclass(type(context), Link):
+            self._link = context
+        else:
+            self._link = Link.wrap(pn_event_link(impl))
+        if self._link:
+            self._session = self._link.session
+        elif issubclass(type(context), Session):
+            self._session = context
+        else:
+            self._session = Session.wrap(pn_event_session(impl))
+        if self._session:
+            self._connection = self._session.connection
+        elif issubclass(type(context), Connection):
+            self._connection = context
+        else:
+            self._connection = Connection.wrap(pn_event_connection(impl))
 
-    @property
-    def clazz(self):
-        cls = pn_event_class(self._impl)
-        if cls:
-            return pn_class_name(cls)
+        if issubclass(type(context), Transport):
+            self._transport = context
         else:
-            return None
+            self._transport = Transport.wrap(pn_event_transport(impl))
 
     @property
-    def root(self):
-        return WrappedHandler.wrap(pn_event_root(self._impl))
+    def clazz(self):
+        return self._clsname
 
     @property
     def context(self):
-        """Returns the context object associated with the event. The type of this depend on the type of event."""
-        return wrappers[self.clazz](pn_event_context(self._impl))
+        """Returns the context object associated with the event. The type of this depends on the type of event."""
+        return self._context
 
-    def dispatch(self, handler, type=None):
-        type = type or self.type
-        if isinstance(handler, WrappedHandler):
-            pn_handler_dispatch(handler._impl, self._impl, type.number)
-        else:
-            result = _dispatch(handler, type.method, self)
-            if result != DELEGATED and hasattr(handler, "handlers"):
-                for h in handler.handlers:
-                    self.dispatch(h, type)
+    @property
+    def handler(self):
+        l = self.link
+        if l:
+            h = l.handler
+            if h:
+                return h
+        s = self.session
+        if s:
+            h = s.handler
+            if h:
+                return h
+        c = self.connection
+        if c:
+            h = c.handler
+            if h:
+                return h
+        c = self.context
+        if not c or not hasattr(c, 'handler'):
+            return None
+        h = c.handler
+        return h
 
     @property
     def reactor(self):
-        """Returns the reactor associated with the event."""
-        return wrappers.get("pn_reactor", _none)(pn_event_reactor(self._impl))
+        """
+        Deprecated: Returns the container (was reactor) associated with the event.
+        """
+        return self.container
+
+    @property
+    def container(self):
+        """
+        Returns the container associated with the event.
+        """
+        return self._transport._reactor
 
     def __getattr__(self, name):
-        r = self.reactor
-        if r and hasattr(r, 'subclass') and r.subclass.__name__.lower() == name:
-            return r
-        else:
-            return super(Event, self).__getattr__(name)
+        """
+        This will look for a property of the event as an attached context object of the same
+        type as the property (but lowercase)
+        """
+        c = self.context
+        # Direct type or subclass of type
+        if type(c).__name__.lower() == name or name in [x.__name__.lower() for x in type(c).__bases__]:
+            return c
+
+        # If the attached object is the wrong type then see if *it* has a property of that name
+        return getattr(c, name, None)
 
     @property
     def transport(self):
         """Returns the transport associated with the event, or null if none is associated with it."""
-        return Transport.wrap(pn_event_transport(self._impl))
+        return self._transport
 
     @property
     def connection(self):
         """Returns the connection associated with the event, or null if none is associated with it."""
-        return Connection.wrap(pn_event_connection(self._impl))
+        return self._connection
 
     @property
     def session(self):
         """Returns the session associated with the event, or null if none is associated with it."""
-        return Session.wrap(pn_event_session(self._impl))
+        return self._session
 
     @property
     def link(self):
         """Returns the link associated with the event, or null if none is associated with it."""
-        return Link.wrap(pn_event_link(self._impl))
+        return self._link
 
     @property
     def sender(self):
         """Returns the sender link associated with the event, or null if
            none is associated with it. This is essentially an alias for
-           link(), that does an additional checkon the type of the
+           link(), that does an additional check on the type of the
            link."""
         l = self.link
         if l and l.is_sender:
@@ -301,7 +365,7 @@ class Event(Wrapper, EventBase):
     def receiver(self):
         """Returns the receiver link associated with the event, or null if
            none is associated with it. This is essentially an alias for
-           link(), that does an additional checkon the type of the link."""
+           link(), that does an additional check on the type of the link."""
         l = self.link
         if l and l.is_receiver:
             return l
@@ -311,10 +375,7 @@ class Event(Wrapper, EventBase):
     @property
     def delivery(self):
         """Returns the delivery associated with the event, or null if none is associated with it."""
-        return Delivery.wrap(pn_event_delivery(self._impl))
-
-    def __repr__(self):
-        return "%s(%s)" % (self.type, self.context)
+        return self._delivery
 
 
 class LazyHandlers(object):
@@ -329,5 +390,10 @@ class LazyHandlers(object):
 class Handler(object):
     handlers = LazyHandlers()
 
+    # TODO What to do with on_error?
+    def add(self, handler, on_error=None):
+        """Add a child handler"""
+        self.handlers.append(handler)
+
     def on_unhandled(self, method, *args):
         pass
diff --git a/python/proton/_handlers.py b/python/proton/_handlers.py
index f8d5413..c946a3d 100644
--- a/python/proton/_handlers.py
+++ b/python/proton/_handlers.py
@@ -22,13 +22,15 @@ from __future__ import absolute_import
 import logging
 import time
 import weakref
-from select import select
 
 from ._delivery import Delivery
 from ._endpoints import Endpoint
-from ._message import Message
-from ._exceptions import ProtonException
 from ._events import Handler, _dispatch
+from ._exceptions import ProtonException
+from ._io import IO
+from ._message import Message
+from ._transport import Transport
+from ._url import Url
 
 log = logging.getLogger("proton")
 
@@ -672,15 +674,6 @@ CFlowController = FlowController
 CHandshaker = Handshaker
 
 
-from ._reactor_impl import WrappedHandler
-from cproton import pn_iohandler
-
-class IOHandler(WrappedHandler):
-
-    def __init__(self):
-        WrappedHandler.__init__(self, pn_iohandler)
-
-
 class PythonIO:
 
     def __init__(self):
@@ -726,13 +719,11 @@ class PythonIO:
             timeout = deadline - time.time()
         else:
             timeout = reactor.timeout
-        if (timeout < 0): timeout = 0
+        if timeout < 0: timeout = 0
         timeout = min(timeout, reactor.timeout)
-        readable, writable, _ = select(reading, writing, [], timeout)
+        readable, writable, _ = IO.select(reading, writing, [], timeout)
 
-        reactor.mark()
-
-        now = time.time()
+        now = reactor.mark()
 
         for s in readable:
             s.readable()
@@ -743,3 +734,192 @@ class PythonIO:
                 s.expired()
 
         reactor.yield_()
+
+
+# For C style IO handler need to implement Selector
+class IOHandler(Handler):
+
+    def __init__(self):
+        self._selector = IO.Selector()
+
+    def on_selectable_init(self, event):
+        s = event.selectable
+        self._selector.add(s)
+        s._reactor._selectables += 1
+
+    def on_selectable_updated(self, event):
+        s = event.selectable
+        self._selector.update(s)
+
+    def on_selectable_final(self, event):
+        s = event.selectable
+        self._selector.remove(s)
+        s._reactor._selectables -= 1
+        s.release()
+
+    def on_reactor_quiesced(self, event):
+        r = event.reactor
+
+        if not r.quiesced:
+            return
+
+        d = r.timer_deadline
+        readable, writable, expired = self._selector.select(r.timeout)
+
+        now = r.mark()
+
+        for s in readable:
+            s.readable()
+        for s in writable:
+            s.writable()
+        for s in expired:
+            s.expired()
+
+        r.yield_()
+
+    def on_selectable_readable(self, event):
+        s = event.selectable
+        t = s._transport
+
+        # If we're an acceptor we can't have a transport
+        # and we don't want to do anything here in any case
+        if not t:
+            return
+
+        capacity = t.capacity()
+        if capacity > 0:
+            try:
+                b = s.recv(capacity)
+                if len(b) > 0:
+                    n = t.push(b)
+                else:
+                    # EOF handling
+                    self.on_selectable_error(event)
+            except:
+                # TODO: What's the error handling to be here?
+                t.close_tail()
+
+        # Always update as we may have gone to not reading or from
+        # not writing to writing when processing the incoming bytes
+        r = s._reactor
+        self.update(t, s, r.now)
+
+    def on_selectable_writable(self, event):
+        s = event.selectable
+        t = s._transport
+
+        # If we're an acceptor we can't have a transport
+        # and we don't want to do anything here in any case
+        if not t:
+            return
+
+        pending = t.pending()
+        if pending > 0:
+
+            try:
+                n = s.send(t.peek(pending))
+                t.pop(n)
+            except:
+                # TODO: Error? or actually an exception
+                t.close_head()
+
+        newpending = t.pending()
+        if newpending != pending:
+            r = s._reactor
+            self.update(t, s, r.now)
+
+    def on_selectable_error(self, event):
+        s = event.selectable
+        t = s._transport
+
+        t.close_head()
+        t.close_tail()
+        s.terminate()
+        s.update()
+
+    def on_selectable_expired(self, event):
+        s = event.selectable
+        t = s._transport
+        r = s._reactor
+
+        self.update(t, s, r.now)
+
+    def on_connection_local_open(self, event):
+        c = event.connection
+        if not c.state & Endpoint.REMOTE_UNINIT:
+            return
+
+        t = Transport()
+        # It seems perverse, but the C code ignores bind errors too!
+        # and this is required or you get errors because Connector() has already
+        # bound the transport and connection!
+        t.bind_nothrow(c)
+
+    def on_connection_bound(self, event):
+        c = event.connection
+        t = event.transport
+
+        reactor = c._reactor
+
+        # link the new transport to its reactor:
+        t._reactor = reactor
+
+        if c._acceptor:
+            # this connection was created by the acceptor.  There is already a
+            # socket assigned to this connection.  Nothing needs to be done.
+            return
+
+        url = c.url or Url(c.hostname)
+        url.defaults()
+
+        host = url.host
+        port = url.port
+
+        if not c.user:
+            user = url.username
+            if user:
+                c.user = user
+            password = url.password
+            if password:
+                c.password = password
+
+        # TODO Currently this is synch and will throw if it cannot connect
+        # do we want to handle errors differently? or do it asynch?
+        sock = IO.connect(host, int(port))
+
+        s = reactor.selectable(delegate=sock)
+        s._transport = t
+        t._selectable = s
+        self.update(t, s, reactor.now)
+
+    @staticmethod
+    def update(transport, selectable, now):
+        try:
+            capacity = transport.capacity()
+            selectable.reading = capacity>0
+        except:
+            if transport.closed:
+                selectable.terminate()
+        try:
+            pending = transport.pending()
+            selectable.writing = pending>0
+        except:
+            if transport.closed:
+                selectable.terminate()
+        selectable.deadline = transport.tick(now)
+        selectable.update()
+
+    def on_transport(self, event):
+        t = event.transport
+        r = t._reactor
+        s = t._selectable
+        if s and not s.is_terminal:
+            self.update(t, s, r.now)
+
+    def on_transport_closed(self, event):
+        t = event.transport
+        r = t._reactor
+        s = t._selectable
+        s.terminate()
+        r.update(s)
+        t.unbind()
diff --git a/python/proton/_io.py b/python/proton/_io.py
new file mode 100644
index 0000000..401ba11
--- /dev/null
+++ b/python/proton/_io.py
@@ -0,0 +1,138 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+import socket
+import select
+import time
+
+PN_INVALID_SOCKET = -1
+
+class IO(object):
+
+    @staticmethod
+    def close(s):
+        s.close()
+
+    @staticmethod
+    def listen(host, port):
+        s = socket.socket()
+        s.bind((host, port))
+        s.listen(10)
+        return s
+
+    @staticmethod
+    def accept(s):
+        return s.accept()
+
+    @staticmethod
+    def connect(host, port):
+        return socket.create_connection((host, port))
+
+    @staticmethod
+    def select(*args, **kwargs):
+        return select.select(*args, **kwargs)
+
+    @staticmethod
+    def sleep(t):
+        time.sleep(t)
+        return
+
+    class Selector(object):
+
+        def __init__(self):
+            self._selectables = set()
+            self._reading = set()
+            self._writing = set()
+            self._deadline = None
+
+        def add(self, selectable):
+            self._selectables.add(selectable)
+            if selectable.reading:
+                self._reading.add(selectable)
+            if selectable.writing:
+                self._writing.add(selectable)
+            if selectable.deadline:
+                if self._deadline is None:
+                    self._deadline = selectable.deadline
+                else:
+                    self._deadline = min(selectable.deadline, self._deadline)
+
+        def remove(self, selectable):
+            self._selectables.discard(selectable)
+            self._reading.discard(selectable)
+            self._writing.discard(selectable)
+            self.update_deadline()
+
+        @property
+        def selectables(self):
+            return len(self._selectables)
+
+        def update_deadline(self):
+            for sel in self._selectables:
+                if sel.deadline:
+                    if self._deadline is None:
+                        self._deadline = sel.deadline
+                    else:
+                        self._deadline = min(sel.deadline, self._deadline)
+
+        def update(self, selectable):
+            self._reading.discard(selectable)
+            self._writing.discard(selectable)
+            if selectable.reading:
+                self._reading.add(selectable)
+            if selectable.writing:
+                self._writing.add(selectable)
+            self.update_deadline()
+
+        def select(self, timeout):
+
+            def select_inner(timeout):
+                r = self._reading
+                w = self._writing
+
+                now = time.time()
+
+                # No timeout or deadline
+                if timeout is None and self._deadline is None:
+                    return IO.select(r, w, [])
+
+                if timeout is None:
+                    t = max(0, self._deadline - now)
+                    return IO.select(r, w, [], t)
+
+                if self._deadline is None:
+                    return IO.select(r, w, [], timeout)
+
+                t = max(0, min(timeout, self._deadline - now))
+                if len(r)==0 and len(w)==0:
+                    if t > 0: IO.sleep(t)
+                    return ([],[],[])
+
+                return IO.select(r, w, [], t)
+
+            r, w, _ = select_inner(timeout)
+
+            # Calculate timed out selectables
+            now = time.time()
+            t = [s for s in self._selectables if s.deadline and now > s.deadline]
+            self._deadline = None
+            self.update_deadline()
+            return r, w, t
diff --git a/python/proton/_message.py b/python/proton/_message.py
index cca498f..2f3709a 100644
--- a/python/proton/_message.py
+++ b/python/proton/_message.py
@@ -39,7 +39,7 @@ from cproton import PN_DEFAULT_PRIORITY, PN_OVERFLOW, \
     pn_inspect, pn_string, pn_string_get, pn_free, pn_error_text
 
 from . import _compat
-from ._common import Constant, isinteger, secs2millis, millis2secs, unicode2utf8, utf82unicode
+from ._common import isinteger, secs2millis, millis2secs, unicode2utf8, utf82unicode
 from ._data import Data, ulong, symbol
 from ._endpoints import Link
 from ._exceptions import EXCEPTIONS, MessageException
@@ -52,7 +52,6 @@ except NameError:
     unicode = str
 
 
-
 class Message(object):
     """The L{Message} class is a mutable holder of message content.
 
@@ -432,7 +431,7 @@ The group-id for any replies.
         self.decode(dlv.encoded)
         return dlv
 
-    def __repr2__(self):
+    def __repr__(self):
         props = []
         for attr in ("inferred", "address", "reply_to", "durable", "ttl",
                      "priority", "first_acquirer", "delivery_count", "id",
@@ -442,11 +441,3 @@ The group-id for any replies.
             value = getattr(self, attr)
             if value: props.append("%s=%r" % (attr, value))
         return "Message(%s)" % ", ".join(props)
-
-    def __repr__(self):
-        tmp = pn_string(None)
-        err = pn_inspect(self._msg, tmp)
-        result = pn_string_get(tmp)
-        pn_free(tmp)
-        self._check(err)
-        return result
diff --git a/python/proton/_reactor.py b/python/proton/_reactor.py
index a47625f..6cf5305 100644
--- a/python/proton/_reactor.py
+++ b/python/proton/_reactor.py
@@ -19,21 +19,16 @@
 
 from __future__ import absolute_import
 
+from functools import total_ordering
+import heapq
 import json
-import os
 import logging
+import os
+import time
 import traceback
 import uuid
 
-from cproton import PN_MILLIS_MAX, PN_PYREF, PN_ACCEPTED, \
-    pn_reactor_stop, pn_selectable_attachments, pn_reactor_quiesced, pn_reactor_acceptor, \
-    pn_record_set_handler, pn_collector_put, pn_reactor_get_timeout, pn_task_cancel, pn_acceptor_set_ssl_domain, \
-    pn_record_get, pn_reactor_selectable, pn_task_attachments, pn_reactor_schedule, pn_acceptor_close, pn_py2void, \
-    pn_reactor_error, pn_reactor_attachments, pn_reactor_get_global_handler, pn_reactor_process, pn_reactor, \
-    pn_reactor_set_handler, pn_reactor_set_global_handler, pn_reactor_yield, pn_error_text, pn_reactor_connection, \
-    pn_cast_pn_reactor, pn_reactor_get_connection_address, pn_reactor_update, pn_reactor_collector, pn_void2py, \
-    pn_reactor_start, pn_reactor_set_connection_host, pn_cast_pn_task, pn_decref, pn_reactor_set_timeout, \
-    pn_reactor_mark, pn_reactor_get_handler, pn_reactor_wakeup
+from cproton import PN_PYREF, PN_ACCEPTED, PN_EVENT_NONE
 
 from ._delivery import  Delivery
 from ._endpoints import Connection, Endpoint, Link, Session, Terminus
@@ -42,159 +37,175 @@ from ._data import Described, symbol, ulong
 from ._message import  Message
 from ._transport import Transport, SSL, SSLDomain
 from ._url import Url
-from ._common import isstring, secs2millis, millis2secs, unicode2utf8, utf82unicode
-from ._events import EventType, EventBase, Handler
-from ._reactor_impl import Selectable, WrappedHandler, _chandler
-from ._wrapper import Wrapper, PYCTX
+from ._common import isstring, unicode2utf8, utf82unicode
+from ._events import Collector, EventType, EventBase, Handler, Event
+from ._selectable import Selectable
 
-from ._handlers import OutgoingMessageHandler
+from ._handlers import OutgoingMessageHandler, IOHandler
+
+from ._io import IO, PN_INVALID_SOCKET
 
 from . import _compat
 from ._compat import queue
 
-Logger = logging.getLogger("proton")
+
+_logger = logging.getLogger("proton")
 
 
 def _generate_uuid():
     return uuid.uuid4()
 
 
-def _timeout2millis(secs):
-    if secs is None: return PN_MILLIS_MAX
-    return secs2millis(secs)
+def _now():
+    return time.time()
 
+@total_ordering
+class Task(object):
 
-def _millis2timeout(millis):
-    if millis == PN_MILLIS_MAX: return None
-    return millis2secs(millis)
-
-
-class Task(Wrapper):
-
-    @staticmethod
-    def wrap(impl):
-        if impl is None:
-            return None
-        else:
-            return Task(impl)
+    def __init__(self, reactor, deadline, handler):
+        self._deadline = deadline
+        self._handler = handler
+        self._reactor = reactor
+        self._cancelled = False
 
-    def __init__(self, impl):
-        Wrapper.__init__(self, impl, pn_task_attachments)
-
-    def _init(self):
-        pass
+    def __lt__(self, rhs):
+        return self._deadline < rhs._deadline
 
     def cancel(self):
-        pn_task_cancel(self._impl)
+        self._cancelled = True
 
+    @property
+    def handler(self):
+        return self._handler
 
-class Acceptor(Wrapper):
+class TimerSelectable(Selectable):
 
-    def __init__(self, impl):
-        Wrapper.__init__(self, impl)
+    def __init__(self, reactor, collector):
+        super(TimerSelectable, self).__init__(None, reactor)
+        self.collect(collector)
+        collector.put(self, Event.SELECTABLE_INIT)
 
-    def set_ssl_domain(self, ssl_domain):
-        pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain)
+    def fileno(self):
+        return PN_INVALID_SOCKET
 
-    def close(self):
-        pn_acceptor_close(self._impl)
+    def readable(self):
+        pass
 
+    def writable(self):
+        pass
 
-class Reactor(Wrapper):
+    def expired(self):
+        self._reactor.timer_tick()
+        self.deadline = self._reactor.timer_deadline
+        self.update()
 
-    @staticmethod
-    def wrap(impl):
-        if impl is None:
-            return None
-        else:
-            record = pn_reactor_attachments(impl)
-            attrs = pn_void2py(pn_record_get(record, PYCTX))
-            if attrs and 'subclass' in attrs:
-                return attrs['subclass'](impl=impl)
-            else:
-                return Reactor(impl=impl)
+class Reactor(object):
 
     def __init__(self, *handlers, **kwargs):
-        Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
-        for h in handlers:
-            self.handler.add(h, on_error=self.on_error_delegate())
-
-    def _init(self):
+        self._previous = PN_EVENT_NONE
+        self._timeout = 0
+        self.mark()
+        self._yield = False
+        self._stop = False
+        self._collector = Collector()
+        self._selectable = None
+        self._selectables = 0
+        self._global_handler = IOHandler()
+        self._handler = Handler()
+        self._timerheap = []
+        self._timers = 0
         self.errors = []
-
-    # on_error relay handler tied to underlying C reactor.  Use when the
-    # error will always be generated from a callback from this reactor.
-    # Needed to prevent reference cycles and be compatible with wrappers.
-    class ErrorDelegate(object):
-        def __init__(self, reactor):
-            self.reactor_impl = reactor._impl
-
-        def on_error(self, info):
-            ractor = Reactor.wrap(self.reactor_impl)
-            ractor.on_error(info)
-
-    def on_error_delegate(self):
-        return Reactor.ErrorDelegate(self).on_error
+        for h in handlers:
+            self.handler.add(h, on_error=self.on_error)
 
     def on_error(self, info):
         self.errors.append(info)
         self.yield_()
 
+    # TODO: need to make this actually return a proxy which catches exceptions and calls
+    # on error.
+    # [Or arrange another way to deal with exceptions thrown by handlers]
+    def _make_handler(self, handler):
+        """
+        Return a proxy handler that dispatches to the provided handler.
+
+        If handler throws an exception then on_error is called with info
+        """
+        return handler
+
     def _get_global(self):
-        return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error_delegate())
+        return self._global_handler
 
     def _set_global(self, handler):
-        impl = _chandler(handler, self.on_error_delegate())
-        pn_reactor_set_global_handler(self._impl, impl)
-        pn_decref(impl)
+        self._global_handler = self._make_handler(handler)
 
     global_handler = property(_get_global, _set_global)
 
     def _get_timeout(self):
-        return _millis2timeout(pn_reactor_get_timeout(self._impl))
+        return self._timeout
 
     def _set_timeout(self, secs):
-        return pn_reactor_set_timeout(self._impl, _timeout2millis(secs))
+        self._timeout = secs
 
     timeout = property(_get_timeout, _set_timeout)
 
     def yield_(self):
-        pn_reactor_yield(self._impl)
+        self._yield = True
 
     def mark(self):
-        return pn_reactor_mark(self._impl)
+        """ This sets the reactor now instant to the current time """
+        self._now = _now()
+        return self._now
+
+    @property
+    def now(self):
+        return self._now
 
     def _get_handler(self):
-        return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error_delegate())
+        return self._handler
 
     def _set_handler(self, handler):
-        impl = _chandler(handler, self.on_error_delegate())
-        pn_reactor_set_handler(self._impl, impl)
-        pn_decref(impl)
+        self._handler = self._make_handler(handler)
 
     handler = property(_get_handler, _set_handler)
 
     def run(self):
+        # TODO: Why do we timeout like this?
         self.timeout = 3.14159265359
         self.start()
         while self.process(): pass
         self.stop()
         self.process()
-        self.global_handler = None
-        self.handler = None
+        # TODO: This isn't correct if we ever run again
+        self._global_handler = None
+        self._handler = None
 
+    # Cross thread reactor wakeup
     def wakeup(self):
-        n = pn_reactor_wakeup(self._impl)
-        if n: raise IOError(pn_error_text(pn_reactor_error(self._impl)))
+        # TODO: Do this with pipe and write?
+        #os.write(self._wakeup[1], "x", 1);
+        pass
 
     def start(self):
-        pn_reactor_start(self._impl)
+        self.push_event(self, Event.REACTOR_INIT)
+        self._selectable = TimerSelectable(self, self._collector)
+        self._selectable.deadline = self.timer_deadline
+        # TODO set up fd to read for wakeups - but problematic on windows
+        #self._selectable.fileno(self._wakeup[0])
+        #self._selectable.reading = True
+        self.update(self._selectable)
 
     @property
     def quiesced(self):
-        return pn_reactor_quiesced(self._impl)
+        event = self._collector.peek()
+        if not event:
+            return True
+        if self._collector.more():
+            return False
+        return event.type is Event.REACTOR_QUIESCED
 
     def _check_errors(self):
+        """ This """
         if self.errors:
             for exc, value, tb in self.errors[:-1]:
                 traceback.print_exception(exc, value, tb)
@@ -202,35 +213,104 @@ class Reactor(Wrapper):
             _compat.raise_(exc, value, tb)
 
     def process(self):
-        result = pn_reactor_process(self._impl)
-        self._check_errors()
-        return result
+        # result = pn_reactor_process(self._impl)
+        # self._check_errors()
+        # return result
+        self.mark()
+        previous = PN_EVENT_NONE
+        while True:
+            if self._yield:
+                self._yield = False
+                _logger.debug('%s Yielding', self)
+                return True
+            event = self._collector.peek()
+            if event:
+                _logger.debug('%s recvd Event: %r', self, event)
+                type = event.type
+
+                # regular handler
+                handler = event.handler or self._handler
+                event.dispatch(handler)
+
+                event.dispatch(self._global_handler)
+
+                previous = type
+                self._previous = type
+                self._collector.pop()
+            elif not self._stop and (self._timers > 0 or self._selectables > 1):
+                if previous is not Event.REACTOR_QUIESCED and self._previous is not Event.REACTOR_FINAL:
+                    self.push_event(self, Event.REACTOR_QUIESCED)
+                self.yield_()
+            else:
+                if self._selectable:
+                    self._selectable.terminate()
+                    self.update(self._selectable)
+                    self._selectable = None
+                else:
+                    if self._previous is not Event.REACTOR_FINAL:
+                        self.push_event(self, Event.REACTOR_FINAL)
+                    _logger.debug('%s Stopping', self)
+                    return False
 
     def stop(self):
-        pn_reactor_stop(self._impl)
+        self._stop = True
         self._check_errors()
 
-    def schedule(self, delay, task):
-        impl = _chandler(task, self.on_error_delegate())
-        task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
-        pn_decref(impl)
+    def stop_events(self):
+        self._collector.release()
+
+    def schedule(self, delay, handler):
+        himpl = self._make_handler(handler)
+        task = Task(self, self._now+delay, himpl)
+        heapq.heappush(self._timerheap, task)
+        self._timers += 1
+        deadline = self._timerheap[0]._deadline
+        if self._selectable:
+            self._selectable.deadline = deadline
+            self.update(self._selectable)
         return task
 
+    def timer_tick(self):
+        while self._timers > 0:
+            t = self._timerheap[0]
+            if t._cancelled:
+                heapq.heappop(self._timerheap)
+                self._timers -= 1
+            elif t._deadline > self._now:
+                return
+            else:
+                heapq.heappop(self._timerheap)
+                self._timers -= 1
+                self.push_event(t, Event.TIMER_TASK)
+
+    @property
+    def timer_deadline(self):
+        while self._timers > 0:
+            t = self._timerheap[0]
+            if t._cancelled:
+                heapq.heappop(self._timerheap)
+                self._timers -= 1
+            else:
+                return t._deadline
+        return None
+
     def acceptor(self, host, port, handler=None):
-        impl = _chandler(handler, self.on_error_delegate())
-        aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
-        pn_decref(impl)
-        if aimpl:
-            return Acceptor(aimpl)
+        impl = self._make_handler(handler)
+        a = Acceptor(self, unicode2utf8(host), int(port), impl)
+        if a:
+            return a
         else:
-            raise IOError("%s (%s:%s)" % (pn_error_text(pn_reactor_error(self._impl)), host, port))
+            raise IOError("%s (%s:%s)" % (str(self.errors), host, port))
 
     def connection(self, handler=None):
         """Deprecated: use connection_to_host() instead
         """
-        impl = _chandler(handler, self.on_error_delegate())
-        result = Connection.wrap(pn_reactor_connection(self._impl, impl))
-        if impl: pn_decref(impl)
+        impl = self._make_handler(handler)
+        result = Connection()
+        if impl:
+            result.handler = impl
+        result._reactor = self
+        result.collect(self._collector)
         return result
 
     def connection_to_host(self, host, port, handler=None):
@@ -247,10 +327,7 @@ class Reactor(Wrapper):
         used by the reactor's iohandler to create an outgoing socket
         connection.  This must be set prior to opening the connection.
         """
-        pn_reactor_set_connection_host(self._impl,
-                                       connection._impl,
-                                       unicode2utf8(str(host)),
-                                       unicode2utf8(str(port)))
+        connection.set_address(host, port)
 
     def get_connection_address(self, connection):
         """This may be used to retrieve the remote peer address.
@@ -258,29 +335,23 @@ class Reactor(Wrapper):
         address is available.  Use the proton.Url class to create a Url object
         from the returned value.
         """
-        _url = pn_reactor_get_connection_address(self._impl, connection._impl)
+        _url = connection.get_address()
         return utf82unicode(_url)
 
-    def selectable(self, handler=None):
-        impl = _chandler(handler, self.on_error_delegate())
-        result = Selectable.wrap(pn_reactor_selectable(self._impl))
-        if impl:
-            record = pn_selectable_attachments(result._impl)
-            pn_record_set_handler(record, impl)
-            pn_decref(impl)
+    def selectable(self, handler=None, delegate=None):
+        if delegate is None:
+            delegate = handler
+        result = Selectable(delegate, self)
+        result.collect(self._collector)
+        result.handler = handler
+        self.push_event(result, Event.SELECTABLE_INIT)
         return result
 
-    def update(self, sel):
-        pn_reactor_update(self._impl, sel._impl)
+    def update(self, selectable):
+        selectable.update()
 
     def push_event(self, obj, etype):
-        pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number)
-
-
-from ._events import wrappers as _wrappers
-
-_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x))
-_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x))
+        self._collector.put(obj, etype)
 
 
 class EventInjector(object):
@@ -296,6 +367,7 @@ class EventInjector(object):
     def __init__(self):
         self.queue = queue.Queue()
         self.pipe = os.pipe()
+        self._transport = None
         self._closed = False
 
     def trigger(self, event):
@@ -320,19 +392,19 @@ class EventInjector(object):
 
     def on_selectable_init(self, event):
         sel = event.context
-        sel.fileno(self.fileno())
+        #sel.fileno(self.fileno())
         sel.reading = True
-        event.reactor.update(sel)
+        sel.update()
 
     def on_selectable_readable(self, event):
+        s = event.context
         os.read(self.pipe[0], 512)
         while not self.queue.empty():
             requested = self.queue.get()
-            event.reactor.push_event(requested.context, requested.type)
+            s.push_event(requested.context, requested.type)
         if self._closed:
-            s = event.context
             s.terminate()
-            event.reactor.update(s)
+            s.update()
 
 
 class ApplicationEvent(EventBase):
@@ -342,7 +414,8 @@ class ApplicationEvent(EventBase):
     """
 
     def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None):
-        super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename))
+        super(ApplicationEvent, self).__init__(EventType(typename))
+        self.clazz = PN_PYREF
         self.connection = connection
         self.session = session
         self.link = link
@@ -355,6 +428,10 @@ class ApplicationEvent(EventBase):
             self.connection = self.session.connection
         self.subject = subject
 
+    @property
+    def context(self):
+        return self
+
     def __repr__(self):
         objects = [self.connection, self.session, self.link, self.delivery, self.subject]
         return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None]))
@@ -429,7 +506,7 @@ class Transaction(object):
             elif event.delivery.remote_state == Delivery.REJECTED:
                 self.handler.on_transaction_declare_failed(event)
             else:
-                Logger.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state)
+                _logger.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state)
                 self.handler.on_transaction_declare_failed(event)
         elif event.delivery == self._discharge:
             if event.delivery.remote_state == Delivery.REJECTED:
@@ -569,7 +646,7 @@ class SessionPerConnection(object):
         return self._default_session
 
 
-class GlobalOverrides(object):
+class GlobalOverrides(Handler):
     """
     Internal handler that triggers the necessary socket connect for an
     opened connection.
@@ -587,6 +664,49 @@ class GlobalOverrides(object):
         return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides)
 
 
+class Acceptor(Handler):
+
+    def __init__(self, reactor, host, port, handler=None):
+        self._ssl_domain = None
+        self._reactor = reactor
+        self._handler = handler
+        sock = IO.listen(host, port)
+        s = reactor.selectable(handler=self, delegate=sock)
+        s.reading = True
+        s._transport = None
+        self._selectable = s
+        reactor.update(s)
+
+    def set_ssl_domain(self, ssl_domain):
+        self._ssl_domain = ssl_domain
+
+    def close(self):
+        if not self._selectable.is_terminal:
+            IO.close(self._selectable)
+            self._selectable.terminate()
+            self._reactor.update(self._selectable)
+
+    def on_selectable_readable(self, event):
+        s = event.selectable
+
+        sock, name = IO.accept(self._selectable)
+        _logger.debug("Accepted connection from %s", name)
+
+        r = self._reactor
+        handler = self._handler or r.handler
+        c = r.connection(handler)
+        c._acceptor = self
+        c.url = Url(host=name[0], port=name[1])
+        t = Transport(Transport.SERVER)
+        if self._ssl_domain:
+            t.ssl(self._ssl_domain)
+        t.bind(c)
+
+        s = r.selectable(delegate=sock)
+        s._transport = t
+        t._selectable = s
+        IOHandler.update(t, s, r.now)
+
 class Connector(Handler):
     """
     Internal handler that triggers the necessary socket connect for an
@@ -608,14 +728,13 @@ class Connector(Handler):
         self.ssl_sni = None
         self.max_frame_size = None
 
-    def _connect(self, connection, reactor):
-        assert (reactor is not None)
+    def _connect(self, connection):
         url = self.address.next()
-        reactor.set_connection_host(connection, url.host, str(url.port))
+        connection.url = url
         # if virtual-host not set, use host from address as default
         if self.virtual_host is None:
             connection.hostname = url.host
-        Logger.debug("connecting to %r..." % url)
+        _logger.debug("connecting to %r..." % url)
 
         transport = Transport()
         if self.sasl_enabled:
@@ -643,10 +762,10 @@ class Connector(Handler):
             transport.max_frame_size = self.max_frame_size
 
     def on_connection_local_open(self, event):
-        self._connect(event.connection, event.reactor)
+        self._connect(event.connection)
 
     def on_connection_remote_open(self, event):
-        Logger.debug("connected to %s" % event.connection.hostname)
+        _logger.debug("connected to %s" % event.connection.hostname)
         if self.reconnect:
             self.reconnect.reset()
             self.transport = None
@@ -661,20 +780,20 @@ class Connector(Handler):
                 event.transport.unbind()
                 delay = self.reconnect.next()
                 if delay == 0:
-                    Logger.info("Disconnected, reconnecting...")
-                    self._connect(self.connection, event.reactor)
+                    _logger.info("Disconnected, reconnecting...")
+                    self._connect(self.connection)
                     return
                 else:
-                    Logger.info("Disconnected will try to reconnect after %s seconds" % delay)
+                    _logger.info("Disconnected will try to reconnect after %s seconds" % delay)
                     event.reactor.schedule(delay, self)
                     return
             else:
-                Logger.debug("Disconnected")
+                _logger.debug("Disconnected")
         # See connector.cpp: conn.free()/pn_connection_release() here?
         self.connection = None
 
     def on_timer_task(self, event):
-        self._connect(self.connection, event.reactor)
+        self._connect(self.connection)
 
 
 class Backoff(object):
@@ -727,7 +846,7 @@ class SSLConfig(object):
         self.client.set_trusted_ca_db(certificate_db)
         self.server.set_trusted_ca_db(certificate_db)
 
-def find_config_file():
+def _find_config_file():
     confname = 'connect.json'
     confpath = ['.', '~/.config/messaging','/etc/messaging']
     for d in confpath:
@@ -736,15 +855,15 @@ def find_config_file():
             return f
     return None
 
-def get_default_config():
-    conf = os.environ.get('MESSAGING_CONNECT_FILE') or find_config_file()
+def _get_default_config():
+    conf = os.environ.get('MESSAGING_CONNECT_FILE') or _find_config_file()
     if conf and os.path.isfile(conf):
         with open(conf, 'r') as f:
             return json.load(f)
     else:
         return {}
 
-def get_default_port_for_scheme(scheme):
+def _get_default_port_for_scheme(scheme):
     if scheme == 'amqps':
         return 5671
     else:
@@ -773,7 +892,6 @@ class Container(Reactor):
             self.sasl_enabled = True
             self.user = None
             self.password = None
-            Wrapper.__setattr__(self, 'subclass', self.__class__)
 
     def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None,
                 **kwargs):
@@ -825,9 +943,9 @@ class Container(Reactor):
 
         """
         if not url and not urls and not address:
-            config = get_default_config()
+            config = _get_default_config()
             scheme = config.get('scheme', 'amqp')
-            _url = "%s://%s:%s" % (scheme, config.get('host', 'localhost'), config.get('port', get_default_port_for_scheme(scheme)))
+            _url = "%s://%s:%s" % (scheme, config.get('host', 'localhost'), config.get('port', _get_default_port_for_scheme(scheme)))
             _ssl_domain = None
             _kwargs = kwargs
             if config.get('user'):
@@ -952,7 +1070,7 @@ class Container(Reactor):
             snd.source.address = source
         if target:
             snd.target.address = target
-        if handler != None:
+        if handler is not None:
             snd.handler = handler
         if tags:
             snd.tag_generator = tags
@@ -995,7 +1113,7 @@ class Container(Reactor):
             rcv.source.dynamic = True
         if target:
             rcv.target.address = target
-        if handler != None:
+        if handler is not None:
             rcv.handler = handler
         _apply_link_options(options, rcv)
         rcv.open()
diff --git a/python/proton/_reactor_impl.py b/python/proton/_reactor_impl.py
deleted file mode 100644
index 4ffebcd..0000000
--- a/python/proton/_reactor_impl.py
+++ /dev/null
@@ -1,217 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from __future__ import absolute_import
-
-import weakref
-
-from cproton import PN_INVALID_SOCKET, \
-    pn_incref, pn_decref, \
-    pn_handler_add, pn_handler_clear, pn_pyhandler, \
-    pn_selectable_is_reading, pn_selectable_attachments, pn_selectable_set_reading, \
-    pn_selectable_expired, pn_selectable_set_fd, pn_selectable_set_registered, pn_selectable_writable, \
-    pn_selectable_is_writing, pn_selectable_set_deadline, pn_selectable_is_registered, pn_selectable_terminate, \
-    pn_selectable_get_deadline, pn_selectable_is_terminal, pn_selectable_readable, \
-    pn_selectable_release, pn_selectable_set_writing, pn_selectable_get_fd
-
-from ._common import millis2secs, secs2millis
-from ._wrapper import Wrapper
-
-from . import _compat
-
-_DEFAULT = False
-
-
-class Selectable(Wrapper):
-
-    @staticmethod
-    def wrap(impl):
-        if impl is None:
-            return None
-        else:
-            return Selectable(impl)
-
-    def __init__(self, impl):
-        Wrapper.__init__(self, impl, pn_selectable_attachments)
-
-    def _init(self):
-        pass
-
-    def fileno(self, fd=_DEFAULT):
-        if fd is _DEFAULT:
-            return pn_selectable_get_fd(self._impl)
-        elif fd is None:
-            pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET)
-        else:
-            pn_selectable_set_fd(self._impl, fd)
-
-    def _is_reading(self):
-        return pn_selectable_is_reading(self._impl)
-
-    def _set_reading(self, val):
-        pn_selectable_set_reading(self._impl, bool(val))
-
-    reading = property(_is_reading, _set_reading)
-
-    def _is_writing(self):
-        return pn_selectable_is_writing(self._impl)
-
-    def _set_writing(self, val):
-        pn_selectable_set_writing(self._impl, bool(val))
-
-    writing = property(_is_writing, _set_writing)
-
-    def _get_deadline(self):
-        tstamp = pn_selectable_get_deadline(self._impl)
-        if tstamp:
-            return millis2secs(tstamp)
-        else:
-            return None
-
-    def _set_deadline(self, deadline):
-        pn_selectable_set_deadline(self._impl, secs2millis(deadline))
-
-    deadline = property(_get_deadline, _set_deadline)
-
-    def readable(self):
-        pn_selectable_readable(self._impl)
-
-    def writable(self):
-        pn_selectable_writable(self._impl)
-
-    def expired(self):
-        pn_selectable_expired(self._impl)
-
-    def _is_registered(self):
-        return pn_selectable_is_registered(self._impl)
-
-    def _set_registered(self, registered):
-        pn_selectable_set_registered(self._impl, registered)
-
-    registered = property(_is_registered, _set_registered,
-                          doc="""
-The registered property may be get/set by an I/O polling system to
-indicate whether the fd has been registered or not.
-""")
-
-    @property
-    def is_terminal(self):
-        return pn_selectable_is_terminal(self._impl)
-
-    def terminate(self):
-        pn_selectable_terminate(self._impl)
-
-    def release(self):
-        pn_selectable_release(self._impl)
-
-
-class _cadapter:
-
-    def __init__(self, handler, on_error=None):
-        self.handler = handler
-        self.on_error = on_error
-
-    def dispatch(self, cevent, ctype):
-        from ._events import Event
-        ev = Event.wrap(cevent, ctype)
-        ev.dispatch(self.handler)
-
-    def exception(self, exc, val, tb):
-        if self.on_error is None:
-            _compat.raise_(exc, val, tb)
-        else:
-            self.on_error((exc, val, tb))
-
-
-class WrappedHandlersChildSurrogate:
-    def __init__(self, delegate):
-        self.handlers = []
-        self.delegate = weakref.ref(delegate)
-
-    def on_unhandled(self, method, event):
-        from ._events import _dispatch
-        delegate = self.delegate()
-        if delegate:
-            _dispatch(delegate, method, event)
-
-
-class WrappedHandlersProperty(object):
-    def __get__(self, obj, clazz):
-        if obj is None:
-            return None
-        return self.surrogate(obj).handlers
-
-    def __set__(self, obj, value):
-        self.surrogate(obj).handlers = value
-
-    def surrogate(self, obj):
-        key = "_surrogate"
-        objdict = obj.__dict__
-        surrogate = objdict.get(key, None)
-        if surrogate is None:
-            objdict[key] = surrogate = WrappedHandlersChildSurrogate(obj)
-            obj.add(surrogate)
-        return surrogate
-
-
-class WrappedHandler(Wrapper):
-    handlers = WrappedHandlersProperty()
-
-    @classmethod
-    def wrap(cls, impl, on_error=None):
-        if impl is None:
-            return None
-        else:
-            handler = cls(impl)
-            handler.__dict__["on_error"] = on_error
-            return handler
-
-    def __init__(self, impl_or_constructor):
-        Wrapper.__init__(self, impl_or_constructor)
-        if list(self.__class__.__mro__).index(WrappedHandler) > 1:
-            # instantiate the surrogate
-            self.handlers.extend([])
-
-    def _on_error(self, info):
-        on_error = getattr(self, "on_error", None)
-        if on_error is None:
-            _compat.raise_(info[0], info[1], info[2])
-        else:
-            on_error(info)
-
-    def add(self, handler, on_error=None):
-        if handler is None: return
-        if on_error is None: on_error = self._on_error
-        impl = _chandler(handler, on_error)
-        pn_handler_add(self._impl, impl)
-        pn_decref(impl)
-
-    def clear(self):
-        pn_handler_clear(self._impl)
-
-
-def _chandler(obj, on_error=None):
-    if obj is None:
-        return None
-    elif isinstance(obj, WrappedHandler):
-        impl = obj._impl
-        pn_incref(impl)
-        return impl
-    else:
-        return pn_pyhandler(_cadapter(obj, on_error))
diff --git a/python/proton/_selectable.py b/python/proton/_selectable.py
new file mode 100644
index 0000000..2125f7d
--- /dev/null
+++ b/python/proton/_selectable.py
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import absolute_import
+
+
+from ._events import Event
+
+class Selectable(object):
+
+    def __init__(self, delegate, reactor):
+        self._delegate = delegate
+        self.reading = False
+        self.writing = False
+        self._deadline = 0
+        self._terminal = False
+        self._terminated = False
+        self._collector = None
+        self._reactor = reactor
+
+    def release(self):
+        if self._delegate:
+            self._delegate.close()
+
+    def __getattr__(self, name):
+        if self._delegate:
+            return getattr(self._delegate, name)
+        else:
+            return None
+
+    def _get_deadline(self):
+        tstamp = self._deadline
+        if tstamp:
+            return tstamp
+        else:
+            return None
+
+    def _set_deadline(self, deadline):
+        if not deadline:
+            self._deadline = 0
+        else:
+            self._deadline = deadline
+
+    deadline = property(_get_deadline, _set_deadline)
+
+    def collect(self, collector):
+        self._collector = collector
+
+    def push_event(self, context, type):
+        self._collector.put(context, type)
+
+    def update(self):
+        if not self._terminated:
+            if self._terminal:
+                self._terminated = True
+                self.push_event(self, Event.SELECTABLE_FINAL)
+            else:
+                self.push_event(self, Event.SELECTABLE_UPDATED)
+
+    def readable(self):
+        if self._collector:
+            self.push_event(self, Event.SELECTABLE_READABLE)
+
+    def writable(self):
+        if self._collector:
+            self.push_event(self, Event.SELECTABLE_WRITABLE)
+
+    def expired(self):
+        if self._collector:
+            self.push_event(self, Event.SELECTABLE_EXPIRED)
+
+    @property
+    def is_terminal(self):
+        return self._terminal
+
+    def terminate(self):
+        self._terminal = True
diff --git a/python/proton/_transport.py b/python/proton/_transport.py
index 3db0078..182fde6 100644
--- a/python/proton/_transport.py
+++ b/python/proton/_transport.py
@@ -88,6 +88,7 @@ class Transport(Wrapper):
     def _init(self):
         self._sasl = None
         self._ssl = None
+        self._reactor = None
 
     def _check(self, err):
         if err < 0:
@@ -136,6 +137,10 @@ A callback for trace logging. The callback is passed the transport and log messa
         """Assign a connection to the transport"""
         self._check(pn_transport_bind(self._impl, connection._impl))
 
+    def bind_nothrow(self, connection):
+        """Assign a connection to the transport"""
+        pn_transport_bind(self._impl, connection._impl)
+
     def unbind(self):
         """Release the connection"""
         self._check(pn_transport_unbind(self._impl))
diff --git a/python/proton/_utils.py b/python/proton/_utils.py
index 6462b55..38639bb 100644
--- a/python/proton/_utils.py
+++ b/python/proton/_utils.py
@@ -23,8 +23,6 @@ import collections
 import time
 import threading
 
-from cproton import pn_reactor_collector, pn_collector_release
-
 from ._exceptions import ProtonException, ConnectionException, LinkException, Timeout
 from ._delivery import Delivery
 from ._endpoints import Endpoint, Link
@@ -284,7 +282,7 @@ class BlockingConnection(Handler):
             self.run()
             self.conn = None
             self.container.global_handler = None  # break circular ref: container to cadapter.on_error
-            pn_collector_release(pn_reactor_collector(self.container._impl))  # straggling event may keep reactor alive
+            self.container.stop_events()
             self.container = None
 
     def _is_closed(self):
diff --git a/python/proton/_wrapper.py b/python/proton/_wrapper.py
index 4ee98e8..1e7a33a 100644
--- a/python/proton/_wrapper.py
+++ b/python/proton/_wrapper.py
@@ -43,6 +43,21 @@ EMPTY_ATTRS = EmptyAttrs()
 
 
 class Wrapper(object):
+    """ Wrapper for python objects that need to be stored in event contexts and be retrived again from them
+        Quick note on how this works:
+        The actual *python* object has only 3 attributes which redirect into the wrapped C objects:
+        _impl   The wrapped C object itself
+        _attrs  This is a special pn_record_t holding a PYCTX which is a python dict
+                every attribute in the python object is actually looked up here
+        _record This is the C record itself (so actually identical to _attrs really but
+                a different python type
+
+        Because the objects actual attributes are stored away they must be initialised *after* the wrapping
+        is set up. This is the purpose of the _init method in the wrapped  object. Wrapper.__init__ will call
+        eht subclass _init to initialise attributes. So they *must not* be initialised in the subclass __init__
+        before calling the superclass (Wrapper) __init__ or they will not be accessible from the wrapper at all.
+
+    """
 
     def __init__(self, impl_or_constructor, get_context=None):
         init = False
diff --git a/python/tests/proton_tests/handler.py b/python/tests/proton_tests/handler.py
index 89376ad..2324073 100644
--- a/python/tests/proton_tests/handler.py
+++ b/python/tests/proton_tests/handler.py
@@ -98,7 +98,7 @@ class HandlerTest(common.Test):
     reactor.handler.handlers.append(root)
 
   def event_root(self, event):
-    return event.root
+    return event.handler
 
   def event_reactor_handler(self, event):
     return event.reactor.handler
diff --git a/python/tests/proton_tests/reactor.py b/python/tests/proton_tests/reactor.py
index 907f1fc..923af2d 100644
--- a/python/tests/proton_tests/reactor.py
+++ b/python/tests/proton_tests/reactor.py
@@ -374,8 +374,8 @@ class ContainerTest(Test):
 
             def on_connection_opened(self, event):
                 event.connection.close()
-                assert event.container == event.reactor
-                assert event.container == container
+                assert event.container is event.reactor
+                assert event.container is container
         container.connect(test_handler.url, handler=ConnectionHandler())
         container.run()
 
@@ -418,7 +418,7 @@ class ContainerTest(Test):
             self.listener = event.container.listen("%s:%s" % (self.host, self.port))
 
         def on_connection_opened(self, event):
-            self.client_addr = event.reactor.get_connection_address(event.connection)
+            self.client_addr = event.connected_address
             self.peer_hostname = event.connection.remote_hostname
 
         def on_connection_closing(self, event):
@@ -431,7 +431,7 @@ class ContainerTest(Test):
             self.server_addr = None
 
         def on_connection_opened(self, event):
-            self.server_addr = event.reactor.get_connection_address(event.connection)
+            self.server_addr = event.connected_address
             event.connection.close()
 
     def test_numeric_hostname(self):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message