qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject [1/5] qpid-proton-j git commit: PROTON-1385: remove redundant and offset 'java' dirs from examples area
Date Fri, 13 Jan 2017 18:15:16 GMT
Repository: qpid-proton-j
Updated Branches:
  refs/heads/master 2afe2ec09 -> 72b99372f


http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/README.md
----------------------------------------------------------------------
diff --git a/examples/reactor/README.md b/examples/reactor/README.md
new file mode 100644
index 0000000..c6b532f
--- /dev/null
+++ b/examples/reactor/README.md
@@ -0,0 +1,55 @@
+The Reactor API provides a means to dispatch events occurring across
+one or more connections. It can be used purely as a dispatch tool
+alongside your own I/O mechanism, however by default it is configured
+with a handler that provides I/O for you.
+
+When programming with the reactor it is important to understand the
+dispatch model used to process events. Every event is associated with
+a context object, i.e. the *target* object upon which the event
+occurred. These objects are contained either directly or indirectly
+within the Reactor:
+
+    Delivery --> Link --> Session --> Connection --+
+                                                   |
+                                            Task --+--> Reactor
+                                                   |
+                                      Selectable --+
+
+
+Each event is dispatched first to a target-specific handler, and
+second to a global handler. The target-specific handler for an event
+is located by searching from the event context up through the
+hierarchy (terminating with the Reactor) and retrieving the most
+specific handler found.
+
+This means that any handler set on the Reactor could receive events
+targeting any object. For example if no handlers are associated with a
+Connection or any of its child objects, then the Reactor's handler
+will receive all the events for that Connection.
+
+Putting a handler on any child, e.g. a Connection, Session or Link,
+will prevent any handlers set on the ancestors of that object from
+seeing any events targeted at that object or its children, unless that
+handler specifically chooses to delegate those events up to the
+parent, e.g. by overriding onUnhandled and delegating.
+
+The global handler (used to dispatch all events after the
+target-specific handler is invoked) can be accessed and modified using
+Reactor.set/getGlobalHandler. This can be useful for a number of
+reasons, e.g. you could log all events by doing this:
+
+    reactor.getGlobalHandler().add(new LoggerHandler());
+
+Where LoggerHandler does this:
+
+    public void onUnhandled(Event evt) {
+        System.out.println(evt);
+    }
+
+The above trick is particularly useful for debugging.
+
+A handler can itself have child handlers: each event is automatically
+delegated to those children *after* it has been dispatched to the
+handler itself. The default global handler is what provides the default
+I/O behavior of the reactor. To use the reactor as a pure dispatch
+mechanism you can simply set the global handler to null.

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/pom.xml
----------------------------------------------------------------------
diff --git a/examples/reactor/pom.xml b/examples/reactor/pom.xml
new file mode 100644
index 0000000..32e995e
--- /dev/null
+++ b/examples/reactor/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <parent>
+    <groupId>org.apache.qpid</groupId>
+    <artifactId>proton-project</artifactId>
+    <version>0.17.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>proton-j-reactor-examples</artifactId>
+  <name>proton-j-reactor-examples</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>proton-j</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+  </dependencies>
+
+  <scm>
+    <url>http://svn.apache.org/viewvc/qpid/proton/</url>
+  </scm>
+</project>

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/run
----------------------------------------------------------------------
diff --git a/examples/reactor/run b/examples/reactor/run
new file mode 100755
index 0000000..51bd155
--- /dev/null
+++ b/examples/reactor/run
@@ -0,0 +1,23 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+CLASS=$1
+shift
+mvn -q -e exec:java -Dexec.mainClass=org.apache.qpid.proton.example.reactor.${CLASS} -Dexec.args="$*"

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Cat.java
----------------------------------------------------------------------
diff --git a/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Cat.java b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Cat.java
new file mode 100644
index 0000000..cb8ceca
--- /dev/null
+++ b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Cat.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Pipe.SourceChannel;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.Selectable;
+
+/**
+ * Reactor example that copies the contents of a file to standard output.
+ *
+ * The file is handed to EchoInputStreamWrapper, which exposes it as a pipe
+ * source channel. A selectable configured with that channel is registered
+ * with the reactor and drained every time it is reported readable.
+ */
+public class Cat extends BaseHandler {
+
+    /** Handler for the selectable that carries the enclosing instance's channel. */
+    private class EchoHandler extends BaseHandler {
+        @Override
+        public void onSelectableInit(Event event) {
+            Selectable selectable = event.getSelectable();
+            // We can configure a selectable with any SelectableChannel we want.
+            selectable.setChannel(channel);
+            // Ask to be notified when the channel is readable
+            selectable.setReading(true);
+            event.getReactor().update(selectable);
+        }
+
+        @Override
+        public void onSelectableReadable(Event event) {
+            Selectable selectable = event.getSelectable();
+
+            // The onSelectableReadable event tells us that there is data
+            // to be read, or the end of stream has been reached.
+            SourceChannel source = (SourceChannel)selectable.getChannel();
+            ByteBuffer buffer = ByteBuffer.allocate(1024);
+            try {
+                int amount;
+                // Keep copying while reads return data; stop on 0 (nothing
+                // available right now) or a negative value (end of stream).
+                while ((amount = source.read(buffer)) > 0) {
+                    System.out.write(buffer.array(), 0, buffer.position());
+                    buffer.clear();
+                }
+                if (amount < 0) {
+                    shutDown(selectable);
+                }
+            } catch(IOException ioException) {
+                ioException.printStackTrace();
+                shutDown(selectable);
+            }
+        }
+
+        // Terminates the selectable and reports the change to its reactor.
+        private void shutDown(Selectable selectable) {
+            selectable.terminate();
+            selectable.getReactor().update(selectable);
+        }
+    }
+
+    private final SourceChannel channel;
+
+    private Cat(SourceChannel channel) {
+        this.channel = channel;
+    }
+
+    @Override
+    public void onReactorInit(Event event) {
+        Reactor reactor = event.getReactor();
+        Selectable selectable = reactor.selectable();
+        setHandler(selectable, new EchoHandler());
+        reactor.update(selectable);
+    }
+
+    /**
+     * Prints the file named by the single command line argument.
+     *
+     * @param args exactly one element: the name of the file to print
+     * @throws IOException if the file cannot be opened, wrapped or closed
+     */
+    public static void main(String[] args) throws IOException {
+        if (args.length != 1) {
+            System.err.println("Specify a file name as an argument.");
+            System.exit(1);
+        }
+        FileInputStream inFile = new FileInputStream(args[0]);
+        try {
+            SourceChannel inChannel = EchoInputStreamWrapper.wrap(inFile);
+            Reactor reactor = Proton.reactor(new Cat(inChannel));
+            reactor.run();
+        } finally {
+            // Release the file handle once the reactor has stopped, even if
+            // setting up or running the reactor failed.
+            inFile.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java
----------------------------------------------------------------------
diff --git a/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java
new file mode 100644
index 0000000..9a5a0b4
--- /dev/null
+++ b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/CountRandomly.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+// Let's try to modify our counter example. In addition to counting to
+// 10 in quarter second intervals, let's also print out a random number
+// every half second. This is not a super easy thing to express in a
+// purely sequential program, but not so difficult using events.
+public class CountRandomly extends BaseHandler {
+
+    // Timer handler that prints an increasing count and re-arms itself
+    // every 250ms until the limit is reached.
+    class CounterHandler extends BaseHandler {
+        private final int limit;
+        private int count;
+
+        CounterHandler(int limit) {
+            this.limit = limit;
+        }
+
+        @Override
+        public void onTimerTask(Event event) {
+            System.out.println(++count);
+
+            if (done()) {
+                return;
+            }
+            event.getReactor().schedule(250, this);
+        }
+
+        // True once the count has reached the limit.
+        private boolean done() {
+            return !(count < limit);
+        }
+    }
+
+    private long startTime;
+    private CounterHandler counter;
+
+    @Override
+    public void onReactorInit(Event event) {
+        startTime = System.currentTimeMillis();
+        System.out.println("Hello, World!");
+
+        // Keep hold of the counter so that onTimerTask below can ask it
+        // whether it has finished.
+        counter = new CounterHandler(10);
+        event.getReactor().schedule(250, counter);
+
+        // A second timer task, handled by this object rather than by the
+        // counter; the two kinds of task are delivered independently.
+        event.getReactor().schedule(500, this);
+    }
+
+    @Override
+    public void onTimerTask(Event event) {
+        // Shout a random number, and keep doing so while the counter runs.
+        double shifted = (Math.random() * 110) - 10;
+        long shout = Math.round(Math.abs(shifted));
+        System.out.println("Yay, " + shout);
+        if (counter.done()) {
+            return;
+        }
+        event.getReactor().schedule(500, this);
+    }
+
+    @Override
+    public void onReactorFinal(Event event) {
+        long finishedAt = System.currentTimeMillis();
+        System.out.println("Goodbye, World! (after " + (finishedAt - startTime) + " long milliseconds)");
+    }
+
+    public static void main(String[] args) throws IOException {
+        // The reactor does not stop merely because the event queue is
+        // empty: it stops when there are no events left AND nothing that
+        // could still produce one. The pending timer tasks therefore keep
+        // it running until the last of them has fired.
+        CountRandomly handler = new CountRandomly();
+        Proton.reactor(handler).run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java
----------------------------------------------------------------------
diff --git a/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java
new file mode 100644
index 0000000..b05685a
--- /dev/null
+++ b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Counter.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+/**
+ * Reactor example that counts to ten, printing one number per timer task,
+ * and reports the elapsed time when the reactor finishes.
+ */
+public class Counter extends BaseHandler {
+
+    // Timer handler that owns the count; it re-arms itself until the
+    // limit has been reached.
+    class CounterHandler extends BaseHandler {
+        private final int limit;
+        private int count;
+
+        CounterHandler(int limit) {
+            this.limit = limit;
+        }
+
+        @Override
+        public void onTimerTask(Event event) {
+            System.out.println(++count);
+            if (count >= limit) {
+                return;
+            }
+            // Scheduling a fresh task from within the task is all it takes
+            // to make it recurring.
+            event.getReactor().schedule(250, this);
+        }
+    }
+
+    private long startTime;
+
+    @Override
+    public void onReactorInit(Event event) {
+        startTime = System.currentTimeMillis();
+        System.out.println("Hello, World!");
+
+        // The timer task is given its own handler object, so its events go
+        // to that CounterHandler and are not seen by Counter's own
+        // BaseHandler methods.
+        CounterHandler ticker = new CounterHandler(10);
+        event.getReactor().schedule(250, ticker);
+    }
+
+    @Override
+    public void onReactorFinal(Event event) {
+        long finishedAt = System.currentTimeMillis();
+        System.out.println("Goodbye, World! (after " + (finishedAt - startTime) + " long milliseconds)");
+    }
+
+    public static void main(String[] args) throws IOException {
+        // The reactor does not stop merely because the event queue is
+        // empty: it stops when there are no events left AND nothing that
+        // could still produce one. The pending timer task therefore keeps
+        // it running until the count is complete.
+        Counter handler = new Counter();
+        Proton.reactor(handler).run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java
----------------------------------------------------------------------
diff --git a/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java
new file mode 100644
index 0000000..7b4e36f
--- /dev/null
+++ b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Delegates.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Handler;
+import org.apache.qpid.proton.reactor.Reactor;
+
+// Events know how to dispatch themselves to handlers. By combining
+// this with onUnhandled, you can provide a kind of inheritance
+// between handlers using delegation.
+public class Delegates extends BaseHandler {
+
+    // Prints the greeting when the reactor starts.
+    static class Hello extends BaseHandler {
+        @Override
+        public void onReactorInit(Event e) {
+            System.out.println("Hello, World!");
+        }
+    }
+
+    // Prints the farewell when the reactor finishes.
+    static class Goodbye extends BaseHandler {
+        @Override
+        public void onReactorFinal(Event e) {
+            System.out.println("Goodbye, World!");
+        }
+    }
+
+    // Handlers that receive every event this object does not handle itself.
+    private final Handler[] handlers;
+
+    public Delegates(Handler... handlers) {
+        this.handlers = handlers;
+    }
+
+    // Forward the event to each delegate, in the order they were supplied.
+    @Override
+    public void onUnhandled(Event event) {
+        for (int i = 0; i < handlers.length; i++) {
+            event.dispatch(handlers[i]);
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+        Delegates root = new Delegates(new Hello(), new Goodbye());
+        Proton.reactor(root).run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Echo.java
----------------------------------------------------------------------
diff --git a/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Echo.java b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Echo.java
new file mode 100644
index 0000000..852bf8e
--- /dev/null
+++ b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Echo.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Pipe.SourceChannel;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.Selectable;
+
+/**
+ * Reactor example that copies standard input to standard output until
+ * the end of the input stream is reached.
+ */
+public class Echo extends BaseHandler {
+
+    private final SourceChannel channel;
+
+    private Echo(SourceChannel channel) {
+        this.channel = channel;
+    }
+
+    // Handler attached to the selectable created in onReactorInit.
+    private class EchoHandler extends BaseHandler {
+        @Override
+        public void onSelectableInit(Event event) {
+            Selectable readable = event.getSelectable();
+            // Any SelectableChannel can be plugged into a selectable.
+            readable.setChannel(channel);
+            // Request notification whenever the channel can be read.
+            readable.setReading(true);
+            event.getReactor().update(readable);
+        }
+
+        @Override
+        public void onSelectableReadable(Event event) {
+            Selectable readable = event.getSelectable();
+
+            // This event means that either data is waiting to be read or
+            // the end of the stream has been reached.
+            SourceChannel source = (SourceChannel)readable.getChannel();
+            ByteBuffer chunk = ByteBuffer.allocate(1024);
+            try {
+                int amount;
+                while ((amount = source.read(chunk)) > 0) {
+                    System.out.write(chunk.array(), 0, chunk.position());
+                    chunk.clear();
+                }
+                // A negative count marks end of stream; zero only means
+                // there is nothing more to read for the moment.
+                if (amount < 0) {
+                    readable.terminate();
+                    readable.getReactor().update(readable);
+                }
+            } catch(IOException ioException) {
+                ioException.printStackTrace();
+                readable.terminate();
+                readable.getReactor().update(readable);
+            }
+        }
+    }
+
+    @Override
+    public void onReactorInit(Event event) {
+        // A selectable counts as a possible source of future events, so
+        // the reactor keeps running for as long as ours is alive, i.e.
+        // until it reads the end of stream marker produced by Control-D.
+        System.out.println("Type whatever you want and then use Control-D to exit:");
+        Reactor owner = event.getReactor();
+        Selectable stdin = owner.selectable();
+        setHandler(stdin, new EchoHandler());
+        owner.update(stdin);
+    }
+
+    public static void main(String[] args) throws IOException {
+        Echo handler = new Echo(EchoInputStreamWrapper.wrap(System.in));
+        Proton.reactor(handler).run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java
new file mode 100644
index 0000000..2e53d09
--- /dev/null
+++ b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/EchoInputStreamWrapper.java
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Pipe;
+import java.nio.channels.Pipe.SinkChannel;
+import java.nio.channels.Pipe.SourceChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Daemon thread that copies bytes from a blocking InputStream into the sink
+ * end of a Pipe, so that the stream's contents can be consumed through the
+ * pipe's non-blocking source channel.
+ */
+public class EchoInputStreamWrapper extends Thread {
+
+    // Shared by all instances so that each wrapper thread gets a distinct
+    // numeric suffix in its name. As a per-instance field every thread
+    // would be numbered 1.
+    private static final AtomicInteger idCounter = new AtomicInteger();
+
+    private final InputStream in;
+    private final SinkChannel out;
+    private final byte[] bufferBytes = new byte[1024];
+    // View over bufferBytes, used to hand the bytes just read to the channel.
+    private final ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
+
+    private EchoInputStreamWrapper(InputStream in, SinkChannel out) {
+        this.in = in;
+        this.out = out;
+        setName(getClass().getName() + "-" + idCounter.incrementAndGet());
+        setDaemon(true);
+    }
+
+    /**
+     * Copies the stream into the pipe until end of stream or an I/O error,
+     * then closes the sink so that the reading side observes end of stream.
+     */
+    @Override
+    public void run() {
+        try {
+            while(true) {
+                int amount = in.read(bufferBytes);
+                if (amount < 0) break;
+                buffer.position(0);
+                buffer.limit(amount);
+                // A single write is not guaranteed to consume the whole
+                // buffer, so keep writing until nothing remains.
+                while (buffer.hasRemaining()) {
+                    out.write(buffer);
+                }
+            }
+        } catch(IOException ioException) {
+            ioException.printStackTrace();
+        } finally {
+            try {
+                out.close();
+            } catch(IOException ioException) {
+                ioException.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Starts a background thread that feeds the given stream into a new pipe.
+     *
+     * @param in the stream to read from
+     * @return the pipe's source channel, configured as non-blocking
+     * @throws IOException if the pipe cannot be opened or configured
+     */
+    public static SourceChannel wrap(InputStream in) throws IOException {
+        Pipe pipe = Pipe.open();
+        new EchoInputStreamWrapper(in, pipe.sink()).start();
+        SourceChannel result = pipe.source();
+        result.configureBlocking(false);
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java
----------------------------------------------------------------------
diff --git a/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java
new file mode 100644
index 0000000..ec56bd5
--- /dev/null
+++ b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GlobalLogger.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+// Not every event goes to the reactor's event handler. If we have a
+// separate handler for something like a scheduled task, then those
+// events aren't logged by the logger associated with the reactor's
+// handler. Sometimes this is useful if you don't want to see them, but
+// sometimes you want the global picture.
+public class GlobalLogger extends BaseHandler {
+
+    // Prints every event that reaches it through onUnhandled.
+    static class Logger extends BaseHandler {
+        @Override
+        public void onUnhandled(Event event) {
+            String line = "LOG: " + event;
+            System.out.println(line);
+        }
+    }
+
+    // Handler for the timer task scheduled from onReactorInit.
+    static class Task extends BaseHandler {
+        @Override
+        public void onTimerTask(Event e) {
+            System.out.println("Mission accomplished!");
+        }
+    }
+
+    @Override
+    public void onReactorInit(Event event) {
+        System.out.println("Hello, World!");
+        Task task = new Task();
+        event.getReactor().schedule(0, task);
+    }
+
+    @Override
+    public void onReactorFinal(Event e) {
+        System.out.println("Goodbye, World!");
+    }
+
+    public static void main(String[] args) throws IOException {
+        GlobalLogger handler = new GlobalLogger();
+        Reactor reactor = Proton.reactor(handler);
+
+        // Besides its regular handler the reactor has a global handler
+        // that sees every event. Attaching the Logger there, rather than
+        // to the regular handler, logs each event in the system whether or
+        // not the event's target object has a specific handler of its own.
+        Logger logger = new Logger();
+        reactor.getGlobalHandler().add(logger);
+        reactor.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java
----------------------------------------------------------------------
diff --git a/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java
new file mode 100644
index 0000000..6a69ba1
--- /dev/null
+++ b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/GoodbyeWorld.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+// So far the reactive hello-world doesn't look too different from a
+// regular old non-reactive hello-world. The onReactorInit method can
+// be used roughly as a 'main' method would. A program that only uses
+// that one event, however, isn't going to be very reactive. By using
+// other events, we can write a fully reactive program.
+public class GoodbyeWorld extends BaseHandler {
+
+    // As before we handle the reactor init event.
+    @Override
+    public void onReactorInit(Event event) {
+        System.out.println("Hello, World!");
+    }
+
+    // In addition to an initial event, the reactor also produces an
+    // event when it is about to exit. This may not behave much
+    // differently than putting the goodbye print statement inside
+    // onReactorInit, but as the program grows this handler always
+    // runs last, whatever other paths the main logic might take.
+    @Override
+    public void onReactorFinal(Event e) {
+        System.out.println("Goodbye, World!");
+    }
+
+    public static void main(String[] args) throws IOException {
+        // Run the reactor with a GoodbyeWorld instance as its handler;
+        // the reactor exits once it has no more events to process.
+        Reactor reactor = Proton.reactor(new GoodbyeWorld());
+        reactor.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
----------------------------------------------------------------------
diff --git a/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
new file mode 100644
index 0000000..39a36fb
--- /dev/null
+++ b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/HelloWorld.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+/*
+ * The proton reactor provides a general purpose event processing
+ * library for writing reactive programs. A reactive program is defined
+ * by a set of event handlers. An event handler is just any class or
+ * object that implements the Handler interface. For convenience, a class
+ * can extend BaseHandler and only handle the events that it cares to
+ * implement methods for.
+ */
+public class HelloWorld extends BaseHandler {
+
+    // The reactor itself emits the reactor init event at the moment
+    // it starts.
+    @Override
+    public void onReactorInit(Event initEvent) {
+        System.out.println("Hello, World!");
+    }
+
+    public static void main(String[] args) throws IOException {
+
+        // The handler passed at construction time is the one that, by
+        // default, receives the events the reactor generates.
+        Reactor helloReactor = Proton.reactor(new HelloWorld());
+
+        // Calling run makes the reactor process events. The reactor init
+        // event kicks off everything else, and once no events remain to
+        // be processed the reactor exits.
+        helloReactor.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/README.md
----------------------------------------------------------------------
diff --git a/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/README.md b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/README.md
new file mode 100644
index 0000000..261c917
--- /dev/null
+++ b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/README.md
@@ -0,0 +1,24 @@
+The examples in this directory provide a basic introduction to the
+proton reactor API and are best viewed in the order presented below.
+The examples contain comments that explain things in a tutorial-style
+manner.
+
+  - HelloWorld.java
+  - GoodbyeWorld.java
+
+  - Scheduling.java
+  - Counter.java
+  - CountRandomly.java
+
+  - Unhandled.java
+  - ReactorLogger.java
+  - GlobalLogger.java
+  - Delegates.java
+
+  - Handlers.java
+
+  - Echo.java
+  - Cat.java
+
+  - Send.java
+  - Recv.java

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java
----------------------------------------------------------------------
diff --git a/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java
new file mode 100644
index 0000000..31c7511
--- /dev/null
+++ b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/ReactorLogger.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+public class ReactorLogger extends BaseHandler {
+
+    // Toggles the optional Logger on the second reactor in main.
+    private static boolean loggingEnabled = false;
+
+    /** Prints every event that arrives at its catch-all callback. */
+    public static class Logger extends BaseHandler {
+        @Override
+        public void onUnhandled(Event unhandled) {
+            System.out.println("LOG: " + unhandled);
+        }
+    }
+
+    @Override
+    public void onReactorInit(Event initEvent) {
+        System.out.println("Hello, World!");
+    }
+
+    @Override
+    public void onReactorFinal(Event finalEvent) {
+        System.out.println("Goodbye, World!");
+    }
+
+    public static void main(String[] args) throws IOException {
+        // A reactor accepts several handlers at construction time, and
+        // each of them sees every event the reactor sees. Combined with
+        // onUnhandled, that lets us log each event going to the reactor.
+        Reactor loggedReactor = Proton.reactor(new ReactorLogger(), new Logger());
+        loggedReactor.run();
+
+        // The logger could also be added later, as below: every argument
+        // to the reactor is just added to its default handler.
+        Reactor plainReactor = Proton.reactor(new ReactorLogger());
+        if (loggingEnabled) {
+            plainReactor.getHandler().add(new Logger());
+        }
+        plainReactor.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Recv.java
----------------------------------------------------------------------
diff --git a/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Recv.java b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Recv.java
new file mode 100644
index 0000000..96a348a
--- /dev/null
+++ b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Recv.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.reactor.FlowController;
+import org.apache.qpid.proton.reactor.Handshaker;
+import org.apache.qpid.proton.reactor.Reactor;
+
+public class Recv extends BaseHandler {
+
+    private Recv() {
+        add(new Handshaker());
+        add(new FlowController());
+    }
+
+    @Override
+    public void onReactorInit(Event initEvent) {
+        try {
+            // Create an amqp acceptor.
+            initEvent.getReactor().acceptor("0.0.0.0", 5672);
+
+            // Reactor.acceptor takes an optional third argument: a handler
+            // for all accepted connections. Without it, the reactor simply
+            // inherits all the connection events.
+        } catch (IOException acceptFailure) {
+            acceptFailure.printStackTrace();
+        }
+    }
+
+    @Override
+    public void onDelivery(Event deliveryEvent) {
+        Receiver receiver = (Receiver) deliveryEvent.getLink();
+        Delivery current = receiver.current();
+        // Only handle deliveries that are readable and not partial.
+        if (!current.isReadable() || current.isPartial()) {
+            return;
+        }
+        byte[] payload = new byte[current.pending()];
+        int bytesRead = receiver.recv(payload, 0, payload.length);
+        receiver.advance();
+
+        Message received = Proton.message();
+        received.decode(payload, 0, bytesRead);
+        AmqpValue body = (AmqpValue) received.getBody();
+        System.out.println(body.getValue());
+    }
+
+    public static void main(String[] args) throws IOException {
+        Proton.reactor(new Recv()).run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java
----------------------------------------------------------------------
diff --git a/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java
new file mode 100644
index 0000000..3aed27a
--- /dev/null
+++ b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Scheduling.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+import org.apache.qpid.proton.reactor.Task;
+
+public class Scheduling extends BaseHandler {
+
+    // Wall-clock time captured in onReactorInit, read in onReactorFinal.
+    private long startTime;
+
+    @Override
+    public void onReactorInit(Event initEvent) {
+        startTime = System.currentTimeMillis();
+        System.out.println("Hello, World!");
+
+        // Scheduling a task event for a later point in time makes the
+        // reactor stick around until it has had a chance to process it.
+        // schedule() takes the delay first and the event's handler
+        // second. This object handles it here, but any other handler
+        // could be passed instead.
+        Reactor reactor = initEvent.getReactor();
+        Task scheduled = reactor.schedule(1000, this);
+
+        // The returned task can be ignored, or used as we do here to
+        // pass stuff to the handler.
+        scheduled.attachments().set("key", String.class, "Yay");
+    }
+
+    @Override
+    public void onTimerTask(Event timerEvent) {
+        Task fired = timerEvent.getTask();
+        String note = fired.attachments().get("key", String.class);
+        System.out.println(note + " my task is complete!");
+    }
+
+    @Override
+    public void onReactorFinal(Event finalEvent) {
+        long elapsedMillis = System.currentTimeMillis() - startTime;
+        System.out.println("Goodbye, World! (after " + elapsedMillis + " long milliseconds)");
+    }
+
+    public static void main(String[] args) throws IOException {
+        Proton.reactor(new Scheduling()).run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
----------------------------------------------------------------------
diff --git a/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
new file mode 100644
index 0000000..5978c45
--- /dev/null
+++ b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Send.java
@@ -0,0 +1,149 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.reactor.Handshaker;
+import org.apache.qpid.proton.reactor.Reactor;
+
+// This is a send in terms of low level AMQP events.
+public class Send extends BaseHandler {
+
+    // Connection handler that sends a single message once credit arrives.
+    // Static: it never uses the enclosing Send instance.
+    private static class SendHandler extends BaseHandler {
+
+        private final Message message;
+        private int nextTag = 0;
+
+        private SendHandler(Message message) {
+            this.message = message;
+
+            // Add a child handler that performs some default handshaking
+            // behaviour.
+            add(new Handshaker());
+        }
+
+        @Override
+        public void onConnectionInit(Event event) {
+            Connection conn = event.getConnection();
+
+            // Every session or link could have its own handler(s) if we
+            // wanted, simply by adding the handler to that session or link.
+            Session ssn = conn.session();
+
+            // If a link doesn't have an event handler, the events go to
+            // its parent session. If the session doesn't have a handler
+            // the events go to its parent connection. If the connection
+            // doesn't have a handler, the events go to the reactor.
+            Sender snd = ssn.sender("sender");
+            conn.open();
+            ssn.open();
+            snd.open();
+        }
+
+        @Override
+        public void onLinkFlow(Event event) {
+            Sender snd = (Sender)event.getLink();
+            if (snd.getCredit() > 0) {
+                byte[] msgData = new byte[1024];
+                int length;
+                // Grow the buffer until the encoded message fits.
+                while(true) {
+                    try {
+                        length = message.encode(msgData, 0, msgData.length);
+                        break;
+                    } catch(BufferOverflowException e) {
+                        msgData = new byte[msgData.length * 2];
+                    }
+                }
+                byte[] tag = String.valueOf(nextTag++).getBytes();
+                Delivery dlv = snd.delivery(tag);
+                snd.send(msgData, 0, length);
+                dlv.settle();
+                snd.advance();
+                // Only one message is sent, so tear everything down afterwards.
+                snd.close();
+                snd.getSession().close();
+                snd.getSession().getConnection().close();
+            }
+        }
+
+        @Override
+        public void onTransportError(Event event) {
+            ErrorCondition condition = event.getTransport().getCondition();
+            if (condition != null) {
+                System.err.println("Error: " + condition.getDescription());
+            } else {
+                System.err.println("Error (no description returned).");
+            }
+        }
+    }
+
+    private final String host;
+    private final int port;
+    private final Message message;
+
+    private Send(String host, int port, String content) {
+        this.host = host;
+        this.port = port;
+        message = Proton.message();
+        message.setBody(new AmqpValue(content));
+    }
+
+    @Override
+    public void onReactorInit(Event event) {
+        // connectionToHost creates an AMQP connection to host:port. The
+        // SendHandler passed here becomes that connection's handler, so its
+        // events go to the SendHandler rather than to the reactor; omit the
+        // handler and all the events would go to the reactor instead.
+        event.getReactor().connectionToHost(host, port, new SendHandler(message));
+    }
+
+    public static void main(String[] args) throws IOException {
+        int port = 5672;
+        String host = "localhost";
+        if (args.length > 0) {
+            String[] parts = args[0].split(":", 2);
+            host = parts[0];
+            if (parts.length > 1) {
+                port = Integer.parseInt(parts[1]);
+            }
+        }
+        String content = args.length > 1 ? args[1] : "Hello World!";
+
+        Reactor r = Proton.reactor(new Send(host, port, content));
+        r.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java
----------------------------------------------------------------------
diff --git a/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java
new file mode 100644
index 0000000..a3cc200
--- /dev/null
+++ b/examples/reactor/src/main/java/org/apache/qpid/proton/example/reactor/Unhandled.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.proton.example.reactor;
+
+import java.io.IOException;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.engine.BaseHandler;
+import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.reactor.Reactor;
+
+public class Unhandled extends BaseHandler {
+
+    // When an event occurs and its handler has no matching on<Event>
+    // method, the reactor tries to call onUnhandled instead, if it
+    // exists. Besides debugging, this is handy for logging and for
+    // delegating/inheritance.
+    @Override
+    public void onUnhandled(Event unhandledEvent) {
+        System.out.println(unhandledEvent);
+    }
+
+    public static void main(String[] args) throws IOException {
+        Reactor unhandledReactor = Proton.reactor(new Unhandled());
+        unhandledReactor.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/9c2af48e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7f2b443..1ea5a3c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,8 +136,8 @@
   <modules>
     <module>proton-j</module>
     <module>tests</module>
-    <module>examples/engine/java</module>
-    <module>examples/java/reactor</module>
+    <module>examples/engine</module>
+    <module>examples/reactor</module>
   </modules>
 
   <url>http://qpid.apache.org/proton</url>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message