Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 740DC18022 for ; Mon, 6 Jul 2015 00:06:31 +0000 (UTC) Received: (qmail 55863 invoked by uid 500); 6 Jul 2015 00:06:31 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 55784 invoked by uid 500); 6 Jul 2015 00:06:31 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 55768 invoked by uid 99); 6 Jul 2015 00:06:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Jul 2015 00:06:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F0522DFE80; Mon, 6 Jul 2015 00:06:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rhs@apache.org To: commits@qpid.apache.org Date: Mon, 06 Jul 2015 00:06:32 -0000 Message-Id: In-Reply-To: <6153e317aebf411ebd27ffbfb7fa759b@git.apache.org> References: <6153e317aebf411ebd27ffbfb7fa759b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] qpid-proton git commit: PROTON-881: Tidy up and doc reactor interfaces PROTON-881: Tidy up and doc reactor interfaces Tidy up the Java interfaces for the proton-j reactor by removing a few unnecessary setters. Document the interfaces using Javadoc. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2f8728a8 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2f8728a8 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2f8728a8 Branch: refs/heads/master Commit: 2f8728a85fbea23db73fe60e42038782f89a5517 Parents: 513f152 Author: Adrian Preston Authored: Wed Jul 1 00:30:53 2015 +0100 Committer: Rafael Schloming Committed: Sun Jul 5 19:57:39 2015 -0400 ---------------------------------------------------------------------- .../qpid/proton/example/reactor/HelloWorld.java | 2 +- .../apache/qpid/proton/reactor/Acceptor.java | 9 + .../qpid/proton/reactor/FlowController.java | 4 + .../apache/qpid/proton/reactor/Handshaker.java | 7 + .../org/apache/qpid/proton/reactor/Reactor.java | 278 +++++++++++++++---- .../qpid/proton/reactor/ReactorChild.java | 5 +- .../apache/qpid/proton/reactor/Selectable.java | 227 +++++++++++---- .../apache/qpid/proton/reactor/Selector.java | 80 +++++- .../org/apache/qpid/proton/reactor/Task.java | 20 +- .../qpid/proton/reactor/impl/AcceptorImpl.java | 2 +- .../qpid/proton/reactor/impl/IOHandler.java | 21 +- .../qpid/proton/reactor/impl/ReactorImpl.java | 18 +- .../proton/reactor/impl/SelectableImpl.java | 9 +- .../qpid/proton/reactor/impl/SelectorImpl.java | 2 +- .../qpid/proton/reactor/impl/TaskImpl.java | 3 +- 15 files changed, 552 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java index 745004e..055d6df 100644 --- a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java @@ -33,7 +33,7 @@ import org.apache.qpid.proton.reactor.Reactor; * The proton reactor provides a general purpose event processing * library for writing reactive programs. A reactive program is defined * by a set of event handlers. An event handler is just any class or - * object that extends the Handler interface. For convinience, a class + * object that extends the Handler interface. For convenience, a class * can extend BaseHandler and only handle the events that it cares to * implement methods for. */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java index 744f4cb..222ce40 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Acceptor.java @@ -23,8 +23,17 @@ package org.apache.qpid.proton.reactor; import org.apache.qpid.proton.engine.Extendable; +/** + * Acceptors are children of a {@link Reactor} that accept in-bound network + * connections. + */ public interface Acceptor extends ReactorChild, Extendable { + /** + * Closes the acceptor, stopping it accepting any further in-bound + * connections. Already accepted connections continue to be processed by + * the associated reactor. + */ void close(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java index c8b999b..716b2a7 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/FlowController.java @@ -26,6 +26,10 @@ import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; +/** + * A handler that applies flow control to a connection. This handler tops-up + * link credit each time credit is expended by the receipt of messages. + */ public class FlowController extends BaseHandler { private int drained; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java index f9b670a..cbd496e 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Handshaker.java @@ -26,6 +26,13 @@ import org.apache.qpid.proton.engine.Endpoint; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; +/** + * A handler that mirrors the actions of the remote end of a connection. This + * handler responds in kind when the remote end of the connection is opened and + * closed. Likewise if the remote end of the connection opens or closes + * sessions and links, this handler responds by opening or closing the local end + * of the session or link. + */ public class Handshaker extends BaseHandler { private void open(Endpoint endpoint) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java index 3201c5a..f2a38a5 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java @@ -24,15 +24,26 @@ package org.apache.qpid.proton.reactor; import java.io.IOException; import java.util.Set; +import org.apache.qpid.proton.engine.BaseHandler; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Event.Type; import org.apache.qpid.proton.engine.Handler; import org.apache.qpid.proton.engine.HandlerException; import org.apache.qpid.proton.engine.Record; import org.apache.qpid.proton.reactor.impl.ReactorImpl; - - +/** + * The proton reactor provides a general purpose event processing + * library for writing reactive programs. A reactive program is defined + * by a set of event handlers. An event handler is just any class or + * object that extends the Handler interface. For convenience, a class + * can extend {@link BaseHandler} and only handle the events that it cares to + * implement methods for. + *

+ * This class is not thread safe (with the exception of the {@link #wakeup()} + * method) and should only be used by a single thread at any given time. + */ public interface Reactor { public static final class Factory @@ -42,55 +53,228 @@ public interface Reactor { } } - public long mark(); - public long now(); - public Record attachments(); - public long getTimeout(); - - public void setTimeout(long timeout); - - public Handler getGlobalHandler(); - - public void setGlobalHandler(Handler handler); - - public Handler getHandler(); - - public void setHandler(Handler handler); - - public Set children(); - - public Collector collector(); - - - public Selectable selectable(); - - - - public void update(Selectable selectable); - - + /** + * Updates the last time that the reactor's state has changed, potentially + * resulting in events being generated. + * @return the current time in milliseconds + * {@link System#currentTimeMillis()}. + */ + long mark(); + + /** @return the last time that {@link #mark()} was called. */ + long now(); + + /** @return an instance of {@link Record} that can be used to associate + * other objects (attachments) with this instance of the + * Reactor class. + */ + Record attachments(); + + /** + * @param timeout a timeout value, to associate with this instance of + * the reactor. This can be retrieved using the + * {@link #getTimeout()} method. + */ + void setTimeout(long timeout); + + /** + * @return the value previously set using {@link #setTimeout(long)} or + * 0 if no previous value has been set. + */ + long getTimeout(); + + /** + * @return the global handler for this reactor. Every event the reactor + * sees is dispatched to the global handler. To receive every + * event generated by the reactor, associate a child handler + * with the global handler. For example: + *

+     *            getGlobalHandler().add(yourHandler);
+     *         
+ */ + Handler getGlobalHandler(); + + /** + * Sets a new global handler. You probably don't want to do this and + * would be better adding a handler to the value returned by the + * {{@link #getGlobalHandler()} method. + * @param handler the new global handler. + */ + void setGlobalHandler(Handler handler); + + /** + * @return the handler for this reactor. Every event the reactor sees, + * which is not handled by a child of the reactor (such as a + * timer, connection, acceptor, or selector) is passed to this + * handler. To receive these events, it is recommend that you + * associate a child handler with the handler returned by this + * method. For example: + *
+     *           getHandler().add(yourHandler);
+     *         
+ */ + Handler getHandler(); + + /** + * Sets a new handler, that will receive any events not handled by a child + * of the reactor. Note that setting a handler via this method replaces + * the previous handler, and will result in no further events being + * dispatched to the child handlers associated with the previous handler. + * For this reason it is recommended that you do not use this method and + * instead add child handlers to the value returned by the + * {@link #getHandler()} method. + * @param handler the new handler for this reactor. + */ + void setHandler(Handler handler); + + /** + * @return a set containing the child objects associated wit this reactor. + * This will contain any active instances of: {@link Task} - created + * using the {@link #schedule(int, Handler)} method, + * {@link Connection} - created using the + * {@link #connection(Handler)} method, {@link Acceptor} - + * created using the {@link #acceptor(String, int)} method. + * {@link #acceptor(String, int, Handler)} method, or + * {@link Selectable} - created using the {@link #selectable()} + * method. + */ + Set children(); + + /** + * @return the Collector used to gather events generated by this reactor. + */ + Collector collector(); + + /** + * Creates a new Selectable as a child of this reactor. + * @return the newly created Selectable. + */ + Selectable selectable(); + + /** + * Updates the specified Selectable either emitting a + * {@link Type#SELECTABLE_UPDATED} event if the selectable is not terminal, + * or {@link Type#SELECTABLE_FINAL} if the selectable is terminal and has + * not already emitted a {@link Type#SELECTABLE_FINAL} event. + * @param selectable + */ + void update(Selectable selectable); + + /** + * Yields, causing the next call to {@link #process()} to return + * successfully - without processing any events. If multiple calls + * can be made to yield and only the next invocation of + * {@link #process()} will be affected. + */ void yield() ; - public boolean quiesced(); - - public boolean process() throws HandlerException; - - public void wakeup() throws IOException; - - public void start() ; - - public void stop() throws HandlerException; - - public void run() throws HandlerException; - - // pn_reactor_schedule from reactor.c - public Task schedule(int delay, Handler handler); - + /** + * @return true if the reactor is in quiesced state (e.g. has + * no events to process). false is returned otherwise. + */ + boolean quiesced(); + + /** + * Process any events pending for this reactor. Events are dispatched to + * the handlers registered with the reactor, or child objects associated + * with the reactor. This method blocks until the reactor has no more work + * to do (and no more work pending, in terms of scheduled tasks or open + * selectors to process). + * @return true if the reactor may have more events in the + * future. For example: if there are scheduled tasks, or open + * selectors. false is returned if the reactor has + * (and will have) no more events to process. + * @throws HandlerException if an unchecked exception is thrown by one of + * the handlers - it will be re-thrown attached to an instance of + * HandlerException. + */ + boolean process() throws HandlerException; + + /** + * Wakes up the thread (if any) blocked in the {@link #process()} method. + * This is the only method of this class that is thread safe, in that it + * can be used at the same time as another thread is using the reactor. + */ + void wakeup(); + + /** + * Starts the reactor. This method should be invoked before the first call + * to {@link #process()}. + */ + void start(); + + /** + * Stops the reactor. This method should be invoked after the last call to + * {@link #process()}. + * @throws HandlerException + */ + void stop() throws HandlerException; + + /** + * Simplifies the use of the reactor by wrapping the use of + * start, run, and stop method + * calls. + *

+ * Logically the implementation of this method is: + *

+     *   start();
+     *   while(process()) {}
+     *   stop();
+     * 
+ * @throws HandlerException if an unchecked exception is thrown by one of + * the handlers - it will be re-thrown attached to an instance of + * HandlerException. + */ + void run() throws HandlerException; + + /** + * Schedules execution of a task to take place at some point in the future. + * @param delay the number of milliseconds, in the future, to schedule the + * task for. + * @param handler a handler to associate with the task. This is notified + * when the deadline for the task is reached. + * @return an object representing the task that has been scheduled. + */ + Task schedule(int delay, Handler handler); + + /** + * Creates a new out-bound connection. + * @param handler a handler that is notified when events occur for the + * connection. Typically the host and port to connect to + * would be supplied to the connection object inside the + * logic which handles the {@link Type#CONNECTION_INIT} + * event. + * @return the newly created connection object. + */ Connection connection(Handler handler); + /** + * Creates a new acceptor. This is equivalent to calling: + *
+     *   acceptor(host, port, null);
+     * 
+ * @param host + * @param port + * @return the newly created acceptor object. + * @throws IOException + */ Acceptor acceptor(String host, int port) throws IOException; - Acceptor acceptor(String host, int port, Handler handler) throws IOException; - // This also frees any children that the reactor has! - public void free(); + /** + * Creates a new acceptor. This acceptor listens for in-bound connections. + * @param host the host name or address of the NIC to listen on. + * @param port the port number to listen on. + * @param handler if non-null this handler is registered with + * each new connection accepted by the acceptor. + * @return the newly created acceptor object. + * @throws IOException + */ + Acceptor acceptor(String host, int port, Handler handler) + throws IOException; + + /** + * Frees any resources (such as sockets and selectors) held by the reactor + * or its children. + */ + void free(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java index c39bdd9..146ee09 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java @@ -21,8 +21,11 @@ package org.apache.qpid.proton.reactor; -// Interface used to identify classes that can be a child of a reactor. +/** + * Interface used to identify classes that can be a child of a reactor. + */ public interface ReactorChild { + /** Frees any resources associated with this child. */ void free(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java index fa459d1..e91a0ee 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java @@ -25,72 +25,197 @@ import java.nio.channels.SelectableChannel; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Extendable; -import org.apache.qpid.proton.engine.Transport; +/** + * An entity that can be multiplexed using a {@link Selector}. + *

+ * Every selectable is associated with exactly one {@link SelectableChannel}. + * Selectables may be interested in three kinds of events: read events, write + * events, and timer events. A selectable will express its interest in these + * events through the {@link #isReading()}, {@link #isWriting()}, and + * {@link #getDeadline()} methods. + *

+ * When a read, write, or timer event occurs, the selectable must be notified by + * calling {@link #readable()}, {@link #writeable()}, or {@link #expired()} as + * appropriate. + * + * Once a selectable reaches a terminal state (see {@link #isTerminal()}, it + * will never be interested in events of any kind. When this occurs it should be + * removed from the Selector and discarded using {@link #free()}. + */ public interface Selectable extends ReactorChild, Extendable { - public interface Callback { + /** + * A callback that can be passed to the various "on" methods of the + * selectable - to allow code to be run when the selectable becomes ready + * for the associated operation. + */ + interface Callback { void run(Selectable selectable); } - public boolean isReading(); - + /** + * @return true if the selectable is interested in receiving + * notification (via the {@link #readable()} method that indicate + * that the associated {@link SelectableChannel} has data ready + * to be read from it. + */ + boolean isReading(); + + /** + * @return true if the selectable is interested in receiving + * notifications (via the {@link #writeable()} method that indicate + * that the associated {@link SelectableChannel} is ready to be + * written to. + */ boolean isWriting(); - long getDeadline() ; - - void setReading(boolean reading) ; - + /** + * @return a deadline after which this selectable can expect to receive + * a notification (via the {@link #expired()} method that indicates + * that the deadline has past. The deadline is expressed in the + * same format as {@link System#currentTimeMillis()}. Returning + * a deadline of zero (or a negative number) indicates that the + * selectable does not wish to be notified of expiry. + */ + long getDeadline(); + + /** + * Sets the value that will be returned by {@link #isReading()}. + * @param reading + */ + void setReading(boolean reading); + + /** + * Sets the value that will be returned by {@link #isWriting()}. + * @param writing + */ void setWriting(boolean writing); - void setDeadline(long deadline) ; - - public void onReadable(Callback runnable) ; - - public void onWritable(Callback runnable); - - public void onExpired(Callback runnable); - - public void onError(Callback runnable); - - public void onRelease(Callback runnable); - - public void onFree(Callback runnable); - - void readable() ; - - void writeable() ; - - void expired() ; - + /** + * Sets the value that will be returned by {@link #getDeadline()}. + * @param deadline + */ + void setDeadline(long deadline); + + /** + * Registers a callback that will be run when the selectable becomes ready + * for reading. + * @param runnable the callback to register. Any previously registered + * callback will be replaced. + */ + void onReadable(Callback runnable); + + /** + * Registers a callback that will be run when the selectable becomes ready + * for writing. + * @param runnable the callback to register. Any previously registered + * callback will be replaced. + */ + void onWritable(Callback runnable); + + /** + * Registers a callback that will be run when the selectable expires. + * @param runnable the callback to register. Any previously registered + * callback will be replaced. + */ + void onExpired(Callback runnable); + + /** + * Registers a callback that will be run when the selectable is notified of + * an error. + * @param runnable the callback to register. Any previously registered + * callback will be replaced. + */ + void onError(Callback runnable); + + /** + * Registers a callback that will be run when the selectable is notified + * that it has been released. + * @param runnable the callback to register. Any previously registered + * callback will be replaced. + */ + void onRelease(Callback runnable); + + /** + * Registers a callback that will be run when the selectable is notified + * that it has been free'd. + * @param runnable the callback to register. Any previously registered + * callback will be replaced. + */ + void onFree(Callback runnable); + + /** + * Notify the selectable that the underlying {@link SelectableChannel} is + * ready for a read operation. + */ + void readable(); + + /** + * Notify the selectable that the underlying {@link SelectableChannel} is + * ready for a write operation. + */ + void writeable(); + + /** Notify the selectable that it has expired. */ + void expired(); + + /** Notify the selectable that an error has occurred. */ void error(); - void release() ; + /** Notify the selectable that it has been released. */ + void release(); + /** Notify the selectable that it has been free'd. */ @Override - void free() ; - - // These are equivalent to the C code's set/get file descriptor functions. - void setChannel(SelectableChannel channel) ; - - public SelectableChannel getChannel() ; - - boolean isRegistered() ; - - void setRegistered(boolean registered) ; - - void setCollector(final Collector collector) ; - - public Reactor getReactor() ; - + void free(); + + /** + * Associates a {@link SelectableChannel} with this selector. + * @param channel + */ + void setChannel(SelectableChannel channel); // This is the equivalent to pn_selectable_set_fd(...) + + /** @return the {@link SelectableChannel} associated with this selector. */ + SelectableChannel getChannel(); // This is the equivalent to pn_selectable_get_fd(...) + + /** + * Check if a selectable is registered. This can be used for tracking + * whether a given selectable has been registerd with an external event + * loop. + *

+ * Note: the reactor code, currently, does not use this flag. + * @return trueif the selectable is registered. + */ + boolean isRegistered(); // XXX: unused in C reactor code + + /** + * Set the registered flag for a selectable. + *

+ * Note: the reactor code, currently, does not use this flag. + * @param registered the value returned by {@link #isRegistered()} + */ + void setRegistered(boolean registered); // XXX: unused in C reactor code + + /** + * Configure a selectable with a set of callbacks that emit readable, + * writable, and expired events into the supplied collector. + * @param collector + */ + void setCollector(final Collector collector); + + /** @return the reactor to which this selectable is a child. */ + Reactor getReactor() ; + + /** + * Terminates the selectable. Once a selectable reaches a terminal state + * it will never be interested in events of any kind. + */ public void terminate() ; - public boolean isTerminal(); - - public Transport getTransport() ; - - public void setTransport(Transport transport) ; - - public void setReactor(Reactor reactor) ; + /** + * @return true if the selectable has reached a terminal state. + */ + boolean isTerminal(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java index 592e32a..4228a8d 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java @@ -24,20 +24,88 @@ package org.apache.qpid.proton.reactor; import java.io.IOException; import java.util.Iterator; +/** + * A multiplexor of instances of {@link Selectable}. + *

+ * Many instances of Selectable can be added to a selector, and + * the {@link #select(long)} method used to block the calling thread until + * one of the Selectables becomes read to perform an operation. + *

+ * This class is not thread safe, so only one thread should be manipulating the + * contents of the selector, or running the {@link #select(long)} method at + * any given time. + */ public interface Selector { - void add(Selectable selectable) throws IOException ; + /** + * Adds a selectable to the selector. + * @param selectable + * @throws IOException + */ + void add(Selectable selectable) throws IOException; + /** + * Updates the selector to reflect any changes interest by the specified + * selectable. This is achieved by calling the + * {@link Selectable#isReading()} and {@link Selectable#isWriting()} + * methods. + * @param selectable + */ void update(Selectable selectable); - void remove(Selectable selectable) ; + /** + * Removes a selectable from the selector. + * @param selectable + */ + void remove(Selectable selectable); + + /** + * Waits for the specified timeout period for one or more selectables to + * become ready for an operation. Selectables that become ready are + * returned by the {@link #readable()}, {@link #writeable()}, + * {@link #expired()}, or {@link #error()} methods. + * + * @param timeout the maximum number of milliseconds to block the calling + * thread waiting for a selectable to become ready for an + * operation. The value zero is interpreted as check but + * don't block. + * @throws IOException + */ + void select(long timeout) throws IOException; + + /** + * @return the selectables that have become readable since the last call + * to {@link #select(long)}. Calling select clears + * any previous values in this set before adding new values + * corresponding to those selectables that have become readable. + */ + Iterator readable(); + + /** + * @return the selectables that have become writable since the last call + * to {@link #select(long)}. Calling select clears + * any previous values in this set before adding new values + * corresponding to those selectables that have become writable. + */ + Iterator writeable(); - void select(long timeout) throws IOException ; + /** + * @return the selectables that have expired since the last call + * to {@link #select(long)}. Calling select clears + * any previous values in this set before adding new values + * corresponding to those selectables that have now expired. + */ + Iterator expired(); - Iterator readable() ; - Iterator writeable() ; - Iterator expired() ; + /** + * @return the selectables that have encountered an error since the last + * call to {@link #select(long)}. Calling select + * clears any previous values in this set before adding new values + * corresponding to those selectables that have encountered an + * error. + */ Iterator error() ; + /** Frees the resources used by this selector. */ void free(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java index 9d2557c..69701ab 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java @@ -21,12 +21,26 @@ package org.apache.qpid.proton.reactor; +import org.apache.qpid.proton.engine.Event.Type; import org.apache.qpid.proton.engine.Extendable; +import org.apache.qpid.proton.engine.Handler; +/** + * Represents work scheduled with a {@link Reactor} for execution at + * some point in the future. + *

+ * Tasks are created using the {@link Reactor#schedule(int, Handler)} + * method. + */ public interface Task extends Extendable { - public long deadline(); - public void setReactor(Reactor reactor); - public Reactor getReactor(); + /** + * @return the deadline at which the handler associated with the scheduled + * task should be delivered a {@link Type#TIMER_TASK} event. + */ + long deadline(); + + /** @return the reactor that created this task. */ + Reactor getReactor(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java index 7fe97af..431b745 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java @@ -43,7 +43,7 @@ import org.apache.qpid.proton.reactor.Selectable.Callback; public class AcceptorImpl implements Acceptor { private Record attachments = new RecordImpl(); - private final Selectable sel; + private final SelectableImpl sel; private class AcceptorReadable implements Callback { @Override http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java index fa807e4..40eddac 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java @@ -124,7 +124,7 @@ public class IOHandler extends BaseHandler { // pni_connection_capacity from connection.c private static int capacity(Selectable selectable) { - Transport transport = selectable.getTransport(); + Transport transport = ((SelectableImpl)selectable).getTransport(); int capacity = transport.capacity(); if (capacity < 0) { if (transport.isClosed()) { @@ -136,7 +136,7 @@ public class IOHandler extends BaseHandler { // pni_connection_pending from connection.c private static int pending(Selectable selectable) { - Transport transport = selectable.getTransport(); + Transport transport = ((SelectableImpl)selectable).getTransport(); int pending = transport.pending(); if (pending < 0) { if (transport.isClosed()) { @@ -147,7 +147,7 @@ public class IOHandler extends BaseHandler { } // pni_connection_deadline from connection.c - private static long deadline(Selectable selectable) { + private static long deadline(SelectableImpl selectable) { Reactor reactor = selectable.getReactor(); Transport transport = selectable.getTransport(); long deadline = transport.tick(reactor.now()); @@ -156,11 +156,12 @@ public class IOHandler extends BaseHandler { // pni_connection_update from connection.c private static void update(Selectable selectable) { - int c = capacity(selectable); - int p = pending(selectable); + SelectableImpl selectableImpl = (SelectableImpl)selectable; + int c = capacity(selectableImpl); + int p = pending(selectableImpl); selectable.setReading(c > 0); selectable.setWriting(p > 0); - selectable.setDeadline(deadline(selectable)); + selectable.setDeadline(deadline(selectableImpl)); } // pni_connection_readable from connection.c @@ -168,7 +169,7 @@ public class IOHandler extends BaseHandler { @Override public void run(Selectable selectable) { Reactor reactor = selectable.getReactor(); - Transport transport = selectable.getTransport(); + Transport transport = ((SelectableImpl)selectable).getTransport(); int capacity = transport.capacity(); if (capacity > 0) { SocketChannel socketChannel = (SocketChannel)selectable.getChannel(); @@ -200,7 +201,7 @@ public class IOHandler extends BaseHandler { @Override public void run(Selectable selectable) { Reactor reactor = selectable.getReactor(); - Transport transport = selectable.getTransport(); + Transport transport = ((SelectableImpl)selectable).getTransport(); int pending = transport.pending(); if (pending > 0) { SocketChannel channel = (SocketChannel)selectable.getChannel(); @@ -243,7 +244,7 @@ public class IOHandler extends BaseHandler { @Override public void run(Selectable selectable) { Reactor reactor = selectable.getReactor(); - Transport transport = selectable.getTransport(); + Transport transport = ((SelectableImpl)selectable).getTransport(); long deadline = transport.tick(reactor.now()); selectable.setDeadline(deadline); int c = capacity(selectable); @@ -278,7 +279,7 @@ public class IOHandler extends BaseHandler { selectable.onError(connectionError); selectable.onExpired(connectionExpired); selectable.onFree(connectionFree); - selectable.setTransport(transport); + ((SelectableImpl)selectable).setTransport(transport); ((TransportImpl)transport).setSelectable(selectable); ((TransportImpl)transport).setReactor(reactor); update(selectable); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java index 45f9d4b..5fc451d 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java @@ -23,6 +23,7 @@ package org.apache.qpid.proton.reactor.impl; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.nio.channels.Pipe; import java.util.HashSet; import java.util.Set; @@ -186,8 +187,8 @@ public class ReactorImpl implements Reactor, Extendable { return selectable(null); } - public Selectable selectable(ReactorChild child) { - Selectable result = new SelectableImpl(); + public SelectableImpl selectable(ReactorChild child) { + SelectableImpl result = new SelectableImpl(); result.setCollector(collector); collector.put(Type.SELECTABLE_INIT, result); result.setReactor(this); @@ -295,10 +296,15 @@ public class ReactorImpl implements Reactor, Extendable { } } - @Override - public void wakeup() throws IOException { - wakeup.sink().write(ByteBuffer.allocate(1)); + public void wakeup() { + try { + wakeup.sink().write(ByteBuffer.allocate(1)); + } catch(ClosedChannelException channelClosedException) { + // Ignore - pipe already closed by reactor being shutdown. + } catch(IOException ioException) { + throw new ReactorInternalException(ioException); + } } @Override @@ -331,7 +337,7 @@ public class ReactorImpl implements Reactor, Extendable { @Override public Task schedule(int delay, Handler handler) { Task task = timer.schedule(now + delay); - task.setReactor(this); + ((TaskImpl)task).setReactor(this); BaseHandler.setHandler(task, handler); if (selectable != null) { selectable.setDeadline(timer.deadline()); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java index 5ab0176..df4e6cc 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java @@ -219,18 +219,15 @@ public class SelectableImpl implements Selectable { return terminal; } - @Override - public Transport getTransport() { + protected Transport getTransport() { return transport; } - @Override - public void setTransport(Transport transport) { + protected void setTransport(Transport transport) { this.transport = transport; } - @Override - public void setReactor(Reactor reactor) { + protected void setReactor(Reactor reactor) { this.reactor = reactor; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java index ed4ad69..4c2f1ed 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java @@ -133,7 +133,7 @@ class SelectorImpl implements Selector { ((SocketChannel)key.channel()).finishConnect(); update((Selectable)key.attachment()); } catch(IOException ioException) { - Selectable selectable = (Selectable)key.attachment(); + SelectableImpl selectable = (SelectableImpl)key.attachment(); ErrorCondition condition = new ErrorCondition(); condition.setCondition(Symbol.getSymbol("proton:io")); condition.setDescription(ioException.getMessage()); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2f8728a8/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java index 5311059..00c9a84 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java @@ -33,6 +33,7 @@ public class TaskImpl implements Task, Comparable { private final int counter; private final AtomicInteger count = new AtomicInteger(); private Record attachments = new RecordImpl(); + private Reactor reactor; public TaskImpl(long deadline) { this.deadline = deadline; @@ -57,8 +58,6 @@ public class TaskImpl implements Task, Comparable { return deadline; } - private Reactor reactor; - @Override public void setReactor(Reactor reactor) { this.reactor = reactor; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org