qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [01/38] qpid-proton git commit: PROTON-881: Initial commit of proton-j reactor implementation
Date Sun, 05 Jul 2015 23:45:02 GMT
Repository: qpid-proton
Updated Branches:
  refs/heads/master e4c99b936 -> 46edaebeb


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
new file mode 100644
index 0000000..08aca1f
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
@@ -0,0 +1,333 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor.impl;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.Selectable;
+import org.apache.qpid.proton.reactor.Selectable.Callback;
+import org.apache.qpid.proton.reactor.Selector;
+
+public class IOHandler extends BaseHandler {
+
+    // pni_handle_quiesced from connection.c
+    private void handleQuiesced(Reactor reactor, Selector selector) throws IOException {
+        // check if we are still quiesced, other handlers of
+        // PN_REACTOR_QUIESCED could have produced more events to process
+        if (!reactor.quiesced()) return;
+        selector.select(reactor.getTimeout());
+        reactor.mark();
+        Iterator<Selectable> selectables = selector.readable();
+        while(selectables.hasNext()) {
+            selectables.next().readable();
+        }
+        selectables = selector.writeable();
+        while(selectables.hasNext()) {
+            selectables.next().writeable();
+        }
+        selectables = selector.expired();
+        while(selectables.hasNext()) {
+            selectables.next().expired();
+        }
+        selectables = selector.error();
+        while(selectables.hasNext()) {
+            selectables.next().error();
+        }
+        reactor.yield();
+    }
+
+    // pni_handle_open(...) from connection.c
+    //
+    // If the remote end of the event's connection is still uninitialized,
+    // creates a transport, sets its SASL mechanisms to ANONYMOUS and binds it
+    // to the connection.  In any other remote state this is a no-op.
+    private void handleOpen(Event event) {
+        Connection connection = event.getConnection();
+        if (connection.getRemoteState() == EndpointState.UNINITIALIZED) {
+            Transport transport = Proton.transport();
+            transport.sasl().setMechanisms("ANONYMOUS");
+            transport.bind(connection);
+        }
+    }
+
+    // pni_handle_bound(...) from connection.c
+    private void handleBound(Reactor reactor, Event event) {
+        Connection connection = event.getConnection();
+        String hostname = connection.getHostname();
+        if (hostname == null) {
+            return;
+        }
+
+        int colonIndex = hostname.indexOf(':');
+        int port = 5672;
+        if (colonIndex >= 0) {
+            port = Integer.parseInt(hostname.substring(colonIndex+1));  // TODO: this can
throw NumberFormatException on malformed input!
+            hostname = hostname.substring(0, colonIndex);
+        }
+
+        Transport transport = event.getTransport();
+        Socket socket = null;   // TODO: null is our equivalent of PN_INVALID_SOCKET
+        try {
+            socket = new Socket(hostname, port);
+        } catch(IOException ioException) {
+            ErrorCondition condition = transport.getCondition();
+            condition.setCondition(Symbol.getSymbol("proton:io"));
+            condition.setDescription(ioException.getMessage());
+            transport.close_tail();
+            transport.close_head();
+        }
+        selectableTransport(reactor, socket, transport);
+    }
+
+    // pni_connection_capacity from connection.c
+    private int capacity(Selectable selectable) {
+        Transport transport = selectable.getTransport();
+        int capacity = transport.capacity();
+        if (capacity < 0) {
+            if (transport.isClosed()) {
+                selectable.terminate();
+            }
+        }
+        return capacity;
+    }
+
+    // pni_connection_pending from connection.c
+    private int pending(Selectable selectable) {
+        Transport transport = selectable.getTransport();
+        int pending = transport.pending();
+        if (pending < 0) {
+            if (transport.isClosed()) {
+                selectable.terminate();
+            }
+        }
+        return pending;
+    }
+
+    // pni_connection_deadline from connection.c
+    private long deadline(Selectable selectable) {
+        Reactor reactor = selectable.getReactor();
+        Transport transport = selectable.getTransport();
+        long deadline = transport.tick(reactor.now());
+        return deadline;
+    }
+
+    // pni_connection_update from connection.c
+    private void update(Selectable selectable) {
+        int c = capacity(selectable);
+        int p = pending(selectable);
+        selectable.setReading(c > 0);
+        selectable.setWriting(p > 0);
+        selectable.setDeadline(deadline(selectable));
+    }
+
+    // pni_connection_readable from connection.c
+    //
+    // Callback run when a connection's selectable is readable.  If the
+    // transport has capacity, reads from the channel into the transport's tail
+    // buffer and processes it; end-of-stream closes the tail, and an
+    // IOException is recorded as a "proton:io" condition before closing the
+    // tail.  The selectable is always refreshed afterwards.
+    private class ConnectionReadable implements Callback {
+        @Override
+        public void run(Selectable selectable) {
+            Reactor reactor = selectable.getReactor();
+            Transport transport = selectable.getTransport();
+            int capacity = transport.capacity();
+            if (capacity > 0) {
+                // TODO: we shouldn't be doing this cast.  Instead - selectable should return an
+                //       object with 1) a getter for the SelectableChannel, 2) read/write methods.
+                SocketChannel socketChannel = (SocketChannel)selectable.getChannel();
+                try {
+                    int n = socketChannel.read(transport.tail());
+                    if (n == -1) {
+                        // end of stream: the peer will send no more data
+                        transport.close_tail();
+                    } else {
+                        transport.process();
+                    }
+                } catch (IOException e) {
+                    ErrorCondition condition = transport.getCondition();
+                    condition.setCondition(Symbol.getSymbol("proton:io"));
+                    condition.setDescription(e.getMessage());
+                    transport.close_tail();
+                }
+            }
+            // TODO: comment from C code...
+            // occasionally transport events aren't generated when expected, so
+            // the following hack ensures we always update the selector
+            update(selectable);
+            reactor.update(selectable);
+        }
+    }
+
+    // pni_connection_writable from connection.c
+    //
+    // Callback run when a connection's selectable is writable.  If the
+    // transport has pending output, writes the head buffer to the channel and
+    // pops the number of bytes written; an IOException is recorded as a
+    // "proton:io" condition before closing the head.  The selectable is only
+    // refreshed when the amount of pending output has changed.
+    private class ConnectionWritable implements Callback {
+        @Override
+        public void run(Selectable selectable) {
+            Reactor reactor = selectable.getReactor();
+            Transport transport = selectable.getTransport();
+            int pending = transport.pending();
+            if (pending > 0) {
+                // TODO: can't rely on this cast always working!
+                SocketChannel channel = (SocketChannel)selectable.getChannel();
+                try {
+                    int n = channel.write(transport.head());
+                    if (n < 0) {
+                        transport.close_head();
+                    } else {
+                        transport.pop(n);
+                    }
+                } catch(IOException ioException) {
+                    ErrorCondition condition = transport.getCondition();
+                    condition.setCondition(Symbol.getSymbol("proton:io"));
+                    condition.setDescription(ioException.getMessage());
+                    transport.close_head();
+                }
+            }
+
+            int newPending = transport.pending();
+            if (newPending != pending) {
+                update(selectable);
+                reactor.update(selectable);
+            }
+        }
+    }
+
+    // pni_connection_error from connection.c
+    //
+    // Callback run when the selector reports an error for a connection's
+    // selectable: terminates the selectable and tells the reactor about it.
+    private class ConnectionError implements Callback {
+        @Override
+        public void run(Selectable selectable) {
+            final Reactor owner = selectable.getReactor();
+            selectable.terminate();
+            owner.update(selectable);
+        }
+    }
+
+    // pni_connection_expired from connection.c
+    //
+    // Callback run when a connection selectable's deadline has passed.  Ticks
+    // the transport first, then recomputes the read/write interest and tells
+    // the reactor about the change.
+    private class ConnectionExpired implements Callback {
+        @Override
+        public void run(Selectable selectable) {
+            final Reactor owner = selectable.getReactor();
+            final Transport transport = selectable.getTransport();
+            // The tick happens before capacity/pending are sampled.
+            selectable.setDeadline(transport.tick(owner.now()));
+            final int available = capacity(selectable);
+            final int outstanding = pending(selectable);
+            selectable.setReading(available > 0);
+            selectable.setWriting(outstanding > 0);
+            owner.update(selectable);
+        }
+    }
+
+    // Callback that closes the channel belonging to a connection's selectable.
+    private class ConnectionFinalize implements Callback {
+        @Override
+        public void run(Selectable selectable) {
+            // A selectable created for a failed connect has no channel (null is
+            // our equivalent of PN_INVALID_SOCKET), so there is nothing to close.
+            if (selectable.getChannel() == null) {
+                return;
+            }
+            try {
+                selectable.getChannel().close();
+            } catch(IOException ioException) {
+                ioException.printStackTrace();
+                // TODO: what now?
+            }
+        }
+    }
+
+    // pn_reactor_selectable_transport
+    //
+    // Creates a selectable for the given transport, wires up the connection
+    // callbacks, links transport, selectable and reactor to each other and
+    // schedules an initial update.  A null socket (our equivalent of
+    // PN_INVALID_SOCKET) yields a selectable without a channel.
+    private Selectable selectableTransport(Reactor reactor, Socket socket, Transport transport) {
+        Selectable selectable = reactor.selectable();
+        selectable.setChannel(socket != null ? socket.getChannel() : null);
+        selectable.onReadable(new ConnectionReadable());
+        selectable.onWritable(new ConnectionWritable());
+        selectable.onError(new ConnectionError());
+        selectable.onExpired(new ConnectionExpired());
+        // TODO: the corresponding selectable._finalize method is never called anywhere in the C codebase!
+        selectable.onFinalize(new ConnectionFinalize());
+        selectable.setTransport(transport);
+        ((TransportImpl)transport).setSelectable(selectable);
+        ((TransportImpl)transport).setReactor(reactor);
+        update(selectable);
+        reactor.update(selectable);
+        return selectable;
+    }
+
+    // Handles a TRANSPORT event: refreshes the transport's selectable, provided
+    // it has one and that selectable is not already terminal.
+    private void handleTransport(Reactor reactor, Event event) {
+        Selectable selectable = ((TransportImpl)event.getTransport()).getSelectable();
+        if (selectable == null || selectable.isTerminal()) {
+            return;
+        }
+        update(selectable);
+        reactor.update(selectable);
+    }
+
+    // Entry point for every event this handler sees.  Makes sure the reactor
+    // has a selector (creating one on first use) and then dispatches on the
+    // event type: selectable lifecycle events are forwarded to the selector,
+    // connection/transport events to the helpers above, and REACTOR_QUIESCED
+    // triggers a select pass.  All other event types are ignored.
+    @Override
+    public void onUnhandled(Event event) {
+        try {
+            ReactorImpl reactor = (ReactorImpl)event.getReactor();
+            Selector selector = reactor.getSelector();
+            if (selector == null) {
+                // TODO: the C code supplies the reactor's pn_io object here...
+                selector = new SelectorImpl();
+                reactor.setSelector(selector);
+            }
+
+            Selectable selectable;
+            switch(event.getType()) {
+            case SELECTABLE_INIT:
+                selectable = event.getSelectable();
+                selector.add(selectable);
+                break;
+            case SELECTABLE_UPDATED:
+                selectable = event.getSelectable();
+                selector.update(selectable);
+                break;
+            case SELECTABLE_FINAL:
+                selectable = event.getSelectable();
+                selector.remove(selectable);
+                selectable.release();
+                break;
+            case CONNECTION_LOCAL_OPEN:
+                handleOpen(event);
+                break;
+            case CONNECTION_BOUND:
+                handleBound(reactor, event);
+                break;
+            case TRANSPORT:
+                handleTransport(reactor, event);
+                break;
+            case TRANSPORT_CLOSED:
+                event.getTransport().unbind();
+                break;
+            case REACTOR_QUIESCED:
+                handleQuiesced(reactor, selector);
+                break;
+            default:
+                break;
+            }
+        } catch(IOException e) {
+            e.printStackTrace();
+            // TODO: not clear what to do with this!
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
new file mode 100644
index 0000000..5072958
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
@@ -0,0 +1,445 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Pipe;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Event.Type;
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.engine.impl.CollectorImpl;
+import org.apache.qpid.proton.engine.impl.HandlerEndpointImpl;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.Selectable;
+import org.apache.qpid.proton.reactor.Selectable.Callback;
+import org.apache.qpid.proton.reactor.Selectable.RecordKeyType;
+import org.apache.qpid.proton.reactor.Selectable.RecordValueType;
+import org.apache.qpid.proton.reactor.Selector;
+import org.apache.qpid.proton.reactor.Task;
+
+public class ReactorImpl implements Reactor {
+
+    /*
+     *   pn_record_t *attachments;
+ 41   pn_io_t *io;
+ 42   pn_collector_t *collector;
+ 43   pn_handler_t *global;
+ 44   pn_handler_t *handler;
+ 45   pn_list_t *children;
+ 46   pn_timer_t *timer;
+ 47   pn_socket_t wakeup[2];
+ 48   pn_selectable_t *selectable;
+ 49   pn_event_type_t previous;
+ 50   pn_timestamp_t now;
+ 51   int selectables;
+ 52   int timeout;
+ 53   bool yield;
+     */
+
+    private Object attachment;
+    private CollectorImpl collector;
+    private long now;
+    private long timeout;
+    private Handler global;
+    private Handler handler;
+    private Set<Selectable> children;
+    private int selectables;
+    private boolean yield;
+    private Selectable selectable;
+    private Type previous;
+    private Timer timer;
+    private final Pipe wakeup;
+
+    // Records the current wall-clock time (milliseconds) as the reactor's
+    // "now" and returns it.
+    @Override
+    public long mark() {
+        final long current = System.currentTimeMillis();
+        this.now = current;
+        return current;
+    }
+
+    // Returns the time captured by the most recent call to mark().
+    @Override
+    public long now() {
+        return this.now;
+    }
+/*
+ * static void pn_reactor_initialize(pn_reactor_t *reactor) {
+ 68   reactor->attachments = pn_record();
+ 69   reactor->io = pn_io();    TODO: pn_io most literally translates to SocketFactory (and possibly also ServerSocketFactory...)
+ 70   reactor->collector = pn_collector();
+ 71   reactor->global = pn_iohandler();
+ 72   reactor->handler = pn_handler(NULL);
+ 73   reactor->children = pn_list(PN_OBJECT, 0);
+ 74   reactor->timer = pn_timer(reactor->collector);
+ 75   reactor->wakeup[0] = PN_INVALID_SOCKET;
+ 76   reactor->wakeup[1] = PN_INVALID_SOCKET;
+ 77   reactor->selectable = NULL;
+ 78   reactor->previous = PN_EVENT_NONE;
+ 79   reactor->selectables = 0;
+ 80   reactor->timeout = 0;
+ 81   reactor->yield = false;
+ 82   pn_reactor_mark(reactor);
+ 83 }
+ 84  */
+    // Builds a reactor with a fresh collector, an IOHandler as the global
+    // handler, an empty default handler, no children, a timer feeding the
+    // collector and a pipe used for wake-ups, then records the current time.
+    // Opening the pipe is what can raise the IOException.
+    public ReactorImpl() throws IOException {
+        this.collector = (CollectorImpl)Proton.collector();
+        this.global = new IOHandler();
+        this.handler = new BaseHandler();
+        this.children = new HashSet<Selectable>();
+        this.selectables = 0;
+        this.timer = new Timer(this.collector);
+        this.wakeup = Pipe.open();
+        mark();
+    }
+    /*
+ 85 static void pn_reactor_finalize(pn_reactor_t *reactor) {
+ 86   for (int i = 0; i < 2; i++) {
+ 87     if (reactor->wakeup[i] != PN_INVALID_SOCKET) {
+ 88       pn_close(reactor->io, reactor->wakeup[i]);
+ 89     }
+ 90   }
+ 91   pn_decref(reactor->attachments);
+ 92   pn_decref(reactor->collector);
+ 93   pn_decref(reactor->global);
+ 94   pn_decref(reactor->handler);
+ 95   pn_decref(reactor->children);
+ 96   pn_decref(reactor->timer);
+ 97   pn_decref(reactor->io);
+ 98 }
+ */
+
+    // Stores an arbitrary application object on this reactor, replacing any
+    // previously stored one.
+    @Override
+    public void attach(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    // Returns the object last passed to attach(), or null if none was set.
+    @Override
+    public Object attachment() {
+        return this.attachment;
+    }
+
+    // Returns the timeout value currently configured on this reactor.
+    @Override
+    public long getTimeout() {
+        return this.timeout;
+    }
+
+    // Replaces the timeout value configured on this reactor.
+    @Override
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+    // Returns the global handler, i.e. the one process() dispatches every
+    // event to after the event-specific handler.
+    @Override
+    public Handler getGlobalHandler() {
+        return this.global;
+    }
+
+    // Replaces the global handler.
+    @Override
+    public void setGlobalHandler(Handler handler) {
+        this.global = handler;
+    }
+
+    // Returns the reactor's default handler, used when an event has no more
+    // specific handler.
+    @Override
+    public Handler getHandler() {
+        return this.handler;
+    }
+
+    // Replaces the reactor's default handler.
+    @Override
+    public void setHandler(Handler handler) {
+        this.handler = handler;
+    }
+
+/* TODO
+ * pn_io_t *pn_reactor_io(pn_reactor_t *reactor) {
+166   assert(reactor);
+167   return reactor->io;
+168 }
+169
+
+ */
+
+    // Returns the reactor's own (live, mutable) set of child selectables.
+    @Override
+    public Set<Selectable> children() {
+        return this.children;
+    }
+
+    // Returns the collector that holds this reactor's events.
+    @Override
+    public Collector collector() {
+        return this.collector;
+    }
+
+    // Release callback installed on every selectable created by selectable():
+    // removes the selectable from the reactor's children and, if it was
+    // actually present, decrements the selectable count.
+    private class ReleaseCallback implements Callback {
+        private final ReactorImpl reactor;
+
+        public ReleaseCallback(ReactorImpl reactor) {
+            this.reactor = reactor;
+        }
+
+        @Override
+        public void run(Selectable selectable) {
+            final boolean wasChild = reactor.children.remove(selectable);
+            if (wasChild) {
+                reactor.selectables -= 1;
+            }
+        }
+    }
+
+    // Creates a new selectable owned by this reactor: it shares the reactor's
+    // collector, a SELECTABLE_INIT event is queued for it, it is added to the
+    // children set and counted, and a release callback is installed to undo
+    // that bookkeeping.
+    @Override
+    public Selectable selectable() {
+        final Selectable child = new SelectableImpl();
+        child.setCollector(collector);
+        collector.put(Type.SELECTABLE_INIT, child);
+        child.setReactor(this);
+        children.add(child);
+        child.onRelease(new ReleaseCallback(this));
+        selectables += 1;
+        return child;
+    }
+
+
+
+    // Queues the event that reflects the selectable's current state:
+    // SELECTABLE_UPDATED while it is live, or a single SELECTABLE_FINAL once it
+    // is terminal.  The PNI_TERMINATED record marks selectables whose final
+    // event has already been queued; for those nothing further is emitted.
+    @Override
+    public void update(Selectable selectable) {
+        if (selectable.hasRecord(RecordKeyType.PNI_TERMINATED)) {
+            return;
+        }
+        if (!selectable.isTerminal()) {
+            collector.put(Type.SELECTABLE_UPDATED, selectable);
+            return;
+        }
+        selectable.setRecord(RecordKeyType.PNI_TERMINATED, RecordValueType.PN_VOID);
+        collector.put(Type.SELECTABLE_FINAL, selectable);
+    }
+
+    // TODO: pn_record_get_handler
+    // TODO: pn_record_set_handler
+    // TODO: pn_class_reactor
+    // TODO: pn_object_reactor
+    // TODO: pn_event_reactor
+
+    // pn_event_handler - TODO: this is copied from the Reactor.java code, so might need some tweaks...
+    //
+    // Picks the handler an event should be dispatched to.  The event's link,
+    // session, connection, task and selectable are consulted in that order and
+    // the first non-null handler found wins; if none of them supplies one, the
+    // reactor's default handler is returned.
+    private Handler eventHandler(Event event) {
+        Handler result;
+        if (event.getLink() != null) {
+            result = ((HandlerEndpointImpl)event.getLink()).getHandler();
+            if (result != null) return result;
+        }
+        if (event.getSession() != null) {
+            result = ((HandlerEndpointImpl)event.getSession()).getHandler();
+            if (result != null) return result;
+        }
+        if (event.getConnection() != null) {
+            result = ((HandlerEndpointImpl)event.getConnection()).getHandler();
+            if (result != null) return result;
+        }
+        // TODO: do we want to allow handlers to be added to the Transport object?
+//        if (event.getTransport() != null) {
+//            result = ((EndpointImpl)event.getTransport()).getHandlers();
+//            if (result.hasNext()) return result;
+//        }
+
+        if (event.getTask() != null) {
+            result = event.getTask().getHandler();
+            if (result != null) return result;
+        }
+
+        if (event.getSelectable() != null) {
+            result = event.getSelectable().getHandler();
+            if (result != null) return result;
+        }
+
+        return handler;
+    }
+
+
+    // Sets the yield flag, which makes process() return true before it
+    // dispatches the next event.
+    @Override
+    public void yield() {
+        this.yield = true;
+    }
+
+    // Reports whether the reactor is quiesced: either no event is queued, or
+    // the only queued event is a REACTOR_QUIESCED event.
+    @Override
+    public boolean quiesced() {
+        final Event head = collector.peek();
+        if (head == null) {
+            return true;
+        }
+        return !collector.more() && head.getType() == Type.REACTOR_QUIESCED;
+    }
+
+    // Dispatches queued events until there is a reason to hand control back.
+    //
+    // Each event goes first to its specific handler (see eventHandler) and then
+    // to the global handler.  Returns true when a yield was requested, or when
+    // the queue is empty, work remains (see more()) and a REACTOR_QUIESCED
+    // event has already been processed in this call (or the last event seen
+    // was REACTOR_FINAL).  Returns false once the queue is empty, no work
+    // remains and the reactor's own selectable has already been terminated.
+    @Override
+    public boolean process() {
+        mark();
+        Type previous = null;   // type of the last event dispatched during THIS call
+        while (true) {
+            Event event = collector.peek();
+            if (event != null) {
+                if (yield) {
+                    yield = false;
+                    return true;
+                }
+                yield = false;  // TODO: is this required?
+                Handler handler = eventHandler(event);
+                event.dispatch(handler);
+                event.dispatch(global);
+
+                // TODO: this should be the same as the pni_reactor_dispatch_post logic...
+                if (event.getType() == Type.CONNECTION_FINAL) {
+                    children.remove(event.getConnection());
+                }
+                this.previous = event.getType();
+                previous = this.previous;
+                collector.pop();
+
+            } else {
+                if (more()) {
+                    if (previous != Type.REACTOR_QUIESCED && this.previous != Type.REACTOR_FINAL) {
+                        collector.put(Type.REACTOR_QUIESCED, this);
+                    } else {
+                        return true;
+                    }
+                } else {
+                    if (selectable != null) {
+                        // Nothing left to do: retire the reactor's own selectable,
+                        // then loop to process the event that this queues.
+                        selectable.terminate();
+                        update(selectable);
+                        selectable = null;
+                    } else {
+                        return false;
+                    }
+                }
+            }
+        }
+    }
+
+
+    // Writes a single byte to the sink end of the wake-up pipe; the source end
+    // of that pipe is the channel of the reactor's timer selectable.
+    @Override
+    public void wakeup() throws IOException {
+        //selector.wakeup();
+        // TODO: c version returns a value!
+        final ByteBuffer token = ByteBuffer.allocate(1);
+        wakeup.sink().write(token);
+    }
+
+    // Queues the REACTOR_INIT event and creates the reactor's own (timer)
+    // selectable.
+    @Override
+    public void start() {
+        collector.put(Type.REACTOR_INIT, this);
+        this.selectable = timerSelectable();
+        //selectable.setDeadline(now + timeout);      // TODO: this isn't in the C code...
+    }
+
+    // Queues the REACTOR_FINAL event, runs one more processing pass so it is
+    // dispatched, and then drops the collector reference.
+    @Override
+    public void stop() {
+        collector.put(Type.REACTOR_FINAL, this);
+        // (Comment from C code) XXX: should consider removing this from stop to avoid reentrance
+        process();
+        this.collector = null;
+    }
+
+    // True while there is still work outstanding: at least one scheduled timer
+    // task, or more than one selectable being counted.
+    private boolean more() {
+        if (timer.tasks() > 0) {
+            return true;
+        }
+        return selectables > 1;
+    }
+
+    // Convenience driver: sets the timeout, starts the reactor, keeps calling
+    // process() until it returns false, then stops the reactor.
+    @Override
+    public void run() {
+        setTimeout(3141);   // TODO: eh?
+        start();
+        boolean keepGoing;
+        do {
+            keepGoing = process();
+        } while (keepGoing);
+        stop();
+    }
+
+    // pn_reactor_schedule from reactor.c
+    //
+    // Schedules a task on the timer for `delay` after the reactor's current
+    // "now" and associates it with this reactor and the given handler.  If the
+    // reactor's own selectable exists, its deadline is moved to the timer's
+    // new deadline and an update is queued.
+    @Override
+    public Task schedule(int delay, Handler handler) {
+        final long dueTime = now + delay;
+        final Task scheduled = timer.schedule(dueTime);
+        scheduled.setReactor(this);
+        scheduled.setHandler(handler);
+        if (selectable == null) {
+            return scheduled;
+        }
+        selectable.setDeadline(timer.deadline());
+        update(selectable);
+        return scheduled;
+    }
+    // TODO: acceptor
+    // TODO: connection
+    // TODO: acceptorClose
+
+    // pni_timer_readable from reactor.c
+    //
+    // Callback run when the wake-up pipe of the reactor's timer selectable is
+    // readable, i.e. after wakeup() has written to it.
+    private class TimerReadable implements Callback {
+
+        @Override
+        public void run(Selectable selectable) {
+            // Consume what wakeup() wrote, mirroring the pn_read(...) of up to
+            // 64 bytes in the C code.  Without this the pipe would stay
+            // readable after the first wake-up.
+            try {
+                wakeup.source().read(ByteBuffer.allocate(64));
+            } catch (IOException e) {
+                e.printStackTrace();
+                // TODO: no idea what to do here...
+            }
+            // TODO: this could be more elegant...
+            new TimerExpired().run(selectable);
+        }
+
+    }
+
+    // Callback run when the timer selectable's deadline has passed: ticks the
+    // reactor's timer with the reactor's current time, moves the selectable's
+    // deadline to the timer's next deadline and queues an update.
+    private class TimerExpired implements Callback {
+        @Override
+        public void run(Selectable selectable) {
+            final ReactorImpl owner = (ReactorImpl) selectable.getReactor();
+            owner.timer.tick(owner.now);
+            final long nextDeadline = owner.timer.deadline();
+            selectable.setDeadline(nextDeadline);
+            owner.update(selectable);
+        }
+    }
+
+
+    // pni_timer_finalize from reactor.c
+    //
+    // Callback that closes the channel of the timer selectable.  A failure to
+    // close is only printed.
+    private class TimerFinalize implements Callback {
+        @Override
+        public void run(Selectable selectable) {
+            try {
+                selectable.getChannel().close();
+            } catch(IOException closeFailure) {
+                // TODO: no idea what to do here...
+                closeFailure.printStackTrace();
+            }
+        }
+    }
+
+    // Creates the reactor's own selectable.  Its channel is the source end of
+    // the wake-up pipe, it is always interested in reading, and its deadline
+    // tracks the timer's deadline.
+    private Selectable timerSelectable() {
+        Selectable sel = selectable();
+        sel.setChannel(wakeup.source());
+        sel.onReadable(new TimerReadable());
+        sel.onExpired(new TimerExpired());
+        // TODO: not sure the corresponding sel._finalize() gets called anywhere...
+        sel.onFinalize(new TimerFinalize());
+        sel.setReading(true);
+        sel.setDeadline(timer.deadline());
+        update(sel);
+        return sel;
+    }
+
+    // TODO: the C code allows records to be associated with a Reactor and the Selector is get/set using that capability.
+    private Selector selector;
+
+    // Returns the selector associated with this reactor, or null if none has
+    // been set yet.
+    protected Selector getSelector() {
+        return this.selector;
+    }
+
+    // Associates a selector with this reactor, replacing any previous one.
+    protected void setSelector(Selector selector) {
+        this.selector = selector;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
new file mode 100644
index 0000000..cf3839d
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
@@ -0,0 +1,272 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor.impl;
+
+import java.nio.channels.SelectableChannel;
+import java.util.HashMap;
+
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Event.Type;
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.engine.Transport;
+import org.apache.qpid.proton.engine.impl.CollectorImpl;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.Selectable;
+
+/**
+ * Plain state holder implementing {@link Selectable}: it stores the channel,
+ * the read/write interest flags, a deadline, assorted attachments and a set
+ * of optional callbacks.  Each notification method ({@link #readable()},
+ * {@link #writeable()}, ...) simply forwards to the matching callback when
+ * one has been installed and does nothing otherwise.
+ */
+public class SelectableImpl implements Selectable {
+
+    // Optional callbacks; a null entry means "nothing to do" for that event.
+    private Callback readable;
+    private Callback writable;
+    private Callback error;
+    private Callback expire;
+    private Callback release;
+    private Callback finalize;
+
+    private boolean reading;
+    private boolean writing;
+    private long deadline;
+    private SelectableChannel channel;
+    private Object attachment;
+    private boolean registered;
+    private Reactor reactor;
+    private Transport transport;
+    private boolean terminal;
+
+    private final HashMap<RecordKeyType, RecordValueType> records =
+            new HashMap<RecordKeyType, RecordValueType>();
+
+    // TODO: all this gets stuffed into records in the C code...
+    private BaseHandler _handler = new BaseHandler();
+
+    /** Runs {@code callback} against this selectable unless it is unset. */
+    private void dispatch(Callback callback) {
+        if (callback == null) {
+            return;
+        }
+        callback.run(this);
+    }
+
+    /**
+     * Creates a callback that calls {@code sink.put(type, selectable)} with
+     * the selectable it is run against.
+     */
+    private static Callback eventEmitter(final CollectorImpl sink, final Type type) {
+        return new Callback() {
+            @Override
+            public void run(Selectable selectable) {
+                sink.put(type, selectable);
+            }
+        };
+    }
+
+    // ---- interest flags and deadline ----
+
+    @Override
+    public boolean isReading() {
+        return reading;
+    }
+
+    @Override
+    public boolean isWriting() {
+        return writing;
+    }
+
+    @Override
+    public long getDeadline() {
+        return deadline;
+    }
+
+    @Override
+    public void setReading(boolean reading) {
+        this.reading = reading;
+    }
+
+    @Override
+    public void setWriting(boolean writing) {
+        this.writing = writing;
+    }
+
+    @Override
+    public void setDeadline(long deadline) {
+        this.deadline = deadline;
+    }
+
+    // ---- callback registration ----
+
+    @Override
+    public void onReadable(Callback runnable) {
+        readable = runnable;
+    }
+
+    @Override
+    public void onWritable(Callback runnable) {
+        writable = runnable;
+    }
+
+    @Override
+    public void onExpired(Callback runnable) {
+        expire = runnable;
+    }
+
+    @Override
+    public void onError(Callback runnable) {
+        error = runnable;
+    }
+
+    @Override
+    public void onRelease(Callback runnable) {
+        release = runnable;
+    }
+
+    @Override
+    public void onFinalize(Callback runnable) {
+        finalize = runnable;
+    }
+
+    // ---- notifications: each forwards to its callback, if any ----
+
+    @Override
+    public void readable() {
+        dispatch(readable);
+    }
+
+    @Override
+    public void writeable() {
+        dispatch(writable);
+    }
+
+    @Override
+    public void expired() {
+        dispatch(expire);
+    }
+
+    @Override
+    public void error() {
+        dispatch(error);
+    }
+
+    @Override
+    public void release() {
+        dispatch(release);
+    }
+
+    @Override
+    public void _finalize() {
+        dispatch(finalize);
+    }
+
+    // These are equivalent to the C code's set/get file descriptor functions.
+    @Override
+    public void setChannel(SelectableChannel channel) {
+        this.channel = channel;
+    }
+
+    @Override
+    public SelectableChannel getChannel() {
+        return channel;
+    }
+
+    @Override
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    @Override
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    @Override
+    public boolean isRegistered() {
+        return registered;
+    }
+
+    @Override
+    public void setRegistered(boolean registered) {
+        this.registered = registered;
+    }
+
+    /**
+     * Replaces the readable, writable, expired and error callbacks with ones
+     * that put the corresponding SELECTABLE_* event onto the collector.
+     * The release and finalize callbacks are left untouched.
+     *
+     * @param collector must be a {@link CollectorImpl}; it is cast before
+     *                  any callback is replaced
+     */
+    @Override
+    public void setCollector(final Collector collector) {
+        final CollectorImpl sink = (CollectorImpl)collector;
+
+        onReadable(eventEmitter(sink, Type.SELECTABLE_READABLE));
+        onWritable(eventEmitter(sink, Type.SELECTABLE_WRITABLE));
+        onExpired(eventEmitter(sink, Type.SELECTABLE_EXPIRED));
+        onError(eventEmitter(sink, Type.SELECTABLE_ERROR));
+    }
+
+    // TODO: the C version uses set/getContext for this - should we do the same?
+    @Override
+    public Reactor getReactor() {
+        return reactor;
+    }
+
+    @Override
+    public void setReactor(Reactor reactor) {
+        this.reactor = reactor;
+    }
+
+    @Override
+    public void terminate() {
+        terminal = true;
+    }
+
+    @Override
+    public boolean isTerminal() {
+        return terminal;
+    }
+
+    // ---- records ----
+
+    @Override
+    public boolean hasRecord(RecordKeyType type) {
+        return records.containsKey(type);
+    }
+
+    @Override
+    public void setRecord(RecordKeyType key, RecordValueType value) {
+        records.put(key, value);
+    }
+
+    @Override
+    public Transport getTransport() {
+        return transport;
+    }
+
+    @Override
+    public void setTransport(Transport transport) {
+        this.transport = transport;
+    }
+
+    // ---- handler chain ----
+
+    @Override
+    public void add(Handler handler) {
+        _handler.add(handler);
+    }
+
+    @Override
+    public Handler getHandler() {
+        return _handler;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
new file mode 100644
index 0000000..c74853e
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
@@ -0,0 +1,137 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor.impl;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.apache.qpid.proton.reactor.Selectable;
+import org.apache.qpid.proton.reactor.Selector;
+
+/**
+ * {@link Selector} implementation backed by a {@link java.nio.channels.Selector}.
+ * After each call to {@link #select(long)} the readable, writeable and
+ * expired sets describe the outcome of that call only.
+ */
+public class SelectorImpl implements Selector {
+
+    private final java.nio.channels.Selector selector;
+    private final HashSet<Selectable> selectables = new HashSet<Selectable>();
+    private final HashSet<Selectable> readable = new HashSet<Selectable>();
+    private final HashSet<Selectable> writeable = new HashSet<Selectable>();
+    private final HashSet<Selectable> expired = new HashSet<Selectable>();
+    private final HashSet<Selectable> error = new HashSet<Selectable>();
+
+    /**
+     * @throws IOException if the underlying NIO selector cannot be opened
+     */
+    public SelectorImpl() throws IOException {
+        selector = java.nio.channels.Selector.open();
+    }
+
+    /**
+     * Starts tracking a selectable.  When it has a channel, the channel is
+     * switched to non-blocking mode and registered with the NIO selector.
+     * A selectable without a channel is still tracked so that its deadline
+     * is taken into account by {@link #select(long)}.
+     *
+     * @throws IOException if the channel cannot be configured or registered
+     */
+    @Override
+    public void add(Selectable selectable) throws IOException {
+        if (selectable.getChannel() != null) {
+            selectable.getChannel().configureBlocking(false);
+            SelectionKey key = selectable.getChannel().register(selector, 0);
+            key.attach(selectable);
+        }
+        selectables.add(selectable);
+        update(selectable);
+    }
+
+    /**
+     * Copies the selectable's reading/writing flags into the interest set of
+     * its selection key.  Does nothing when the selectable has no channel or
+     * the channel is not registered with this selector.
+     */
+    @Override
+    public void update(Selectable selectable) {
+        if (selectable.getChannel() == null) {
+            return;
+        }
+        SelectionKey key = selectable.getChannel().keyFor(selector);
+        if (key == null) {
+            // keyFor() returns null for a channel that was never registered.
+            return;
+        }
+        int interestedOps = 0;
+        if (selectable.isReading()) interestedOps |= SelectionKey.OP_READ;
+        if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE;
+        key.interestOps(interestedOps);
+    }
+
+    /**
+     * Stops tracking a selectable and cancels its selection key, if it has
+     * one.  Removing a selectable that was never added is a no-op.
+     */
+    @Override
+    public void remove(Selectable selectable) {
+        if (selectable.getChannel() != null) {
+            SelectionKey key = selectable.getChannel().keyFor(selector);
+            if (key != null) {
+                key.cancel();
+                key.attach(null);
+            }
+        }
+        selectables.remove(selectable);
+    }
+
+    /**
+     * Waits for I/O readiness or for the earliest deadline, whichever comes
+     * first, and rebuilds the readable, writeable and expired sets.
+     *
+     * @param timeout maximum time to wait in milliseconds; a value of zero
+     *                or less polls without blocking
+     * @throws IOException if the underlying select operation fails
+     */
+    @Override
+    public void select(long timeout) throws IOException {
+        if (timeout > 0) {
+            long deadline = 0;
+            // TODO: this differs from the C code which requires a call to
+            // update() to make deadline changes take effect
+            for (Selectable selectable : selectables) {
+                long d = selectable.getDeadline();
+                if (d > 0) {
+                    deadline = (deadline == 0) ? d : Math.min(deadline,  d);
+                }
+            }
+
+            // Never sleep past the earliest deadline.
+            if (deadline > 0) {
+                long now = System.currentTimeMillis();
+                long delta = deadline - now;
+                if (delta < 0) {
+                    timeout = 0;
+                } else if (delta < timeout) {
+                    timeout = delta;
+                }
+            }
+        }
+
+        if (timeout > 0) {
+            selector.select(timeout);
+        } else {
+            selector.selectNow();
+        }
+        long awoken = System.currentTimeMillis();
+
+        readable.clear();
+        writeable.clear();
+        expired.clear();
+        error.clear();  // TODO: nothing ever gets put in here...
+        for (SelectionKey key : selector.selectedKeys()) {
+            Selectable selectable = (Selectable)key.attachment();
+            if (key.isReadable()) readable.add(selectable);
+            if (key.isWritable()) writeable.add(selectable);
+        }
+        // The NIO selector never removes keys from its selected-key set by
+        // itself; unless the set is emptied here, keys (and their readiness
+        // bits) from this pass would be reported again on later passes.
+        selector.selectedKeys().clear();
+
+        // TODO: this is different to the C code which evaluates expiry at
+        // the point the selectable is iterated over.
+        for (Selectable selectable : selectables) {
+            long deadline = selectable.getDeadline();
+            if (deadline > 0 && awoken >= deadline) {
+                expired.add(selectable);
+            }
+        }
+    }
+
+    /** @return iterator over the selectables found readable by the last select */
+    @Override
+    public Iterator<Selectable> readable() {
+        return readable.iterator();
+    }
+
+    /** @return iterator over the selectables found writable by the last select */
+    @Override
+    public Iterator<Selectable> writeable() {
+        return writeable.iterator();
+    }
+
+    /** @return iterator over the selectables whose deadline had passed at the last select */
+    @Override
+    public Iterator<Selectable> expired() {
+        return expired.iterator();
+    }
+
+    /** @return iterator over the error set, which is currently never populated */
+    @Override
+    public Iterator<Selectable> error() {
+        return error.iterator();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
new file mode 100644
index 0000000..3f650c2
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor.impl;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.Task;
+
+/**
+ * A timer task identified by its deadline.  Tasks order by deadline first
+ * and, for equal deadlines, by creation order.
+ */
+public class TaskImpl implements Task, Comparable<TaskImpl> {
+    // Shared by all tasks so that each one receives a distinct sequence
+    // number; a per-instance counter would hand out 0 to every task and
+    // make the tie-break in compareTo() meaningless.
+    private static final AtomicInteger count = new AtomicInteger();
+
+    private final long deadline;
+    private final int counter;
+
+    private Reactor reactor;
+    private Handler handler;
+    private Object attachment;
+
+    /**
+     * @param deadline the deadline reported by {@link #deadline()}
+     */
+    public TaskImpl(long deadline) {
+        this.deadline = deadline;
+        this.counter = count.getAndIncrement();
+    }
+
+    /**
+     * Orders by deadline, then by creation sequence number.
+     */
+    @Override
+    public int compareTo(TaskImpl other) {
+        if (deadline < other.deadline) {
+            return -1;
+        }
+        if (deadline > other.deadline) {
+            return 1;
+        }
+        // Explicit comparison instead of subtraction, which can overflow.
+        if (counter < other.counter) {
+            return -1;
+        }
+        return (counter > other.counter) ? 1 : 0;
+    }
+
+    @Override
+    public long deadline() {
+        return deadline;
+    }
+
+    @Override
+    public void setReactor(Reactor reactor) {
+        this.reactor = reactor;
+    }
+
+    @Override
+    public Reactor getReactor() {
+        return reactor;
+    }
+
+    @Override
+    public void setHandler(Handler handler) {
+        this.handler = handler;
+    }
+
+    @Override
+    public Handler getHandler() {
+        return handler;
+    }
+
+    @Override
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    @Override
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
new file mode 100644
index 0000000..32bb4f6
--- /dev/null
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.reactor.impl;
+
+import java.util.PriorityQueue;
+
+import org.apache.qpid.proton.engine.Collector;
+import org.apache.qpid.proton.engine.Event.Type;
+import org.apache.qpid.proton.engine.impl.CollectorImpl;
+import org.apache.qpid.proton.reactor.Task;
+
+/**
+ * Keeps pending tasks in a priority queue and, when ticked, moves every
+ * task whose deadline has been reached onto the collector as a
+ * {@code TIMER_TASK} event.
+ */
+public class Timer {
+
+    private CollectorImpl collector;
+    private PriorityQueue<Task> tasks = new PriorityQueue<Task>();
+
+    /**
+     * @param collector destination for timer events; must be a {@link CollectorImpl}
+     */
+    public Timer(Collector collector) {
+        this.collector = (CollectorImpl)collector;
+    }
+
+    /** Queues a new task with the given deadline and returns it. */
+    Task schedule(long deadline) {
+        final TaskImpl scheduled = new TaskImpl(deadline);
+        tasks.add(scheduled);
+        return scheduled;
+    }
+
+    /** @return the deadline of the task at the head of the queue, or 0 when the queue is empty */
+    long deadline() {
+        final Task head = tasks.peek();
+        return (head == null) ? 0 : head.deadline();
+    }
+
+    /**
+     * Removes every task at the head of the queue whose deadline is at or
+     * before {@code now} and puts it onto the collector.
+     */
+    void tick(long now) {
+        for (Task head = tasks.peek();
+             head != null && now >= head.deadline();
+             head = tasks.peek()) {
+            tasks.poll();
+            collector.put(Type.TIMER_TASK, head);
+        }
+    }
+
+    /** @return the number of tasks still queued */
+    int tasks() {
+        return tasks.size();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message