qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject [05/50] [abbrv] qpid-proton git commit: PROTON-881: Tidy up and doc reactor interfaces
Date Tue, 08 Sep 2015 16:37:23 GMT
PROTON-881: Tidy up and doc reactor interfaces

Tidy up the Java interfaces for the proton-j reactor by removing a few
unnecessary setters.  Document the interfaces using Javadoc.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2f8728a8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2f8728a8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2f8728a8

Branch: refs/heads/proton-go
Commit: 2f8728a85fbea23db73fe60e42038782f89a5517
Parents: 513f152
Author: Adrian Preston <prestona@uk.ibm.com>
Authored: Wed Jul 1 00:30:53 2015 +0100
Committer: Rafael Schloming <rhs@alum.mit.edu>
Committed: Sun Jul 5 19:57:39 2015 -0400

----------------------------------------------------------------------
 .../qpid/proton/example/reactor/HelloWorld.java |   2 +-
 .../apache/qpid/proton/reactor/Acceptor.java    |   9 +
 .../qpid/proton/reactor/FlowController.java     |   4 +
 .../apache/qpid/proton/reactor/Handshaker.java  |   7 +
 .../org/apache/qpid/proton/reactor/Reactor.java | 278 +++++++++++++++----
 .../qpid/proton/reactor/ReactorChild.java       |   5 +-
 .../apache/qpid/proton/reactor/Selectable.java  | 227 +++++++++++----
 .../apache/qpid/proton/reactor/Selector.java    |  80 +++++-
 .../org/apache/qpid/proton/reactor/Task.java    |  20 +-
 .../qpid/proton/reactor/impl/AcceptorImpl.java  |   2 +-
 .../qpid/proton/reactor/impl/IOHandler.java     |  21 +-
 .../qpid/proton/reactor/impl/ReactorImpl.java   |  18 +-
 .../proton/reactor/impl/SelectableImpl.java     |   9 +-
 .../qpid/proton/reactor/impl/SelectorImpl.java  |   2 +-
 .../qpid/proton/reactor/impl/TaskImpl.java      |   3 +-
 15 files changed, 552 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
----------------------------------------------------------------------
diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
index 745004e..055d6df 100644
--- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
+++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
@@ -33,7 +33,7 @@ import org.apache.qpid.proton.reactor.Reactor;
  * The proton reactor provides a general purpose event processing
  * library for writing reactive programs. A reactive program is defined
  * by a set of event handlers. An event handler is just any class or
- * object that extends the Handler interface. For convinience, a class
+ * object that extends the Handler interface. For convenience, a class
  * can extend BaseHandler and only handle the events that it cares to
  * implement methods for.
  */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
index 744f4cb..222ce40 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java
@@ -23,8 +23,17 @@ package org.apache.qpid.proton.reactor;
 
 import org.apache.qpid.proton.engine.Extendable;
 
