Return-Path: X-Original-To: apmail-qpid-commits-archive@www.apache.org Delivered-To: apmail-qpid-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F1F0018FBA for ; Sun, 5 Jul 2015 23:45:03 +0000 (UTC) Received: (qmail 44886 invoked by uid 500); 5 Jul 2015 23:45:03 -0000 Delivered-To: apmail-qpid-commits-archive@qpid.apache.org Received: (qmail 44792 invoked by uid 500); 5 Jul 2015 23:45:03 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 44365 invoked by uid 99); 5 Jul 2015 23:45:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 05 Jul 2015 23:45:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D4535E360D; Sun, 5 Jul 2015 23:45:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rhs@apache.org To: commits@qpid.apache.org Date: Sun, 05 Jul 2015 23:45:03 -0000 Message-Id: <66b50345ecd6469597601b6a1793ede2@git.apache.org> In-Reply-To: <232a9358b57044748b5da9fc6483eafb@git.apache.org> References: <232a9358b57044748b5da9fc6483eafb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [02/38] qpid-proton git commit: PROTON-881: Initial commit of proton-j reactor implementation PROTON-881: Initial commit of proton-j reactor implementation Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e0187017 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e0187017 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e0187017 Branch: refs/heads/master Commit: 
e0187017456a4df58df2f1d04b1941d99eacbe10 Parents: cb4f9b9 Author: Adrian Preston Authored: Fri Apr 17 14:33:11 2015 +0100 Committer: Adrian Preston Committed: Wed May 6 23:22:27 2015 +0100 ---------------------------------------------------------------------- examples/java/reactor/.gitignore | 1 + .../proton/example/reactor/CountRandomly.java | 104 +++++ .../qpid/proton/example/reactor/Counter.java | 81 ++++ .../qpid/proton/example/reactor/Delegates.java | 68 +++ .../example/reactor/EchoInputStreamWrapper.java | 76 ++++ .../proton/example/reactor/GlobalLogger.java | 114 +++++ .../proton/example/reactor/GoodbyeWorld.java | 62 +++ .../qpid/proton/example/reactor/HelloWorld.java | 60 +++ .../proton/example/reactor/ReactorLogger.java | 103 +++++ .../qpid/proton/example/reactor/Scheduling.java | 60 +++ .../qpid/proton/example/reactor/Unhandled.java | 46 ++ .../java/org/apache/qpid/proton/Proton.java | 20 +- .../apache/qpid/proton/engine/BaseHandler.java | 28 ++ .../apache/qpid/proton/engine/Collector.java | 1 + .../apache/qpid/proton/engine/Connection.java | 4 +- .../org/apache/qpid/proton/engine/Event.java | 10 + .../org/apache/qpid/proton/engine/Handler.java | 18 + .../qpid/proton/engine/HandlerEndpoint.java | 28 ++ .../org/apache/qpid/proton/engine/Link.java | 4 +- .../org/apache/qpid/proton/engine/Session.java | 2 +- .../qpid/proton/engine/impl/CollectorImpl.java | 10 +- .../qpid/proton/engine/impl/ConnectionImpl.java | 2 +- .../qpid/proton/engine/impl/EventImpl.java | 103 ++++- .../proton/engine/impl/HandlerEndpointImpl.java | 44 ++ .../qpid/proton/engine/impl/LinkImpl.java | 28 +- .../qpid/proton/engine/impl/SessionImpl.java | 14 +- .../qpid/proton/engine/impl/TransportImpl.java | 25 ++ .../org/apache/qpid/proton/reactor/Reactor.java | 99 +++++ .../apache/qpid/proton/reactor/Selectable.java | 113 +++++ .../apache/qpid/proton/reactor/Selector.java | 43 ++ .../org/apache/qpid/proton/reactor/Task.java | 35 ++ .../qpid/proton/reactor/impl/IOHandler.java | 333 
++++++++++++++ .../qpid/proton/reactor/impl/ReactorImpl.java | 445 +++++++++++++++++++ .../proton/reactor/impl/SelectableImpl.java | 272 ++++++++++++ .../qpid/proton/reactor/impl/SelectorImpl.java | 137 ++++++ .../qpid/proton/reactor/impl/TaskImpl.java | 83 ++++ .../apache/qpid/proton/reactor/impl/Timer.java | 70 +++ 37 files changed, 2725 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/.gitignore ---------------------------------------------------------------------- diff --git a/examples/java/reactor/.gitignore b/examples/java/reactor/.gitignore new file mode 100644 index 0000000..5e56e04 --- /dev/null +++ b/examples/java/reactor/.gitignore @@ -0,0 +1 @@ +/bin http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java new file mode 100644 index 0000000..0dcdf4a --- /dev/null +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.example.reactor; + +import java.io.IOException; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.reactor.Reactor; + +// Let's try to modify our counter example. In addition to counting to +// 10 in quarter second intervals, let's also print out a random number +// every half second. This is not a super easy thing to express in a +// purely sequential program, but not so difficult using events. +public class CountRandomly extends BaseHandler { + + private long startTime; + private CounterHandler counter; + + class CounterHandler extends BaseHandler { + private final int limit; + private int count; + + CounterHandler(int limit) { + this.limit = limit; + } + + @Override + public void onTimerTask(Event event) { + count += 1; + System.out.println(count); + + if (!done()) { + event.getReactor().schedule(250, this); + } + } + + private boolean done() { + return count >= limit; + } + } + + @Override + public void onReactorInit(Event event) { + startTime = System.currentTimeMillis(); + System.out.println("Hello, World!"); + + // Save the counter instance in an attribute so we can refer to + // it later. + counter = new CounterHandler(10); + event.getReactor().schedule(250, counter); + + // Now schedule another event with a different handler. Note + // that the timer tasks go to separate handlers, and they don't + // interfere with each other. 
+ event.getReactor().schedule(500, this); + } + + @Override + public void onTimerTask(Event event) { + // keep on shouting until we are done counting + System.out.println("Yay, " + Math.round(Math.abs((Math.random() * 110) - 10))); + if (!counter.done()) { + event.getReactor().schedule(500, this); + } + } + + @Override + public void onReactorFinal(Event event) { + long elapsedTime = System.currentTimeMillis() - startTime; + System.out.println("Goodbye, World! (after " + elapsedTime + " long milliseconds)"); + } + + public static void main(String[] args) throws IOException { + // In HelloWorld.java we said the reactor exits when there are no more + // events to process. While this is true, it's not actually complete. + // The reactor exits when there are no more events to process and no + // possibility of future events arising. For that reason the reactor + // will keep running until there are no more scheduled events and then + // exit. + Reactor reactor = Proton.reactor(new CountRandomly()); + reactor.run(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java new file mode 100644 index 0000000..a34038e --- /dev/null +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.example.reactor; + +import java.io.IOException; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.reactor.Reactor; + +public class Counter extends BaseHandler { + + private long startTime; + + class CounterHandler extends BaseHandler { + private final int limit; + private int count; + + CounterHandler(int limit) { + this.limit = limit; + } + + @Override + public void onTimerTask(Event event) { + count += 1; + System.out.println(count); + if (count < limit) { + event.getReactor().schedule(250, this); + } + } + } + + @Override + public void onReactorInit(Event event) { + startTime = System.currentTimeMillis(); + System.out.println("Hello, World!"); + + // Note that unlike the previous scheduling example, we pass in + // a separate object for the handler. This means that the timer + // event we just scheduled will not be seen by Counter as it is + // being handled by the CounterHandler instance we create. + event.getReactor().schedule(250, new CounterHandler(10)); + } + + @Override + public void onReactorFinal(Event event) { + long elapsedTime = System.currentTimeMillis() - startTime; + System.out.println("Goodbye, World! 
(after " + elapsedTime + " long milliseconds)"); + } + + public static void main(String[] args) throws IOException { + // In HelloWorld.java we said the reactor exits when there are no more + // events to process. While this is true, it's not actually complete. + // The reactor exits when there are no more events to process and no + // possibility of future events arising. For that reason the reactor + // will keep running until there are no more scheduled events and then + // exit. + Reactor reactor = Proton.reactor(new Counter()); + reactor.run(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java new file mode 100644 index 0000000..7b4e36f --- /dev/null +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.qpid.proton.example.reactor; + +import java.io.IOException; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Handler; +import org.apache.qpid.proton.reactor.Reactor; + +// Events know how to dispatch themselves to handlers. By combining +// this with on_unhandled, you can provide a kind of inheritance +/// between handlers using delegation. +public class Delegates extends BaseHandler { + + private final Handler[] handlers; + + static class Hello extends BaseHandler { + @Override + public void onReactorInit(Event e) { + System.out.println("Hello, World!"); + } + } + + static class Goodbye extends BaseHandler { + @Override + public void onReactorFinal(Event e) { + System.out.println("Goodbye, World!"); + } + } + + public Delegates(Handler... handlers) { + this.handlers = handlers; + } + + @Override + public void onUnhandled(Event event) { + for (Handler handler : handlers) { + event.dispatch(handler); + } + } + + public static void main(String[] args) throws IOException { + Reactor reactor = Proton.reactor(new Delegates(new Hello(), new Goodbye())); + reactor.run(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java new file mode 100644 index 0000000..2e53d09 --- /dev/null +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.example.reactor; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Pipe; +import java.nio.channels.Pipe.SinkChannel; +import java.nio.channels.Pipe.SourceChannel; +import java.util.concurrent.atomic.AtomicInteger; + +public class EchoInputStreamWrapper extends Thread { + + private final InputStream in; + private final SinkChannel out; + private final byte[] bufferBytes = new byte[1024]; + private final ByteBuffer buffer = ByteBuffer.wrap(bufferBytes); + private final AtomicInteger idCounter = new AtomicInteger(); + + private EchoInputStreamWrapper(InputStream in, SinkChannel out) { + this.in = in; + this.out = out; + setName(getClass().getName() + "-" + idCounter.incrementAndGet()); + setDaemon(true); + } + + @Override + public void run() { + try { + while(true) { + int amount = in.read(bufferBytes); + if (amount < 0) break; + buffer.position(0); + buffer.limit(amount); + out.write(buffer); + } + } catch(IOException ioException) { + ioException.printStackTrace(); + } finally { + try { + out.close(); + } catch(IOException ioException) { + ioException.printStackTrace(); + } + } + } + + public static SourceChannel wrap(InputStream in) throws IOException { + 
Pipe pipe = Pipe.open(); + new EchoInputStreamWrapper(in, pipe.sink()).start(); + SourceChannel result = pipe.source(); + result.configureBlocking(false); + return result; + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java new file mode 100644 index 0000000..1bb3d3e --- /dev/null +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java @@ -0,0 +1,114 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.example.reactor; + +import java.io.IOException; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.reactor.Reactor; + +/* +# Not every event goes to the reactor's event handler. 
If we have a +# separate handler for something like a scheduled task, then those +# events aren't logged by the logger associated with the reactor's +# handler. Sometimes this is useful if you don't want to see them, but +# sometimes you want the global picture. + +class Logger: + + def on_unhandled(self, name, event): + print "LOG:", name, event + +class Task: + + def on_timer_task(self, event): + print "Mission accomplished!" + +class Program: + + def on_reactor_init(self, event): + print "Hello, World!" + event.reactor.schedule(0, Task()) + + def on_reactor_final(self, event): + print "Goodbye, World!" + +r = Reactor(Program()) + +# In addition to having a regular handler, the reactor also has a +# global handler that sees every event. By adding the Logger to the +# global handler instead of the regular handler, we can log every +# single event that occurs in the system regardless of whether or not +# there are specific handlers associated with the objects that are the +# target of those events. +r.global_handler.add(Logger()) +r.run() + + */ + +// Not every event goes to the reactor's event handler. If we have a +// separate handler for something like a scheduled task, then those +// events aren't logged by the logger associated with the reactor's +// handler. Sometimes this is useful if you don't want to see them, but +// sometimes you want the global picture. 
+public class GlobalLogger extends BaseHandler { + + static class Logger extends BaseHandler { + @Override + public void onUnhandled(Event event) { + System.out.println("LOG: " + event); + } + } + + static class Task extends BaseHandler { + @Override + public void onTimerTask(Event e) { + System.out.println("Mission accomplished!"); + } + } + + @Override + public void onReactorInit(Event event) { + System.out.println("Hello, World!"); + event.getReactor().schedule(0, new Task()); + } + + @Override + public void onReactorFinal(Event e) { + System.out.println("Goodbye, World!"); + } + + public static void main(String[] args) throws IOException { + Reactor reactor = Proton.reactor(new GlobalLogger()); + + // In addition to having a regular handler, the reactor also has a + // global handler that sees every event. By adding the Logger to the + // global handler instead of the regular handler, we can log every + // single event that occurs in the system regardless of whether or not + // there are specific handlers associated with the objects that are the + // target of those events. + reactor.getGlobalHandler().add(new Logger()); + reactor.run(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java new file mode 100644 index 0000000..b04273b --- /dev/null +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.example.reactor; + +import java.io.IOException; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.reactor.Reactor; + +// TODO: sort out docs! +// So far the reactive hello-world doesn't look too different from a +// regular old non-reactive hello-world. The on_reactor_init method can +// be used roughly as a 'main' method would. A program that only uses +// that one event, however, isn't going to be very reactive. By using +// other events, we can write a fully reactive program. + +public class GoodbyeWorld extends BaseHandler { + + // As before we handle the reactor init event. + @Override + public void onReactorInit(Event event) { + System.out.println("Hello, World!"); + } + + // In addition to an initial event, the reactor also produces an + // event when it is about to exit. This may not behave much + // differently than just putting the goodbye print statement inside + // on_reactor_init, but as we grow our program, this piece of it + // will always be what happens last, and will always happen + // regardless of what other paths the main logic of our program + // might take. 
+ @Override + public void onReactorFinal(Event e) { + System.out.println("Goodbye, World!");; + } + + public static void main(String[] args) throws IOException { + Reactor reactor = Proton.reactor(new GoodbyeWorld()); + reactor.run(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java new file mode 100644 index 0000000..745004e --- /dev/null +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.example.reactor; + +import java.io.IOException; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.reactor.Reactor; + +// TODO: sort out docs! 
+/* + * The proton reactor provides a general purpose event processing + * library for writing reactive programs. A reactive program is defined + * by a set of event handlers. An event handler is just any class or + * object that extends the Handler interface. For convenience, a class + * can extend BaseHandler and only handle the events that it cares to + * implement methods for. + */ +public class HelloWorld extends BaseHandler { + + // The reactor init event is produced by the reactor itself when it + // starts. + @Override + public void onReactorInit(Event event) { + System.out.println("Hello, World!"); + } + + public static void main(String[] args) throws IOException { + + // When you construct a reactor, you can give it a handler that + // is used, by default. + Reactor reactor = Proton.reactor(new HelloWorld()); + + // When you call run, the reactor will process events. The reactor init + // event is what kicks off everything else. When the reactor has no + // more events to process, it exits. + reactor.run(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java new file mode 100644 index 0000000..b4a8cba --- /dev/null +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java @@ -0,0 +1,103 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.example.reactor; + +import java.io.IOException; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.reactor.Reactor; + +/* +class Logger: + + def on_unhandled(self, name, event): + print "LOG:", name, event + +class Program: + + def on_reactor_init(self, event): + print "Hello, World!" + + def on_reactor_final(self, event): + print "Goodbye, World!" + +# You can pass multiple handlers to a reactor when you construct it. +# Each of these handlers will see every event the reactor sees. By +# combining this with on_unhandled, you can log each event that goes +# to the reactor. +r = Reactor(Program(), Logger()) +r.run() + +# Note that if you wanted to add the logger later, you could also +# write the above as below. All arguments to the reactor are just +# added to the default handler for the reactor. 
+ +def logging_enabled(): + return False + +r = Reactor(Program()) +if logging_enabled(): + r.handler.add(Logger()) +r.run() + + */ +public class ReactorLogger extends BaseHandler { + + public static class Logger extends BaseHandler { + @Override + public void onUnhandled(Event event) { + System.out.println("LOG: " + event); + } + } + + @Override + public void onReactorInit(Event e) { + System.out.println("Hello, World!"); + } + + @Override + public void onReactorFinal(Event e) { + System.out.println("Goodbye, World!"); + } + + private static boolean loggingEnabled = false; + + public static void main(String[] args) throws IOException { + + // You can pass multiple handlers to a reactor when you construct it. + // Each of these handlers will see every event the reactor sees. By + // combining this with on_unhandled, you can log each event that goes + // to the reactor. + Reactor reactor = Proton.reactor(new ReactorLogger(), new Logger()); + reactor.run(); + + // Note that if you wanted to add the logger later, you could also + // write the above as below. All arguments to the reactor are just + // added to the default handler for the reactor. 
+ reactor = Proton.reactor(new ReactorLogger()); + if (loggingEnabled) + reactor.getHandler().add(new Logger()); + reactor.run(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java new file mode 100644 index 0000000..47e0cb3 --- /dev/null +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.qpid.proton.example.reactor; + +import java.io.IOException; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.reactor.Reactor; +import org.apache.qpid.proton.reactor.Task; + +public class Scheduling extends BaseHandler { + + private long startTime; + + @Override + public void onReactorInit(Event event) { + startTime = System.currentTimeMillis(); + System.out.println("Hello, World!"); + Task task = event.getReactor().schedule(1000, this); + task.setAttachment("Yay"); + } + + @Override + public void onTimerTask(Event event) { + Task task = event.getTask(); + System.out.println(task.getAttachment() + " my task is complete!"); + } + + @Override + public void onReactorFinal(Event e) { + long elapsedTime = System.currentTimeMillis() - startTime; + System.out.println("Goodbye, World! (after " + elapsedTime + " long milliseconds)"); + } + + public static void main(String[] args) throws IOException { + Reactor reactor = Proton.reactor(new Scheduling()); + reactor.run(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java ---------------------------------------------------------------------- diff --git a/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java new file mode 100644 index 0000000..a3cc200 --- /dev/null +++ b/examples/java/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.example.reactor; + +import java.io.IOException; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.reactor.Reactor; + +public class Unhandled extends BaseHandler { + + // If an event occurs and its handler doesn't have an on_<event> + // method, the reactor will attempt to call the on_unhandled method + // if it exists. This can be useful not only for debugging, but for + // logging and for delegating/inheritance.
+ @Override + public void onUnhandled(Event event) { + System.out.println(event); + } + + public static void main(String[] args) throws IOException { + Reactor reactor = Proton.reactor(new Unhandled()); + reactor.run(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/Proton.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/Proton.java b/proton-j/src/main/java/org/apache/qpid/proton/Proton.java index 39b04e5..b64225a 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/Proton.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/Proton.java @@ -21,8 +21,7 @@ package org.apache.qpid.proton; import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; + import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; import org.apache.qpid.proton.amqp.messaging.Footer; @@ -33,14 +32,16 @@ import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.codec.Codec; import org.apache.qpid.proton.codec.Data; import org.apache.qpid.proton.driver.Driver; -import org.apache.qpid.proton.engine.Engine; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Engine; +import org.apache.qpid.proton.engine.Handler; import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.engine.SslPeerDetails; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.messenger.Messenger; +import org.apache.qpid.proton.reactor.Reactor; public final class Proton { @@ -110,4 +111,17 @@ public final class Proton return Driver.Factory.create(); } + public static Reactor reactor() throws IOException + { + return 
Reactor.Factory.create(); + } + + public static Reactor reactor(Handler... handlers) throws IOException + { + Reactor reactor = Reactor.Factory.create(); + for (Handler handler : handlers) { + reactor.getHandler().add(handler); + } + return reactor; + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java index 94f4d12..ac17c5e 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/BaseHandler.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.proton.engine; +import java.util.HashSet; +import java.util.Iterator; + /** * BaseHandler @@ -28,6 +31,7 @@ package org.apache.qpid.proton.engine; public class BaseHandler implements Handler { + private HashSet<Handler> children = new HashSet<Handler>(); @Override public void onConnectionInit(Event e) { onUnhandled(e); } @Override public void onConnectionLocalOpen(Event e) { onUnhandled(e); } @@ -62,6 +66,30 @@ public class BaseHandler implements Handler @Override public void onTransportTailClosed(Event e) { onUnhandled(e); } @Override public void onTransportClosed(Event e) { onUnhandled(e); } + @Override public void onReactorInit(Event e) { onUnhandled(e); } + @Override public void onReactorQuiesced(Event e) { onUnhandled(e); } + @Override public void onReactorFinal(Event e) { onUnhandled(e); } + + @Override public void onTimerTask(Event e) { onUnhandled(e); } + + @Override public void onSelectableInit(Event e) { onUnhandled(e); } + @Override public void onSelectableUpdated(Event e) { onUnhandled(e); } + @Override public void onSelectableReadable(Event e) { onUnhandled(e); } + @Override public void onSelectableWritable(Event e) { onUnhandled(e); } +
@Override public void onSelectableExpired(Event e) { onUnhandled(e); } + @Override public void onSelectableError(Event e) { onUnhandled(e); } + @Override public void onSelectableFinal(Event e) { onUnhandled(e); } + @Override public void onUnhandled(Event event) {} + @Override + public void add(Handler child) { + children.add(child); + } + + @Override + public Iterator<Handler> children() { + return children.iterator(); + } + } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java index f9e6fe5..142406e 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Collector.java @@ -41,4 +41,5 @@ public interface Collector void pop(); + boolean more(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java index 5547a57..5cb57a2 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Connection.java @@ -22,8 +22,8 @@ package org.apache.qpid.proton.engine; import java.util.EnumSet; import java.util.Map; -import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.impl.ConnectionImpl; @@ -35,7 +35,7 @@ import org.apache.qpid.proton.engine.impl.ConnectionImpl; * {@link #sessionHead(EnumSet, EnumSet)}, {@link #linkHead(EnumSet, EnumSet)} * {@link #getWorkHead()}
respectively. */ -public interface Connection extends Endpoint +public interface Connection extends HandlerEndpoint { public static final class Factory http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java index ddb6937..d69b282 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.proton.engine; +import org.apache.qpid.proton.reactor.Reactor; +import org.apache.qpid.proton.reactor.Selectable; +import org.apache.qpid.proton.reactor.Task; + /** * Event @@ -95,6 +99,12 @@ public interface Event Transport getTransport(); + Reactor getReactor(); + + Selectable getSelectable(); + + Task getTask(); + Event copy(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Handler.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Handler.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Handler.java index 5ff77e0..fe72091 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Handler.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Handler.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.proton.engine; +import java.util.Iterator; + /** * Handler @@ -62,6 +64,22 @@ public interface Handler void onTransportTailClosed(Event e); void onTransportClosed(Event e); + void onReactorInit(Event e); + void onReactorQuiesced(Event e); + void onReactorFinal(Event e); + + void onTimerTask(Event e); + + void onSelectableInit(Event e); + void onSelectableUpdated(Event e); + void 
onSelectableReadable(Event e); + void onSelectableWritable(Event e); + void onSelectableExpired(Event e); + void onSelectableError(Event e); + void onSelectableFinal(Event e); + void onUnhandled(Event e); + void add(Handler child); + Iterator<Handler> children(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/HandlerEndpoint.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/HandlerEndpoint.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/HandlerEndpoint.java new file mode 100644 index 0000000..ecadc0a --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/HandlerEndpoint.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ * + */ + +package org.apache.qpid.proton.engine; + +public interface HandlerEndpoint extends Endpoint { + + void add(Handler handler); + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java index c965a29..caafc14 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java @@ -21,7 +21,7 @@ package org.apache.qpid.proton.engine; import java.util.EnumSet; -import java.util.Iterator; + import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.amqp.transport.Source; @@ -37,7 +37,7 @@ import org.apache.qpid.proton.amqp.transport.Target; * * TODO describe the application's responsibility to honour settlement. */ -public interface Link extends Endpoint +public interface Link extends HandlerEndpoint { /** http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java index f2f048a..eaddac0 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java @@ -28,7 +28,7 @@ import java.util.EnumSet; * * Note that session level flow control is handled internally by Proton. 
*/ -public interface Session extends Endpoint +public interface Session extends HandlerEndpoint { /** * Returns a newly created sender endpoint http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java index e222819..fe09a23 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java @@ -23,9 +23,6 @@ package org.apache.qpid.proton.engine.impl; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Event; -import java.util.LinkedList; -import java.util.Queue; - /** * CollectorImpl @@ -42,11 +39,13 @@ public class CollectorImpl implements Collector public CollectorImpl() {} + @Override public Event peek() { return head; } + @Override public void pop() { if (head != null) { @@ -87,4 +86,9 @@ public class CollectorImpl implements Collector return event; } + @Override + public boolean more() { + return head != null && head.next != null; + } + } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java index 17ffde7..eecc05e 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java @@ -29,7 +29,7 @@ import org.apache.qpid.proton.amqp.Symbol; import 
org.apache.qpid.proton.engine.*; import org.apache.qpid.proton.amqp.transport.Open; -public class ConnectionImpl extends EndpointImpl implements ProtonJConnection +public class ConnectionImpl extends HandlerEndpointImpl implements ProtonJConnection { public static final int MAX_CHANNELS = 65535; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java index 3317bf9..65a2000 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java @@ -20,13 +20,18 @@ */ package org.apache.qpid.proton.engine.impl; +import java.util.Iterator; + +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Handler; -import org.apache.qpid.proton.engine.Connection; -import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.Link; -import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.Transport; +import org.apache.qpid.proton.reactor.Reactor; +import org.apache.qpid.proton.reactor.Selectable; +import org.apache.qpid.proton.reactor.Task; /** * EventImpl @@ -57,16 +62,19 @@ class EventImpl implements Event context = null; } + @Override public Type getType() { return type; } + @Override public Object getContext() { return context; } + @Override public void dispatch(Handler handler) { switch (type) { @@ -157,12 +165,51 @@ class EventImpl implements Event case TRANSPORT_CLOSED: handler.onTransportClosed(this); break; + case REACTOR_FINAL: + 
handler.onReactorFinal(this); + break; + case REACTOR_QUIESCED: + handler.onReactorQuiesced(this); + break; + case REACTOR_INIT: + handler.onReactorInit(this); + break; + case SELECTABLE_ERROR: + handler.onSelectableError(this); + break; + case SELECTABLE_EXPIRED: + handler.onSelectableExpired(this); + break; + case SELECTABLE_FINAL: + handler.onSelectableFinal(this); + break; + case SELECTABLE_INIT: + handler.onSelectableInit(this); + break; + case SELECTABLE_READABLE: + handler.onSelectableReadable(this); + break; + case SELECTABLE_UPDATED: + handler.onSelectableUpdated(this); + break; + case SELECTABLE_WRITABLE: + handler.onSelectableWritable(this); + break; + case TIMER_TASK: + handler.onTimerTask(this); + break; default: handler.onUnhandled(this); break; } + + Iterator<Handler> children = handler.children(); + while(children.hasNext()) { + dispatch(children.next()); + } } + @Override public Connection getConnection() { if (context instanceof Connection) { @@ -182,6 +229,7 @@ class EventImpl implements Event } } + @Override public Session getSession() { if (context instanceof Session) { @@ -195,6 +243,7 @@ class EventImpl implements Event } } + @Override public Link getLink() { if (context instanceof Link) { @@ -208,6 +257,7 @@ class EventImpl implements Event } } + @Override public Delivery getDelivery() { if (context instanceof Delivery) { @@ -217,6 +267,7 @@ class EventImpl implements Event } } + @Override public Transport getTransport() { if (context instanceof Transport) { @@ -225,6 +276,52 @@ class EventImpl implements Event return null; } } + + @Override + public Selectable getSelectable() { + if (context instanceof Selectable) { + return (Selectable) context; + } else { + return null; + } + } + + @Override + public Reactor getReactor() { + if (context instanceof Reactor) { + return (Reactor) context; + } else if (context instanceof Task) { + return ((Task)context).getReactor(); + } else if (context instanceof Transport) { + return
((TransportImpl)context).getReactor(); + } else if (context instanceof Delivery) { + Transport transport = ((Delivery)context).getLink().getSession().getConnection().getTransport(); + return ((TransportImpl)transport).getReactor(); + } else if (context instanceof Link) { + Transport transport = ((Link)context).getSession().getConnection().getTransport(); + return ((TransportImpl)transport).getReactor(); + } else if (context instanceof Session) { + Transport transport = ((Session)context).getConnection().getTransport(); + return ((TransportImpl)transport).getReactor(); + } else if (context instanceof Connection) { + Transport transport = ((Connection)context).getTransport(); + return ((TransportImpl)transport).getReactor(); + } else if (context instanceof Selectable) { + return ((Selectable)context).getReactor(); + } + return null; + } + + @Override + public Task getTask() { + if (context instanceof Task) { + return (Task) context; + } else { + return null; + } + } + + @Override public Event copy() { EventImpl newEvent = new EventImpl(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandlerEndpointImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandlerEndpointImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandlerEndpointImpl.java new file mode 100644 index 0000000..a108412 --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandlerEndpointImpl.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.engine.impl; + +import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.Handler; +import org.apache.qpid.proton.engine.HandlerEndpoint; + + +public abstract class HandlerEndpointImpl extends EndpointImpl implements HandlerEndpoint { + + private Handler handler = null; + + @Override + public void add(Handler handler) { + if (this.handler == null) { + this.handler = new BaseHandler(); + } + this.handler.add(handler); + } + + public Handler getHandler() { + return handler; + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java index 9966526..af92fb8 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java @@ -21,15 +21,16 @@ package org.apache.qpid.proton.engine.impl; import java.util.EnumSet; + import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; -import org.apache.qpid.proton.engine.EndpointState; -import 
org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.amqp.transport.Source; import org.apache.qpid.proton.amqp.transport.Target; +import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Link; -public abstract class LinkImpl extends EndpointImpl implements Link +public abstract class LinkImpl extends HandlerEndpointImpl implements Link { private final SessionImpl _session; @@ -68,16 +69,19 @@ public abstract class LinkImpl extends EndpointImpl implements Link } + @Override public String getName() { return _name; } + @Override public DeliveryImpl delivery(byte[] tag) { return delivery(tag, 0, tag.length); } + @Override public DeliveryImpl delivery(byte[] tag, int offset, int length) { if (offset != 0 || length != tag.length) @@ -146,11 +150,13 @@ public abstract class LinkImpl extends EndpointImpl implements Link } } + @Override public DeliveryImpl current() { return _current; } + @Override public boolean advance() { if(_current != null ) @@ -178,11 +184,13 @@ public abstract class LinkImpl extends EndpointImpl implements Link return _session.getConnectionImpl(); } + @Override public SessionImpl getSession() { return _session; } + @Override public Source getRemoteSource() { return _remoteSource; @@ -193,6 +201,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link _remoteSource = source; } + @Override public Target getRemoteTarget() { return _remoteTarget; @@ -203,28 +212,33 @@ public abstract class LinkImpl extends EndpointImpl implements Link _remoteTarget = target; } + @Override public Source getSource() { return _source; } + @Override public void setSource(Source source) { // TODO - should be an error if local state is ACTIVE _source = source; } + @Override public Target getTarget() { return _target; } + @Override public void setTarget(Target target) { // TODO - should be an error if local state is ACTIVE _target = target; } + @Override public Link 
next(EnumSet local, EnumSet remote) { LinkNode.Query query = new EndpointImplQuery(local, remote); @@ -237,6 +251,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link abstract TransportLink getTransportLink(); + @Override public int getCredit() { return _credit; @@ -267,6 +282,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link _credit--; } + @Override public int getQueued() { return _queued; @@ -282,6 +298,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link _queued--; } + @Override public int getUnsettled() { return _unsettled; @@ -302,6 +319,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link _drain = drain; } + @Override public boolean getDrain() { return _drain; @@ -354,6 +372,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link _remoteReceiverSettleMode = remoteReceiverSettleMode; } + @Override public int drained() { int drained = 0; @@ -384,11 +403,13 @@ public abstract class LinkImpl extends EndpointImpl implements Link _drained = value; } + @Override public int getRemoteCredit() { return _credit - _queued; } + @Override public DeliveryImpl head() { return _head; @@ -406,6 +427,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link getConnectionImpl().put(Event.Type.LINK_LOCAL_CLOSE, this); } + @Override public void detach() { _detached = true; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java index 45fcb70..9969b93 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java @@ -20,14 +20,18 @@ */ package 
org.apache.qpid.proton.engine.impl; -import java.util.*; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; import org.apache.qpid.proton.engine.EndpointState; +import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.ProtonJSession; import org.apache.qpid.proton.engine.Session; -import org.apache.qpid.proton.engine.Event; -public class SessionImpl extends EndpointImpl implements ProtonJSession +public class SessionImpl extends HandlerEndpointImpl implements ProtonJSession { private final ConnectionImpl _connection; @@ -52,6 +56,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession _connection.put(Event.Type.SESSION_INIT, this); } + @Override public SenderImpl sender(String name) { SenderImpl sender = _senders.get(name); @@ -74,6 +79,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession return sender; } + @Override public ReceiverImpl receiver(String name) { ReceiverImpl receiver = _receivers.get(name); @@ -96,6 +102,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession return receiver; } + @Override public Session next(EnumSet local, EnumSet remote) { LinkNode.Query query = new EndpointImplQuery(local, remote); @@ -111,6 +118,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession return _connection; } + @Override public ConnectionImpl getConnection() { return getConnectionImpl(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index a5c8ba9..f4813cd 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java 
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -57,6 +57,8 @@ import org.apache.qpid.proton.engine.TransportResult; import org.apache.qpid.proton.engine.TransportResultFactory; import org.apache.qpid.proton.engine.impl.ssl.SslImpl; import org.apache.qpid.proton.framing.TransportFrame; +import org.apache.qpid.proton.reactor.Reactor; +import org.apache.qpid.proton.reactor.Selectable; public class TransportImpl extends EndpointImpl implements ProtonJTransport, FrameBody.FrameBodyHandler, @@ -129,6 +131,9 @@ public class TransportImpl extends EndpointImpl private long _lastBytesOutput = 0; private long _remoteIdleDeadline = 0; + private Selectable _selectable; + private Reactor _reactor; + /** * @deprecated This constructor's visibility will be reduced to the default scope in a future release. * Client code outside this module should use a {@link EngineFactory} instead @@ -1438,14 +1443,17 @@ public class TransportImpl extends EndpointImpl } } + @Override public void setIdleTimeout(int timeout) { _localIdleTimeout = timeout; } + @Override public int getIdleTimeout() { return _localIdleTimeout; } + @Override public int getRemoteIdleTimeout() { return _remoteIdleTimeout; } @@ -1525,6 +1533,7 @@ public class TransportImpl extends EndpointImpl _outputProcessor.close_head(); } + @Override public boolean isClosed() { int p = pending(); int c = capacity(); @@ -1588,4 +1597,20 @@ public class TransportImpl extends EndpointImpl @Override void localClose() {} + + public void setSelectable(Selectable selectable) { + _selectable = selectable; + } + + public Selectable getSelectable() { + return _selectable; + } + + public void setReactor(Reactor reactor) { + _reactor = reactor; + } + + public Reactor getReactor() { + return _reactor; + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java ---------------------------------------------------------------------- 
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java new file mode 100644 index 0000000..02c5de2 --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.qpid.proton.reactor; + +import java.io.IOException; +import java.util.Set; + +import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.Handler; +import org.apache.qpid.proton.reactor.impl.ReactorImpl; + + + +public interface Reactor { + + public static final class Factory + { + public static Reactor create() throws IOException { + return new ReactorImpl(); + } + } + + public long mark(); + public long now(); + public void attach(Object attachment); + public Object attachment(); + public long getTimeout(); + + public void setTimeout(long timeout); + + public Handler getGlobalHandler(); + + public void setGlobalHandler(Handler handler); + + public Handler getHandler(); + + public void setHandler(Handler handler); + +/* TODO + * pn_io_t *pn_reactor_io(pn_reactor_t *reactor) { +166 assert(reactor); +167 return reactor->io; +168 } +169 + + */ + + public Set children(); + + public Collector collector(); + + + public Selectable selectable(); + + + + public void update(Selectable selectable); + + + void yield() ; + + public boolean quiesced(); + + public boolean process(); + + public void wakeup() throws IOException; + + public void start() ; + + public void stop() ; + + public void run(); + + // pn_reactor_schedule from reactor.c + public Task schedule(int delay, Handler handler); + // TODO: acceptor + // TODO: connection + // TODO: acceptorClose + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java new file mode 100644 index 0000000..c2b560f --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.reactor; + +import java.nio.channels.SelectableChannel; + +import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.Handler; +import org.apache.qpid.proton.engine.Transport; + +public interface Selectable { + + public interface Callback { + void run(Selectable selectable); + } + + public boolean isReading(); + + boolean isWriting(); + + long getDeadline() ; + + void setReading(boolean reading) ; + + void setWriting(boolean writing); + + void setDeadline(long deadline) ; + + public void onReadable(Callback runnable) ; + + public void onWritable(Callback runnable); + + public void onExpired(Callback runnable); + + public void onError(Callback runnable); + + public void onRelease(Callback runnable); + + public void onFinalize(Callback runnable); + + void readable() ; + + void writeable() ; + + void expired() ; + + void error(); + + void release() ; + + void _finalize() ; + + // These are equivalent to the C code's set/get file descriptor functions. 
+ void setChannel(SelectableChannel channel) ; + + public SelectableChannel getChannel() ; + + void setAttachment(Object attachment) ; + + Object getAttachment() ; + + boolean isRegistered() ; + + void setRegistered(boolean registered) ; + + void setCollector(final Collector collector) ; + + public Reactor getReactor() ; + public void terminate() ; + + public enum RecordKeyType { + PNI_TERMINATED + } + + public enum RecordValueType { + PN_VOID + } + + public boolean hasRecord(RecordKeyType type); + + public void setRecord(RecordKeyType key, RecordValueType value) ; + public boolean isTerminal(); + + + public Transport getTransport() ; + + public void setTransport(Transport transport) ; + + public void setReactor(Reactor reactor) ; + + public void add(Handler handler); + + public Handler getHandler() ; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java new file mode 100644 index 0000000..12188e2 --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.reactor; + +import java.io.IOException; +import java.util.Iterator; + +public interface Selector { + + public void add(Selectable selectable) throws IOException ; + + public void update(Selectable selectable); + + public void remove(Selectable selectable) ; + + public void select(long timeout) throws IOException ; + + public Iterator readable() ; + + public Iterator writeable() ; + + public Iterator expired() ; + public Iterator error() ; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e0187017/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java new file mode 100644 index 0000000..88031c5 --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.reactor; + +import org.apache.qpid.proton.engine.Handler; + +public interface Task { + + public long deadline(); + public void setReactor(Reactor reactor) ; + public Reactor getReactor(); + public void setHandler(Handler handler) ; + public Handler getHandler() ; + public Object getAttachment() ; + public void setAttachment(Object attachment) ; +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org