+/**
+ * Acceptors are children of a {@link Reactor} that accept in-bound network
+ * connections.
+ */
 public interface Acceptor extends ReactorChild, Extendable {
 
+    /**
+     * Closes the acceptor, stopping it accepting any further in-bound
+     * connections.  Already accepted connections continue to be processed by
+     * the associated reactor.
+     */
     void close();
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java
index c8b999b..716b2a7 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java
@@ -26,6 +26,10 @@ import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Receiver;
 
+/**
+ * A handler that applies flow control to a connection.  This handler tops-up
+ * link credit each time credit is expended by the receipt of messages.
+ */
 public class FlowController extends BaseHandler {
 
     private int drained;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
index f9b670a..cbd496e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java
@@ -26,6 +26,13 @@ import org.apache.qpid.proton.engine.Endpoint;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Event;
 
+/**
+ * A handler that mirrors the actions of the remote end of a connection.  This
+ * handler responds in kind when the remote end of the connection is opened and
+ * closed.  Likewise if the remote end of the connection opens or closes
+ * sessions and links, this handler responds by opening or closing the local end
+ * of the session or link.
+ */
 public class Handshaker extends BaseHandler {
 
     private void open(Endpoint endpoint) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
index 3201c5a..f2a38a5 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java
@@ -24,15 +24,26 @@ package org.apache.qpid.proton.reactor;
 import java.io.IOException;
 import java.util.Set;
 
+import org.apache.qpid.proton.engine.BaseHandler;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Event.Type;
 import org.apache.qpid.proton.engine.Handler;
 import org.apache.qpid.proton.engine.HandlerException;
 import org.apache.qpid.proton.engine.Record;
 import org.apache.qpid.proton.reactor.impl.ReactorImpl;
 
-
-
+/**
+ * The proton reactor provides a general purpose event processing
+ * library for writing reactive programs. A reactive program is defined
+ * by a set of event handlers. An event handler is just any class or
+ * object that extends the Handler interface. For convenience, a class
+ * can extend {@link BaseHandler} and only handle the events that it cares to
+ * implement methods for.
+ * <p>
+ * This class is not thread safe (with the exception of the {@link #wakeup()}
+ * method) and should only be used by a single thread at any given time.
+ */
 public interface Reactor {
 
     public static final class Factory
@@ -42,55 +53,228 @@ public interface Reactor {
         }
     }
 
-    public long mark();
-    public long now();
-    public Record attachments();
-    public long getTimeout();
-
-    public void setTimeout(long timeout);
-
-    public Handler getGlobalHandler();
-
-    public void setGlobalHandler(Handler handler);
-
-    public Handler getHandler();
-
-    public void setHandler(Handler handler);
-
-    public Set<ReactorChild> children();
-
-    public Collector collector();
-
-
-    public Selectable selectable();
-
-
-
-    public void update(Selectable selectable);
-
-
+    /**
+     * Updates the last time that the reactor's state has changed, potentially
+     * resulting in events being generated.
+     * @return the current time in milliseconds
+     *         {@link System#currentTimeMillis()}.
+     */
+    long mark();
+
+    /** @return the last time that {@link #mark()} was called. */
+    long now();
+
+    /** @return an instance of {@link Record} that can be used to associate
+     *          other objects (attachments) with this instance of the
+     *          Reactor class.
+     */
+    Record attachments();
+
+    /**
+     * @param timeout a timeout value, to associate with this instance of
+     *        the reactor.  This can be retrieved using the
+     *        {@link #getTimeout()} method.
+     */
+    void setTimeout(long timeout);
+
+    /**
+     * @return the value previously set using {@link #setTimeout(long)} or
+     *         0 if no previous value has been set.
+     */
+    long getTimeout();
+
+    /**
+     * @return the global handler for this reactor.  Every event the reactor
+     *         sees is dispatched to the global handler.  To receive every
+     *         event generated by the reactor, associate a child handler
+     *         with the global handler.  For example:
+     *         <pre>
+     *            getGlobalHandler().add(yourHandler);
+     *         </pre>
+     */
+    Handler getGlobalHandler();
+
+    /**
+     * Sets a new global handler.  You probably don't want to do this and
+     * would be better adding a handler to the value returned by the
+     * {{@link #getGlobalHandler()} method.
+     * @param handler the new global handler.
+     */
+    void setGlobalHandler(Handler handler);
+
+    /**
+     * @return the handler for this reactor.  Every event the reactor sees,
+     *         which is not handled by a child of the reactor (such as a
+     *         timer, connection, acceptor, or selector) is passed to this
+     *         handler.  To receive these events, it is recommend that you
+     *         associate a child handler with the handler returned by this
+     *         method.  For example:
+     *         <pre>
+     *           getHandler().add(yourHandler);
+     *         </pre>
+     */
+    Handler getHandler();
+
+    /**
+     * Sets a new handler, that will receive any events not handled by a child
+     * of the reactor.  Note that setting a handler via this method replaces
+     * the previous handler, and will result in no further events being
+     * dispatched to the child handlers associated with the previous handler.
+     * For this reason it is recommended that you do not use this method and
+     * instead add child handlers to the value returned by the
+     * {@link #getHandler()} method.
+     * @param handler the new handler for this reactor.
+     */
+    void setHandler(Handler handler);
+
+    /**
+     * @return a set containing the child objects associated wit this reactor.
+     *         This will contain any active instances of: {@link Task} - created
+     *         using the {@link #schedule(int, Handler)} method,
+     *         {@link Connection} - created using the
+     *         {@link #connection(Handler)} method, {@link Acceptor} -
+     *         created using the {@link #acceptor(String, int)} method.
+     *         {@link #acceptor(String, int, Handler)} method, or
+     *         {@link Selectable} - created using the {@link #selectable()}
+     *         method.
+     */
+    Set<ReactorChild> children();
+
+    /**
+     * @return the Collector used to gather events generated by this reactor.
+     */
+    Collector collector();
+
+    /**
+     * Creates a new <code>Selectable</code> as a child of this reactor.
+     * @return the newly created <code>Selectable</code>.
+     */
+    Selectable selectable();
+
+    /**
+     * Updates the specified <code>Selectable</code> either emitting a
+     * {@link Type#SELECTABLE_UPDATED} event if the selectable is not terminal,
+     * or {@link Type#SELECTABLE_FINAL} if the selectable is terminal and has
+     * not already emitted a {@link Type#SELECTABLE_FINAL} event.
+     * @param selectable
+     */
+    void update(Selectable selectable);
+
+    /**
+     * Yields, causing the next call to {@link #process()} to return
+     * successfully - without processing any events.  If multiple calls
+     * can be made to <code>yield</code> and only the next invocation of
+     * {@link #process()} will be affected.
+     */
     void yield() ;
 
-    public boolean quiesced();
-
-    public boolean process() throws HandlerException;
-
-    public void wakeup() throws IOException;
-
-    public void start() ;
-
-    public void stop() throws HandlerException;
-
-    public void run() throws HandlerException;
-
-    // pn_reactor_schedule from reactor.c
-    public Task schedule(int delay, Handler handler);
-
+    /**
+     * @return <code>true</code> if the reactor is in quiesced state (e.g. has
+     *         no events to process).  <code>false</code> is returned otherwise.
+     */
+    boolean quiesced();
+
+    /**
+     * Process any events pending for this reactor.  Events are dispatched to
+     * the handlers registered with the reactor, or child objects associated
+     * with the reactor.  This method blocks until the reactor has no more work
+     * to do (and no more work pending, in terms of scheduled tasks or open
+     * selectors to process).
+     * @return <code>true</code> if the reactor may have more events in the
+     *         future.  For example: if there are scheduled tasks, or open
+     *         selectors.  <code>false</code> is returned if the reactor has
+     *         (and will have) no more events to process.
+     * @throws HandlerException if an unchecked exception is thrown by one of
+     *         the handlers - it will be re-thrown attached to an instance of
+     *         <code>HandlerException</code>.
+     */
+    boolean process() throws HandlerException;
+
+    /**
+     * Wakes up the thread (if any) blocked in the {@link #process()} method.
+     * This is the only method of this class that is thread safe, in that it
+     * can be used at the same time as another thread is using the reactor.
+     */
+    void wakeup();
+
+    /**
+     * Starts the reactor.  This method should be invoked before the first call
+     * to {@link #process()}.
+     */
+    void start();
+
+    /**
+     * Stops the reactor.  This method should be invoked after the last call to
+     * {@link #process()}.
+     * @throws HandlerException
+     */
+    void stop() throws HandlerException;
+
+    /**
+     * Simplifies the use of the reactor by wrapping the use of
+     * <code>start</code>, <code>run</code>, and <code>stop</code>
method
+     * calls.
+     * <p>
+     * Logically the implementation of this method is:
+     * <pre>
+     *   start();
+     *   while(process()) {}
+     *   stop();
+     * </pre>
+     * @throws HandlerException if an unchecked exception is thrown by one of
+     *         the handlers - it will be re-thrown attached to an instance of
+     *         <code>HandlerException</code>.
+     */
+    void run() throws HandlerException;
+
+    /**
+     * Schedules execution of a task to take place at some point in the future.
+     * @param delay the number of milliseconds, in the future, to schedule the
+     *              task for.
+     * @param handler a handler to associate with the task.  This is notified
+     *                when the deadline for the task is reached.
+     * @return an object representing the task that has been scheduled.
+     */
+    Task schedule(int delay, Handler handler);
+
+    /**
+     * Creates a new out-bound connection.
+     * @param handler a handler that is notified when events occur for the
+     *                connection.  Typically the host and port to connect to
+     *                would be supplied to the connection object inside the
+     *                logic which handles the {@link Type#CONNECTION_INIT}
+     *                event.
+     * @return the newly created connection object.
+     */
     Connection connection(Handler handler);
 
+    /**
+     * Creates a new acceptor.  This is equivalent to calling:
+     * <pre>
+     *   acceptor(host, port, null);
+     * </pre>
+     * @param host
+     * @param port
+     * @return the newly created acceptor object.
+     * @throws IOException
+     */
     Acceptor acceptor(String host, int port) throws IOException;
-    Acceptor acceptor(String host, int port, Handler handler) throws IOException;
 
-    // This also frees any children that the reactor has!
-    public void free();
+    /**
+     * Creates a new acceptor.  This acceptor listens for in-bound connections.
+     * @param host the host name or address of the NIC to listen on.
+     * @param port the port number to listen on.
+     * @param handler if non-<code>null</code> this handler is registered with
+     *                each new connection accepted by the acceptor.
+     * @return the newly created acceptor object.
+     * @throws IOException
+     */
+    Acceptor acceptor(String host, int port, Handler handler)
+            throws IOException;
+
+    /**
+     * Frees any resources (such as sockets and selectors) held by the reactor
+     * or its children.
+     */
+    void free();
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
index c39bdd9..146ee09 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
@@ -21,8 +21,11 @@
 
 package org.apache.qpid.proton.reactor;
 
-// Interface used to identify classes that can be a child of a reactor.
+/**
+ * Interface used to identify classes that can be a child of a reactor.
+ */
 public interface ReactorChild {
 
+    /** Frees any resources associated with this child. */
     void free();
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
index fa459d1..e91a0ee 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
@@ -25,72 +25,197 @@ import java.nio.channels.SelectableChannel;
 
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Extendable;
-import org.apache.qpid.proton.engine.Transport;
 
+/**
+ * An entity that can be multiplexed using a {@link Selector}.
+ * <p>
+ * Every selectable is associated with exactly one {@link SelectableChannel}.
+ * Selectables may be interested in three kinds of events: read events, write
+ * events, and timer events. A selectable will express its interest in these
+ * events through the {@link #isReading()}, {@link #isWriting()}, and
+ * {@link #getDeadline()} methods.
+ * <p>
+ * When a read, write, or timer event occurs, the selectable must be notified by
+ * calling {@link #readable()}, {@link #writeable()}, or {@link #expired()} as
+ * appropriate.
+ *
+ * Once a selectable reaches a terminal state (see {@link #isTerminal()}, it
+ * will never be interested in events of any kind. When this occurs it should be
+ * removed from the Selector and discarded using {@link #free()}.
+ */
 public interface Selectable extends ReactorChild, Extendable {
 
-    public interface Callback {
+    /**
+     * A callback that can be passed to the various "on" methods of the
+     * selectable - to allow code to be run when the selectable becomes ready
+     * for the associated operation.
+     */
+    interface Callback {
         void run(Selectable selectable);
     }
 
-    public boolean isReading();
-
+    /**
+     * @return <code>true</code> if the selectable is interested in receiving
+     *         notification (via the {@link #readable()} method that indicate
+     *         that the associated {@link SelectableChannel} has data ready
+     *         to be read from it.
+     */
+    boolean isReading();
+
+    /**
+     * @return <code>true</code> if the selectable is interested in receiving
+     *         notifications (via the {@link #writeable()} method that indicate
+     *         that the associated {@link SelectableChannel} is ready to be
+     *         written to.
+     */
     boolean isWriting();
 
-    long getDeadline() ;
-
-    void setReading(boolean reading) ;
-
+    /**
+     * @return a deadline after which this selectable can expect to receive
+     *         a notification (via the {@link #expired()} method that indicates
+     *         that the deadline has past.  The deadline is expressed in the
+     *         same format as {@link System#currentTimeMillis()}.  Returning
+     *         a deadline of zero (or a negative number) indicates that the
+     *         selectable does not wish to be notified of expiry.
+     */
+    long getDeadline();
+
+    /**
+     * Sets the value that will be returned by {@link #isReading()}.
+     * @param reading
+     */
+    void setReading(boolean reading);
+
+    /**
+     * Sets the value that will be returned by {@link #isWriting()}.
+     * @param writing
+     */
     void setWriting(boolean writing);
 
-    void setDeadline(long deadline) ;
-
-    public void onReadable(Callback runnable) ;
-
-    public void onWritable(Callback runnable);
-
-    public void onExpired(Callback runnable);
-
-    public void onError(Callback runnable);
-
-    public void onRelease(Callback runnable);
-
-    public void onFree(Callback runnable);
-
-    void readable() ;
-
-    void writeable() ;
-
-    void expired() ;
-
+    /**
+     * Sets the value that will be returned by {@link #getDeadline()}.
+     * @param deadline
+     */
+    void setDeadline(long deadline);
+
+    /**
+     * Registers a callback that will be run when the selectable becomes ready
+     * for reading.
+     * @param runnable the callback to register.  Any previously registered
+     *                 callback will be replaced.
+     */
+    void onReadable(Callback runnable);
+
+    /**
+     * Registers a callback that will be run when the selectable becomes ready
+     * for writing.
+     * @param runnable the callback to register.  Any previously registered
+     *                 callback will be replaced.
+     */
+    void onWritable(Callback runnable);
+
+    /**
+     * Registers a callback that will be run when the selectable expires.
+     * @param runnable the callback to register.  Any previously registered
+     *                 callback will be replaced.
+     */
+    void onExpired(Callback runnable);
+
+    /**
+     * Registers a callback that will be run when the selectable is notified of
+     * an error.
+     * @param runnable the callback to register.  Any previously registered
+     *                 callback will be replaced.
+     */
+    void onError(Callback runnable);
+
+    /**
+     * Registers a callback that will be run when the selectable is notified
+     * that it has been released.
+     * @param runnable the callback to register.  Any previously registered
+     *                 callback will be replaced.
+     */
+    void onRelease(Callback runnable);
+
+    /**
+     * Registers a callback that will be run when the selectable is notified
+     * that it has been free'd.
+     * @param runnable the callback to register.  Any previously registered
+     *                 callback will be replaced.
+     */
+    void onFree(Callback runnable);
+
+    /**
+     * Notify the selectable that the underlying {@link SelectableChannel} is
+     * ready for a read operation.
+     */
+    void readable();
+
+    /**
+     * Notify the selectable that the underlying {@link SelectableChannel} is
+     * ready for a write operation.
+     */
+    void writeable();
+
+    /** Notify the selectable that it has expired. */
+    void expired();
+
+    /** Notify the selectable that an error has occurred. */
     void error();
 
-    void release() ;
+    /** Notify the selectable that it has been released. */
+    void release();
 
+    /** Notify the selectable that it has been free'd. */
     @Override
-    void free() ;
-
-    // These are equivalent to the C code's set/get file descriptor functions.
-    void setChannel(SelectableChannel channel) ;
-
-    public SelectableChannel getChannel() ;
-
-    boolean isRegistered() ;
-
-    void setRegistered(boolean registered) ;
-
-    void setCollector(final Collector collector) ;
-
-    public Reactor getReactor() ;
-
+    void free();
+
+    /**
+     * Associates a {@link SelectableChannel} with this selector.
+     * @param channel
+     */
+    void setChannel(SelectableChannel channel); // This is the equivalent to pn_selectable_set_fd(...)
+
+    /** @return the {@link SelectableChannel} associated with this selector. */
+    SelectableChannel getChannel(); // This is the equivalent to pn_selectable_get_fd(...)
+
+    /**
+     * Check if a selectable is registered.  This can be used for tracking
+     * whether a given selectable has been registerd with an external event
+     * loop.
+     * <p>
+     * <em>Note:</em> the reactor code, currently, does not use this flag.
+     * @return <code>true</code>if the selectable is registered.
+     */
+    boolean isRegistered();  // XXX: unused in C reactor code
+
+    /**
+     * Set the registered flag for a selectable.
+     * <p>
+     * <em>Note:</em> the reactor code, currently, does not use this flag.
+     * @param registered the value returned by {@link #isRegistered()}
+     */
+    void setRegistered(boolean registered); // XXX: unused in C reactor code
+
+    /**
+     * Configure a selectable with a set of callbacks that emit readable,
+     * writable, and expired events into the supplied collector.
+     * @param collector
+     */
+    void setCollector(final Collector collector);
+
+    /** @return the reactor to which this selectable is a child. */
+    Reactor getReactor() ;
+
+    /**
+     * Terminates the selectable.  Once a selectable reaches a terminal state
+     * it will never be interested in events of any kind.
+     */
     public void terminate() ;
 
-    public boolean isTerminal();
-
-    public Transport getTransport() ;
-
-    public void setTransport(Transport transport) ;
-
-    public void setReactor(Reactor reactor) ;
+    /**
+     * @return <code>true</code> if the selectable has reached a terminal state.
+     */
+    boolean isTerminal();
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
index 592e32a..4228a8d 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
@@ -24,20 +24,88 @@ package org.apache.qpid.proton.reactor;
 import java.io.IOException;
 import java.util.Iterator;
 
+/**
+ * A multiplexor of instances of {@link Selectable}.
+ * <p>
+ * Many instances of <code>Selectable</code> can be added to a selector, and
+ * the {@link #select(long)} method used to block the calling thread until
+ * one of the <code>Selectables</code> becomes read to perform an operation.
+ * <p>
+ * This class is not thread safe, so only one thread should be manipulating the
+ * contents of the selector, or running the {@link #select(long)} method at
+ * any given time.
+ */
 public interface Selector {
 
-    void add(Selectable selectable) throws IOException ;
+    /**
+     * Adds a selectable to the selector.
+     * @param selectable
+     * @throws IOException
+     */
+    void add(Selectable selectable) throws IOException;
 
+    /**
+     * Updates the selector to reflect any changes interest by the specified
+     * selectable.  This is achieved by calling the
+     * {@link Selectable#isReading()} and {@link Selectable#isWriting()}
+     * methods.
+     * @param selectable
+     */
     void update(Selectable selectable);
 
-    void remove(Selectable selectable) ;
+    /**
+     * Removes a selectable from the selector.
+     * @param selectable
+     */
+    void remove(Selectable selectable);
+
+    /**
+     * Waits for the specified timeout period for one or more selectables to
+     * become ready for an operation.  Selectables that become ready are
+     * returned by the {@link #readable()}, {@link #writeable()},
+     * {@link #expired()}, or {@link #error()} methods.
+     *
+     * @param timeout the maximum number of milliseconds to block the calling
+     *                thread waiting for a selectable to become ready for an
+     *                operation.  The value zero is interpreted as check but
+     *                don't block.
+     * @throws IOException
+     */
+    void select(long timeout) throws IOException;
+
+    /**
+     * @return the selectables that have become readable since the last call
+     *         to {@link #select(long)}.  Calling <code>select</code> clears
+     *         any previous values in this set before adding new values
+     *         corresponding to those selectables that have become readable.
+     */
+    Iterator<Selectable> readable();
+
+    /**
+     * @return the selectables that have become writable since the last call
+     *         to {@link #select(long)}.  Calling <code>select</code> clears
+     *         any previous values in this set before adding new values
+     *         corresponding to those selectables that have become writable.
+     */
+    Iterator<Selectable> writeable();
 
-    void select(long timeout) throws IOException ;
+    /**
+     * @return the selectables that have expired since the last call
+     *         to {@link #select(long)}.  Calling <code>select</code> clears
+     *         any previous values in this set before adding new values
+     *         corresponding to those selectables that have now expired.
+     */
+    Iterator<Selectable> expired();
 
-    Iterator<Selectable> readable() ;
-    Iterator<Selectable> writeable() ;
-    Iterator<Selectable> expired() ;
+    /**
+     * @return the selectables that have encountered an error since the last
+     *         call to {@link #select(long)}.  Calling <code>select</code>
+     *         clears any previous values in this set before adding new values
+     *         corresponding to those selectables that have encountered an
+     *         error.
+     */
     Iterator<Selectable> error() ;
 
+    /** Frees the resources used by this selector. */
     void free();
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
index 9d2557c..69701ab 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
@@ -21,12 +21,26 @@
 
 package org.apache.qpid.proton.reactor;
 
+import org.apache.qpid.proton.engine.Event.Type;
 import org.apache.qpid.proton.engine.Extendable;
+import org.apache.qpid.proton.engine.Handler;
 
+/**
+ * Represents work scheduled with a {@link Reactor} for execution at
+ * some point in the future.
+ * <p>
+ * Tasks are created using the {@link Reactor#schedule(int, Handler)}
+ * method.
+ */
 public interface Task extends Extendable {
 
-    public long deadline();
-    public void setReactor(Reactor reactor);
-    public Reactor getReactor();
+    /**
+     * @return the deadline at which the handler associated with the scheduled
+     *         task should be delivered a {@link Type#TIMER_TASK} event.
+     */
+    long deadline();
+
+    /** @return the reactor that created this task. */
+    Reactor getReactor();
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
index 7fe97af..431b745 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
@@ -43,7 +43,7 @@ import org.apache.qpid.proton.reactor.Selectable.Callback;
 public class AcceptorImpl implements Acceptor {
 
     private Record attachments = new RecordImpl();
-    private final Selectable sel;
+    private final SelectableImpl sel;
 
     private class AcceptorReadable implements Callback {
         @Override

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
index fa807e4..40eddac 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
@@ -124,7 +124,7 @@ public class IOHandler extends BaseHandler {
 
     // pni_connection_capacity from connection.c
     private static int capacity(Selectable selectable) {
-        Transport transport = selectable.getTransport();
+        Transport transport = ((SelectableImpl)selectable).getTransport();
         int capacity = transport.capacity();
         if (capacity < 0) {
             if (transport.isClosed()) {
@@ -136,7 +136,7 @@ public class IOHandler extends BaseHandler {
 
     // pni_connection_pending from connection.c
     private static int pending(Selectable selectable) {
-        Transport transport = selectable.getTransport();
+        Transport transport = ((SelectableImpl)selectable).getTransport();
         int pending = transport.pending();
         if (pending < 0) {
             if (transport.isClosed()) {
@@ -147,7 +147,7 @@ public class IOHandler extends BaseHandler {
     }
 
     // pni_connection_deadline from connection.c
-    private static long deadline(Selectable selectable) {
+    private static long deadline(SelectableImpl selectable) {
         Reactor reactor = selectable.getReactor();
         Transport transport = selectable.getTransport();
         long deadline = transport.tick(reactor.now());
@@ -156,11 +156,12 @@ public class IOHandler extends BaseHandler {
 
     // pni_connection_update from connection.c
     private static void update(Selectable selectable) {
-        int c = capacity(selectable);
-        int p = pending(selectable);
+        SelectableImpl selectableImpl = (SelectableImpl)selectable;
+        int c = capacity(selectableImpl);
+        int p = pending(selectableImpl);
         selectable.setReading(c > 0);
         selectable.setWriting(p > 0);
-        selectable.setDeadline(deadline(selectable));
+        selectable.setDeadline(deadline(selectableImpl));
     }
 
     // pni_connection_readable from connection.c
@@ -168,7 +169,7 @@ public class IOHandler extends BaseHandler {
         @Override
         public void run(Selectable selectable) {
             Reactor reactor = selectable.getReactor();
-            Transport transport = selectable.getTransport();
+            Transport transport = ((SelectableImpl)selectable).getTransport();
             int capacity = transport.capacity();
             if (capacity > 0) {
                 SocketChannel socketChannel = (SocketChannel)selectable.getChannel();
@@ -200,7 +201,7 @@ public class IOHandler extends BaseHandler {
         @Override
         public void run(Selectable selectable) {
             Reactor reactor = selectable.getReactor();
-            Transport transport = selectable.getTransport();
+            Transport transport = ((SelectableImpl)selectable).getTransport();
             int pending = transport.pending();
             if (pending > 0) {
                 SocketChannel channel = (SocketChannel)selectable.getChannel();
@@ -243,7 +244,7 @@ public class IOHandler extends BaseHandler {
         @Override
         public void run(Selectable selectable) {
             Reactor reactor = selectable.getReactor();
-            Transport transport = selectable.getTransport();
+            Transport transport = ((SelectableImpl)selectable).getTransport();
             long deadline = transport.tick(reactor.now());
             selectable.setDeadline(deadline);
             int c = capacity(selectable);
@@ -278,7 +279,7 @@ public class IOHandler extends BaseHandler {
         selectable.onError(connectionError);
         selectable.onExpired(connectionExpired);
         selectable.onFree(connectionFree);
-        selectable.setTransport(transport);
+        ((SelectableImpl)selectable).setTransport(transport);
         ((TransportImpl)transport).setSelectable(selectable);
         ((TransportImpl)transport).setReactor(reactor);
         update(selectable);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
index 45f9d4b..5fc451d 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
@@ -23,6 +23,7 @@ package org.apache.qpid.proton.reactor.impl;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.Pipe;
 import java.util.HashSet;
 import java.util.Set;
@@ -186,8 +187,8 @@ public class ReactorImpl implements Reactor, Extendable {
         return selectable(null);
     }
 
-    public Selectable selectable(ReactorChild child) {
-        Selectable result = new SelectableImpl();
+    public SelectableImpl selectable(ReactorChild child) {
+        SelectableImpl result = new SelectableImpl();
         result.setCollector(collector);
         collector.put(Type.SELECTABLE_INIT, result);
         result.setReactor(this);
@@ -295,10 +296,15 @@ public class ReactorImpl implements Reactor, Extendable {
         }
     }
 
-
     @Override
-    public void wakeup() throws IOException {
-        wakeup.sink().write(ByteBuffer.allocate(1));
+    public void wakeup() {
+        try {
+            wakeup.sink().write(ByteBuffer.allocate(1));
+        } catch(ClosedChannelException channelClosedException) {
+            // Ignore - pipe already closed by reactor being shutdown.
+        } catch(IOException ioException) {
+            throw new ReactorInternalException(ioException);
+        }
     }
 
     @Override
@@ -331,7 +337,7 @@ public class ReactorImpl implements Reactor, Extendable {
     @Override
     public Task schedule(int delay, Handler handler) {
         Task task = timer.schedule(now + delay);
-        task.setReactor(this);
+        ((TaskImpl)task).setReactor(this);
         BaseHandler.setHandler(task, handler);
         if (selectable != null) {
             selectable.setDeadline(timer.deadline());

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
index 5ab0176..df4e6cc 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
@@ -219,18 +219,15 @@ public class SelectableImpl implements Selectable {
         return terminal;
     }
 
-    @Override
-    public Transport getTransport() {
+    protected Transport getTransport() {
         return transport;
     }
 
-    @Override
-    public void setTransport(Transport transport) {
+    protected void setTransport(Transport transport) {
         this.transport = transport;
     }
 
-    @Override
-    public void setReactor(Reactor reactor) {
+    protected void setReactor(Reactor reactor) {
         this.reactor = reactor;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
index ed4ad69..4c2f1ed 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
@@ -133,7 +133,7 @@ class SelectorImpl implements Selector {
                             ((SocketChannel)key.channel()).finishConnect();
                             update((Selectable)key.attachment());
                         } catch(IOException ioException) {
-                            Selectable selectable = (Selectable)key.attachment();
+                            SelectableImpl selectable = (SelectableImpl)key.attachment();
                             ErrorCondition condition = new ErrorCondition();
                             condition.setCondition(Symbol.getSymbol("proton:io"));
                             condition.setDescription(ioException.getMessage());

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
index 5311059..00c9a84 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
@@ -33,6 +33,7 @@ public class TaskImpl implements Task, Comparable<TaskImpl> {
     private final int counter;
     private final AtomicInteger count = new AtomicInteger();
     private Record attachments = new RecordImpl();
+    private Reactor reactor;
 
     public TaskImpl(long deadline) {
         this.deadline = deadline;
@@ -57,8 +58,6 @@ public class TaskImpl implements Task, Comparable<TaskImpl> {
         return deadline;
     }
 
-    private Reactor reactor;
-    @Override
     public void setReactor(Reactor reactor) {
         this.reactor = reactor;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message