qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject [08/30] qpid-proton git commit: PROTON-1385: remove proton-j from the existing repo, it now has its own repo at: https://git-wip-us.apache.org/repos/asf/qpid-proton-j.git
Date Mon, 09 Jan 2017 15:07:21 GMT
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
deleted file mode 100644
index 146ee09..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor;
-
-/**
- * Interface used to identify classes that can be a child of a reactor.
- */
-public interface ReactorChild {
-
-    /** Frees any resources associated with this child. */
-    void free();
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
deleted file mode 100644
index e91a0ee..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor;
-
-import java.nio.channels.SelectableChannel;
-
-import org.apache.qpid.proton.engine.Collector;
-import org.apache.qpid.proton.engine.Extendable;
-
-/**
- * An entity that can be multiplexed using a {@link Selector}.
- * <p>
- * Every selectable is associated with exactly one {@link SelectableChannel}.
- * Selectables may be interested in three kinds of events: read events, write
- * events, and timer events. A selectable will express its interest in these
- * events through the {@link #isReading()}, {@link #isWriting()}, and
- * {@link #getDeadline()} methods.
- * <p>
- * When a read, write, or timer event occurs, the selectable must be notified by
- * calling {@link #readable()}, {@link #writeable()}, or {@link #expired()} as
- * appropriate.
- *
- * Once a selectable reaches a terminal state (see {@link #isTerminal()}), it
- * will never be interested in events of any kind. When this occurs it should be
- * removed from the Selector and discarded using {@link #free()}.
- */
-public interface Selectable extends ReactorChild, Extendable {
-
-    /**
-     * A callback that can be passed to the various "on" methods of the
-     * selectable - to allow code to be run when the selectable becomes ready
-     * for the associated operation.
-     */
-    interface Callback {
-        void run(Selectable selectable);
-    }
-
-    /**
-     * @return <code>true</code> if the selectable is interested in receiving
-     *         notifications (via the {@link #readable()} method) that indicate
-     *         that the associated {@link SelectableChannel} has data ready
-     *         to be read from it.
-     */
-    boolean isReading();
-
-    /**
-     * @return <code>true</code> if the selectable is interested in receiving
-     *         notifications (via the {@link #writeable()} method) that indicate
-     *         that the associated {@link SelectableChannel} is ready to be
-     *         written to.
-     */
-    boolean isWriting();
-
-    /**
-     * @return a deadline after which this selectable can expect to receive
-     *         a notification (via the {@link #expired()} method) that indicates
-     *         that the deadline has passed.  The deadline is expressed in the
-     *         same format as {@link System#currentTimeMillis()}.  Returning
-     *         a deadline of zero (or a negative number) indicates that the
-     *         selectable does not wish to be notified of expiry.
-     */
-    long getDeadline();
-
-    /**
-     * Sets the value that will be returned by {@link #isReading()}.
-     * @param reading
-     */
-    void setReading(boolean reading);
-
-    /**
-     * Sets the value that will be returned by {@link #isWriting()}.
-     * @param writing
-     */
-    void setWriting(boolean writing);
-
-    /**
-     * Sets the value that will be returned by {@link #getDeadline()}.
-     * @param deadline
-     */
-    void setDeadline(long deadline);
-
-    /**
-     * Registers a callback that will be run when the selectable becomes ready
-     * for reading.
-     * @param runnable the callback to register.  Any previously registered
-     *                 callback will be replaced.
-     */
-    void onReadable(Callback runnable);
-
-    /**
-     * Registers a callback that will be run when the selectable becomes ready
-     * for writing.
-     * @param runnable the callback to register.  Any previously registered
-     *                 callback will be replaced.
-     */
-    void onWritable(Callback runnable);
-
-    /**
-     * Registers a callback that will be run when the selectable expires.
-     * @param runnable the callback to register.  Any previously registered
-     *                 callback will be replaced.
-     */
-    void onExpired(Callback runnable);
-
-    /**
-     * Registers a callback that will be run when the selectable is notified of
-     * an error.
-     * @param runnable the callback to register.  Any previously registered
-     *                 callback will be replaced.
-     */
-    void onError(Callback runnable);
-
-    /**
-     * Registers a callback that will be run when the selectable is notified
-     * that it has been released.
-     * @param runnable the callback to register.  Any previously registered
-     *                 callback will be replaced.
-     */
-    void onRelease(Callback runnable);
-
-    /**
-     * Registers a callback that will be run when the selectable is notified
-     * that it has been free'd.
-     * @param runnable the callback to register.  Any previously registered
-     *                 callback will be replaced.
-     */
-    void onFree(Callback runnable);
-
-    /**
-     * Notify the selectable that the underlying {@link SelectableChannel} is
-     * ready for a read operation.
-     */
-    void readable();
-
-    /**
-     * Notify the selectable that the underlying {@link SelectableChannel} is
-     * ready for a write operation.
-     */
-    void writeable();
-
-    /** Notify the selectable that it has expired. */
-    void expired();
-
-    /** Notify the selectable that an error has occurred. */
-    void error();
-
-    /** Notify the selectable that it has been released. */
-    void release();
-
-    /** Notify the selectable that it has been free'd. */
-    @Override
-    void free();
-
-    /**
-     * Associates a {@link SelectableChannel} with this selectable.
-     * @param channel
-     */
-    void setChannel(SelectableChannel channel); // This is the equivalent to pn_selectable_set_fd(...)
-
-    /** @return the {@link SelectableChannel} associated with this selectable. */
-    SelectableChannel getChannel(); // This is the equivalent to pn_selectable_get_fd(...)
-
-    /**
-     * Check if a selectable is registered.  This can be used for tracking
-     * whether a given selectable has been registered with an external event
-     * loop.
-     * <p>
-     * <em>Note:</em> the reactor code, currently, does not use this flag.
-     * @return <code>true</code> if the selectable is registered.
-     */
-    boolean isRegistered();  // XXX: unused in C reactor code
-
-    /**
-     * Set the registered flag for a selectable.
-     * <p>
-     * <em>Note:</em> the reactor code, currently, does not use this flag.
-     * @param registered the value returned by {@link #isRegistered()}
-     */
-    void setRegistered(boolean registered); // XXX: unused in C reactor code
-
-    /**
-     * Configure a selectable with a set of callbacks that emit readable,
-     * writable, and expired events into the supplied collector.
-     * @param collector
-     */
-    void setCollector(final Collector collector);
-
-    /** @return the reactor to which this selectable is a child. */
-    Reactor getReactor() ;
-
-    /**
-     * Terminates the selectable.  Once a selectable reaches a terminal state
-     * it will never be interested in events of any kind.
-     */
-    public void terminate() ;
-
-    /**
-     * @return <code>true</code> if the selectable has reached a terminal state.
-     */
-    boolean isTerminal();
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
deleted file mode 100644
index 4228a8d..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-/**
- * A multiplexor of instances of {@link Selectable}.
- * <p>
- * Many instances of <code>Selectable</code> can be added to a selector, and
- * the {@link #select(long)} method used to block the calling thread until
- * one of the <code>Selectables</code> becomes ready to perform an operation.
- * <p>
- * This class is not thread safe, so only one thread should be manipulating the
- * contents of the selector, or running the {@link #select(long)} method at
- * any given time.
- */
-public interface Selector {
-
-    /**
-     * Adds a selectable to the selector.
-     * @param selectable
-     * @throws IOException
-     */
-    void add(Selectable selectable) throws IOException;
-
-    /**
-     * Updates the selector to reflect any changes in interest by the specified
-     * selectable.  This is achieved by calling the
-     * {@link Selectable#isReading()} and {@link Selectable#isWriting()}
-     * methods.
-     * @param selectable
-     */
-    void update(Selectable selectable);
-
-    /**
-     * Removes a selectable from the selector.
-     * @param selectable
-     */
-    void remove(Selectable selectable);
-
-    /**
-     * Waits for the specified timeout period for one or more selectables to
-     * become ready for an operation.  Selectables that become ready are
-     * returned by the {@link #readable()}, {@link #writeable()},
-     * {@link #expired()}, or {@link #error()} methods.
-     *
-     * @param timeout the maximum number of milliseconds to block the calling
-     *                thread waiting for a selectable to become ready for an
-     *                operation.  The value zero is interpreted as check but
-     *                don't block.
-     * @throws IOException
-     */
-    void select(long timeout) throws IOException;
-
-    /**
-     * @return the selectables that have become readable since the last call
-     *         to {@link #select(long)}.  Calling <code>select</code> clears
-     *         any previous values in this set before adding new values
-     *         corresponding to those selectables that have become readable.
-     */
-    Iterator<Selectable> readable();
-
-    /**
-     * @return the selectables that have become writable since the last call
-     *         to {@link #select(long)}.  Calling <code>select</code> clears
-     *         any previous values in this set before adding new values
-     *         corresponding to those selectables that have become writable.
-     */
-    Iterator<Selectable> writeable();
-
-    /**
-     * @return the selectables that have expired since the last call
-     *         to {@link #select(long)}.  Calling <code>select</code> clears
-     *         any previous values in this set before adding new values
-     *         corresponding to those selectables that have now expired.
-     */
-    Iterator<Selectable> expired();
-
-    /**
-     * @return the selectables that have encountered an error since the last
-     *         call to {@link #select(long)}.  Calling <code>select</code>
-     *         clears any previous values in this set before adding new values
-     *         corresponding to those selectables that have encountered an
-     *         error.
-     */
-    Iterator<Selectable> error() ;
-
-    /** Frees the resources used by this selector. */
-    void free();
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
deleted file mode 100644
index 7fb5964..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor;
-
-import org.apache.qpid.proton.engine.Event.Type;
-import org.apache.qpid.proton.engine.Extendable;
-import org.apache.qpid.proton.engine.Handler;
-
-/**
- * Represents work scheduled with a {@link Reactor} for execution at
- * some point in the future.
- * <p>
- * Tasks are created using the {@link Reactor#schedule(int, Handler)}
- * method.
- */
-public interface Task extends Extendable {
-
-    /**
-     * @return the deadline at which the handler associated with the scheduled
-     *         task should be delivered a {@link Type#TIMER_TASK} event.
-     */
-    long deadline();
-
-    /** @return the reactor that created this task. */
-    Reactor getReactor();
-
-    /**
-     * Cancel the execution of this task. No-op if invoked after the task was already executed.
-     */
-    void cancel();
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
deleted file mode 100644
index c5abbd8..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Handler;
-import org.apache.qpid.proton.engine.Record;
-import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Sasl.SaslOutcome;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.engine.impl.RecordImpl;
-import org.apache.qpid.proton.reactor.Acceptor;
-import org.apache.qpid.proton.reactor.Reactor;
-import org.apache.qpid.proton.reactor.impl.ReactorImpl;
-import org.apache.qpid.proton.reactor.Selectable;
-import org.apache.qpid.proton.reactor.Selectable.Callback;
-import org.apache.qpid.proton.messenger.impl.Address;
-
-@SuppressWarnings("deprecation")
-public class AcceptorImpl implements Acceptor {
-
-    private Record attachments = new RecordImpl();
-    private final SelectableImpl sel;
-    protected static final String CONNECTION_ACCEPTOR_KEY = "pn_reactor_connection_acceptor";
-
-    private class AcceptorReadable implements Callback {
-        @Override
-        public void run(Selectable selectable) {
-            Reactor reactor = selectable.getReactor();
-            try {
-                SocketChannel socketChannel = ((ServerSocketChannel)selectable.getChannel()).accept();
-                if (socketChannel == null) {
-                    throw new ReactorInternalException("Selectable readable, but no socket to accept");
-                }
-                Handler handler = BaseHandler.getHandler(AcceptorImpl.this);
-                if (handler == null) {
-                    handler = reactor.getHandler();
-                }
-                Connection conn = reactor.connection(handler);
-                Record conn_recs = conn.attachments();
-                conn_recs.set(CONNECTION_ACCEPTOR_KEY, Acceptor.class, AcceptorImpl.this);
-                InetSocketAddress peerAddr = (InetSocketAddress)socketChannel.getRemoteAddress();
-                if (peerAddr != null) {
-                    Address addr = new Address();
-                    addr.setHost(peerAddr.getHostString());
-                    addr.setPort(Integer.toString(peerAddr.getPort()));
-                    conn_recs.set(ReactorImpl.CONNECTION_PEER_ADDRESS_KEY, Address.class, addr);
-                }
-                Transport trans = Proton.transport();
-                Sasl sasl = trans.sasl();
-                sasl.server();
-                sasl.setMechanisms("ANONYMOUS");
-                sasl.done(SaslOutcome.PN_SASL_OK);
-                trans.bind(conn);
-                IOHandler.selectableTransport(reactor, socketChannel.socket(), trans);
-            } catch(IOException ioException) {
-                sel.error();
-            }
-        }
-    }
-
-    private static class AcceptorFree implements Callback {
-        @Override
-        public void run(Selectable selectable) {
-            try {
-                if (selectable.getChannel() != null) {
-                    selectable.getChannel().close();
-                }
-            } catch(IOException ioException) {
-                // Ignore - as we can't make the channel any more closed...
-            }
-        }
-    }
-
-    protected AcceptorImpl(Reactor reactor, String host, int port, Handler handler) throws IOException {
-        ServerSocketChannel ssc = ((ReactorImpl)reactor).getIO().serverSocketChannel();
-        ssc.bind(new InetSocketAddress(host, port));
-        sel = ((ReactorImpl)reactor).selectable(this);
-        sel.setChannel(ssc);
-        sel.onReadable(new AcceptorReadable());
-        sel.onFree(new AcceptorFree());
-        sel.setReactor(reactor);
-        BaseHandler.setHandler(this, handler);
-        sel.setReading(true);
-        reactor.update(sel);
-    }
-
-    @Override
-    public void close() {
-        if (!sel.isTerminal()) {
-            Reactor reactor = sel.getReactor();
-            try {
-                sel.getChannel().close();
-            } catch(IOException ioException) {
-                // Ignore.
-            }
-            sel.setChannel(null);
-            sel.terminate();
-            reactor.update(sel);
-        }
-    }
-
-    // Used for unit tests, where acceptor is bound to an ephemeral port
-    public int getPortNumber() throws IOException {
-        ServerSocketChannel ssc = (ServerSocketChannel)sel.getChannel();
-        return ((InetSocketAddress)ssc.getLocalAddress()).getPort();
-    }
-
-    @Override
-    public void free() {
-        sel.free();
-    }
-
-    @Override
-    public Record attachments() {
-        return attachments;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java
deleted file mode 100644
index 1028ae8..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.io.IOException;
-import java.nio.channels.Pipe;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-
-// Java equivalent to pn_io.
-// This is, currently, in the reactor.impl package because it is not
-// used elsewhere in the proton-j codebase.  Instead it is present to
-// facilitate mocking of various Java I/O related resources so that
-// the unit tests can check for leaks.
-public interface IO {
-
-    Pipe pipe() throws IOException;
-
-    Selector selector() throws IOException;
-
-    ServerSocketChannel serverSocketChannel() throws IOException;
-
-    SocketChannel socketChannel() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
deleted file mode 100644
index 2dd7e1a..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.channels.Channel;
-import java.nio.channels.SocketChannel;
-import java.util.Iterator;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.engine.impl.TransportImpl;
-import org.apache.qpid.proton.engine.Record;
-import org.apache.qpid.proton.reactor.Reactor;
-import org.apache.qpid.proton.reactor.Selectable;
-import org.apache.qpid.proton.reactor.Selectable.Callback;
-import org.apache.qpid.proton.reactor.Selector;
-import org.apache.qpid.proton.reactor.Acceptor;
-import org.apache.qpid.proton.reactor.impl.AcceptorImpl;
-import org.apache.qpid.proton.messenger.impl.Address;
-
-@SuppressWarnings("deprecation")
-public class IOHandler extends BaseHandler {
-
-    // pni_handle_quiesced from connection.c
-    private void handleQuiesced(Reactor reactor, Selector selector) throws IOException {
-        // check if we are still quiesced, other handlers of
-        // PN_REACTOR_QUIESCED could have produced more events to process
-        if (!reactor.quiesced()) return;
-        selector.select(reactor.getTimeout());
-        reactor.mark();
-        Iterator<Selectable> selectables = selector.readable();
-        while(selectables.hasNext()) {
-            selectables.next().readable();
-        }
-        selectables = selector.writeable();
-        while(selectables.hasNext()) {
-            selectables.next().writeable();
-        }
-        selectables = selector.expired();
-        while(selectables.hasNext()) {
-            selectables.next().expired();
-        }
-        selectables = selector.error();
-        while(selectables.hasNext()) {
-            selectables.next().error();
-        }
-        reactor.yield();
-    }
-
-    // pni_handle_open(...) from connection.c
-    private void handleOpen(Reactor reactor, Event event) {
-        Connection connection = event.getConnection();
-        if (connection.getRemoteState() != EndpointState.UNINITIALIZED) {
-            return;
-        }
-        // Outgoing Reactor connections set the virtual host automatically using the
-        // following rules:
-        String vhost = connection.getHostname();
-        if (vhost == null) {
-            // setHostname never called, use the host from the connection's
-            // socket address as the default virtual host:
-            String conAddr = reactor.getConnectionAddress(connection);
-            if (conAddr != null) {
-                Address addr = new Address(conAddr);
-                connection.setHostname(addr.getHost());
-            }
-        } else if (vhost.isEmpty()) {
-            // setHostname called explictly with a null string. This allows
-            // the application to completely avoid sending a virtual host
-            // name
-            connection.setHostname(null);
-        } else {
-            // setHostname set by application - use it.
-        }
-        Transport transport = Proton.transport();
-        Sasl sasl = transport.sasl();
-        sasl.client();
-        sasl.setMechanisms("ANONYMOUS");
-        transport.bind(connection);
-    }
-
-    // pni_handle_bound(...) from connection.c
-    // If this connection is an outgoing connection - not an incoming
-    // connection created by the Acceptor - create a socket connection to
-    // the peer address.
-    private void handleBound(Reactor reactor, Event event) {
-        Connection connection = event.getConnection();
-        Record conn_recs = connection.attachments();
-        if (conn_recs.get(AcceptorImpl.CONNECTION_ACCEPTOR_KEY, Acceptor.class) != null) {
-            // Connection was created via the Acceptor, so the socket already
-            // exists
-            return;
-        }
-        String url = reactor.getConnectionAddress(connection);
-        String hostname = connection.getHostname();
-        int port = 5672;
-
-        if (url != null) {
-            Address address = new Address(url);
-            hostname = address.getHost();
-            try {
-                port = Integer.parseInt(address.getImpliedPort());
-            } catch(NumberFormatException nfe) {
-                throw new IllegalArgumentException("Not a valid host: " + url, nfe);
-            }
-        } else if (hostname != null && !hostname.equals("")) {
-            // Backward compatibility with old code that illegally overloaded
-            // the connection's hostname
-            int colonIndex = hostname.indexOf(':');
-            if (colonIndex >= 0) {
-                try {
-                    port = Integer.parseInt(hostname.substring(colonIndex+1));
-                } catch(NumberFormatException nfe) {
-                    throw new IllegalArgumentException("Not a valid host: " + hostname, nfe);
-                }
-                hostname = hostname.substring(0, colonIndex);
-            }
-        } else {
-            throw new IllegalStateException("No address provided for Connection");
-        }
-
-        Transport transport = event.getConnection().getTransport();
-        Socket socket = null;   // In this case, 'null' is the proton-j equivalent of PN_INVALID_SOCKET
-        try {
-            SocketChannel socketChannel = ((ReactorImpl)reactor).getIO().socketChannel();
-            socketChannel.configureBlocking(false);
-            socketChannel.connect(new InetSocketAddress(hostname, port));
-            socket = socketChannel.socket();
-        } catch(Exception exception) {
-            ErrorCondition condition = new ErrorCondition();
-            condition.setCondition(Symbol.getSymbol("proton:io"));
-            condition.setDescription(exception.getMessage());
-            transport.setCondition(condition);
-            transport.close_tail();
-            transport.close_head();
-            transport.pop(Math.max(0, transport.pending())); // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
-        }
-        selectableTransport(reactor, socket, transport);
-    }
-
-    // pni_connection_capacity from connection.c
-    private static int capacity(Selectable selectable) {
-        Transport transport = ((SelectableImpl)selectable).getTransport();
-        int capacity = transport.capacity();
-        if (capacity < 0) {
-            if (transport.isClosed()) {
-                selectable.terminate();
-            }
-        }
-        return capacity;
-    }
-
-    // pni_connection_pending from connection.c
-    private static int pending(Selectable selectable) {
-        Transport transport = ((SelectableImpl)selectable).getTransport();
-        int pending = transport.pending();
-        if (pending < 0) {
-            if (transport.isClosed()) {
-                selectable.terminate();
-            }
-        }
-        return pending;
-    }
-
-    // pni_connection_deadline from connection.c
-    private static long deadline(SelectableImpl selectable) {
-        Reactor reactor = selectable.getReactor();
-        Transport transport = selectable.getTransport();
-        long deadline = transport.tick(reactor.now());
-        return deadline;
-    }
-
-    // pni_connection_update from connection.c
-    private static void update(Selectable selectable) {
-        SelectableImpl selectableImpl = (SelectableImpl)selectable;
-        int c = capacity(selectableImpl);
-        int p = pending(selectableImpl);
-        selectable.setReading(c > 0);
-        selectable.setWriting(p > 0);
-        selectable.setDeadline(deadline(selectableImpl));
-    }
-
-    // pni_connection_readable from connection.c
-    private static Callback connectionReadable = new Callback() {
-        @Override
-        public void run(Selectable selectable) {
-            Reactor reactor = selectable.getReactor();
-            Transport transport = ((SelectableImpl)selectable).getTransport();
-            int capacity = transport.capacity();
-            if (capacity > 0) {
-                SocketChannel socketChannel = (SocketChannel)selectable.getChannel();
-                try {
-                    int n = socketChannel.read(transport.tail());
-                    if (n == -1) {
-                        transport.close_tail();
-                    } else {
-                        transport.process();
-                    }
-                } catch (IOException e) {
-                    ErrorCondition condition = new ErrorCondition();
-                    condition.setCondition(Symbol.getSymbol("proton:io"));
-                    condition.setDescription(e.getMessage());
-                    transport.setCondition(condition);
-                    transport.close_tail();
-                }
-            }
-            // (Comment from C code:) occasionally transport events aren't
-            // generated when expected, so the following hack ensures we
-            // always update the selector
-            update(selectable);
-            reactor.update(selectable);
-        }
-    };
-
-    // pni_connection_writable from connection.c
-    private static Callback connectionWritable = new Callback() {
-        @Override
-        public void run(Selectable selectable) {
-            Reactor reactor = selectable.getReactor();
-            Transport transport = ((SelectableImpl)selectable).getTransport();
-            int pending = transport.pending();
-            if (pending > 0) {
-                SocketChannel channel = (SocketChannel)selectable.getChannel();
-                try {
-                    int n = channel.write(transport.head());
-                    if (n < 0) {
-                        transport.close_head();
-                    } else {
-                        transport.pop(n);
-                    }
-                } catch(IOException ioException) {
-                    ErrorCondition condition = new ErrorCondition();
-                    condition.setCondition(Symbol.getSymbol("proton:io"));
-                    condition.setDescription(ioException.getMessage());
-                    transport.setCondition(condition);
-                    transport.close_head();
-                }
-            }
-
-            int newPending = transport.pending();
-            if (newPending != pending) {
-                update(selectable);
-                reactor.update(selectable);
-            }
-        }
-    };
-
-    // pni_connection_error from connection.c
-    private static Callback connectionError = new Callback() {
-        @Override
-        public void run(Selectable selectable) {
-            Reactor reactor = selectable.getReactor();
-            selectable.terminate();
-            reactor.update(selectable);
-        }
-    };
-
-    // pni_connection_expired from connection.c
-    private static Callback connectionExpired = new Callback() {
-        @Override
-        public void run(Selectable selectable) {
-            Reactor reactor = selectable.getReactor();
-            Transport transport = ((SelectableImpl)selectable).getTransport();
-            long deadline = transport.tick(reactor.now());
-            selectable.setDeadline(deadline);
-            int c = capacity(selectable);
-            int p = pending(selectable);
-            selectable.setReading(c > 0);
-            selectable.setWriting(p > 0);
-            reactor.update(selectable);
-        }
-    };
-
-    private static Callback connectionFree = new Callback() {
-        @Override
-        public void run(Selectable selectable) {
-            Channel channel = selectable.getChannel();
-            if (channel != null) {
-                try {
-                    channel.close();
-                } catch(IOException ioException) {
-                    // Ignore
-                }
-            }
-        }
-    };
-
-    // pn_reactor_selectable_transport
-    // Note the socket argument can, validly be 'null' this is the equivalent of proton-c's PN_INVALID_SOCKET
-    protected static Selectable selectableTransport(Reactor reactor, Socket socket, Transport transport) {
-        Selectable selectable = reactor.selectable();
-        selectable.setChannel(socket != null ? socket.getChannel() : null);
-        selectable.onReadable(connectionReadable);
-        selectable.onWritable(connectionWritable);
-        selectable.onError(connectionError);
-        selectable.onExpired(connectionExpired);
-        selectable.onFree(connectionFree);
-        ((SelectableImpl)selectable).setTransport(transport);
-        ((TransportImpl)transport).setSelectable(selectable);
-        ((TransportImpl)transport).setReactor(reactor);
-        update(selectable);
-        reactor.update(selectable);
-        return selectable;
-    }
-
-    private void handleTransport(Reactor reactor, Event event) {
-        TransportImpl transport = (TransportImpl)event.getTransport();
-        Selectable selectable = transport.getSelectable();
-        if (selectable != null && !selectable.isTerminal()) {
-            update(selectable);
-            reactor.update(selectable);
-        }
-    }
-
-    @Override
-    public void onUnhandled(Event event) {
-        try {
-            ReactorImpl reactor = (ReactorImpl)event.getReactor();
-            Selector selector = reactor.getSelector();
-            if (selector == null) {
-                selector = new SelectorImpl(reactor.getIO());
-                reactor.setSelector(selector);
-            }
-
-            Selectable selectable;
-            switch(event.getType()) {
-            case SELECTABLE_INIT:
-                selectable = event.getSelectable();
-                selector.add(selectable);
-                break;
-            case SELECTABLE_UPDATED:
-                selectable = event.getSelectable();
-                selector.update(selectable);
-                break;
-            case SELECTABLE_FINAL:
-                selectable = event.getSelectable();
-                selector.remove(selectable);
-                selectable.release();
-                break;
-            case CONNECTION_LOCAL_OPEN:
-                handleOpen(reactor, event);
-                break;
-            case CONNECTION_BOUND:
-                handleBound(reactor, event);
-                break;
-            case TRANSPORT:
-                handleTransport(reactor, event);
-                break;
-            case TRANSPORT_CLOSED:
-                event.getTransport().unbind();
-                break;
-            case REACTOR_QUIESCED:
-                handleQuiesced(reactor, selector);
-                break;
-            default:
-                break;
-            }
-        } catch(IOException ioException) {
-            // XXX: Might not be the right exception type, but at least the exception isn't being swallowed
-            throw new ReactorInternalException(ioException);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java
deleted file mode 100644
index 6376b16..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.io.IOException;
-import java.nio.channels.Pipe;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-
-public class IOImpl implements IO {
-
-    @Override
-    public Pipe pipe() throws IOException {
-        return Pipe.open();
-    }
-
-    @Override
-    public Selector selector() throws IOException {
-        return Selector.open();
-    }
-
-    @Override
-    public ServerSocketChannel serverSocketChannel() throws IOException {
-        return ServerSocketChannel.open();
-    }
-
-    @Override
-    public SocketChannel socketChannel() throws IOException {
-        return SocketChannel.open();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
deleted file mode 100644
index 30c8df9..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java
+++ /dev/null
@@ -1,485 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.Pipe;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.qpid.proton.Proton;
-import org.apache.qpid.proton.engine.BaseHandler;
-import org.apache.qpid.proton.engine.Collector;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.Event.Type;
-import org.apache.qpid.proton.engine.EventType;
-import org.apache.qpid.proton.engine.Extendable;
-import org.apache.qpid.proton.engine.ExtendableAccessor;
-import org.apache.qpid.proton.engine.Handler;
-import org.apache.qpid.proton.engine.HandlerException;
-import org.apache.qpid.proton.engine.Record;
-import org.apache.qpid.proton.engine.impl.CollectorImpl;
-import org.apache.qpid.proton.engine.impl.ConnectionImpl;
-import org.apache.qpid.proton.engine.impl.RecordImpl;
-import org.apache.qpid.proton.reactor.Acceptor;
-import org.apache.qpid.proton.reactor.impl.AcceptorImpl;
-import org.apache.qpid.proton.reactor.Reactor;
-import org.apache.qpid.proton.reactor.ReactorChild;
-import org.apache.qpid.proton.reactor.Selectable;
-import org.apache.qpid.proton.reactor.Selectable.Callback;
-import org.apache.qpid.proton.reactor.Selector;
-import org.apache.qpid.proton.reactor.Task;
-import org.apache.qpid.proton.messenger.impl.Address;
-
-@SuppressWarnings("deprecation")
-public class ReactorImpl implements Reactor, Extendable {
-    public static final ExtendableAccessor<Event, Handler> ROOT = new ExtendableAccessor<>(Handler.class);
-
-    private CollectorImpl collector;
-    private long now;
-    private long timeout;
-    private Handler global;
-    private Handler handler;
-    private Set<ReactorChild> children;
-    private int selectables;
-    private boolean yield;
-    private boolean stop;
-    private Selectable selectable;
-    private EventType previous;
-    private Timer timer;
-    private final Pipe wakeup;
-    private Selector selector;
-    private Record attachments;
-    private final IO io;
-    protected static final String CONNECTION_PEER_ADDRESS_KEY = "pn_reactor_connection_peer_address";
-
-    @Override
-    public long mark() {
-        now = System.currentTimeMillis();
-        return now;
-    }
-
-    @Override
-    public long now() {
-        return now;
-    }
-
-    protected ReactorImpl(IO io) throws IOException {
-        collector = (CollectorImpl)Proton.collector();
-        global = new IOHandler();
-        handler = new BaseHandler();
-        children = new HashSet<ReactorChild>();
-        selectables = 0;
-        timer = new Timer(collector);
-        this.io = io;
-        wakeup = this.io.pipe();
-        mark();
-        attachments = new RecordImpl();
-    }
-
-    public ReactorImpl() throws IOException {
-        this(new IOImpl());
-    }
-
-    @Override
-    public void free() {
-        if (wakeup.source().isOpen()) {
-            try {
-                wakeup.source().close();
-            } catch(IOException e) {
-                // Ignore.
-            }
-        }
-        if (wakeup.sink().isOpen()) {
-            try {
-                wakeup.sink().close();
-            } catch(IOException e) {
-                // Ignore
-            }
-        }
-
-        if (selector != null) {
-            selector.free();
-        }
-
-        for (ReactorChild child : children) {
-            child.free();
-        }
-    }
-
-    @Override
-    public Record attachments() {
-        return attachments;
-    }
-
-    @Override
-    public long getTimeout() {
-        return timeout;
-    }
-
-    @Override
-    public void setTimeout(long timeout) {
-        this.timeout = timeout;
-    }
-
-    @Override
-    public Handler getGlobalHandler() {
-        return global;
-    }
-
-    @Override
-    public void setGlobalHandler(Handler handler) {
-        global = handler;
-    }
-
-    @Override
-    public Handler getHandler() {
-        return handler;
-    }
-
-    @Override
-    public void setHandler(Handler handler) {
-        this.handler = handler;
-    }
-
-    @Override
-    public Set<ReactorChild> children() {
-        return children;
-    }
-
-    @Override
-    public Collector collector() {
-        return collector;
-    }
-
-    private class ReleaseCallback implements Callback {
-        private final ReactorImpl reactor;
-        private final ReactorChild child;
-        public ReleaseCallback(ReactorImpl reactor, ReactorChild child) {
-            this.reactor = reactor;
-            this.child = child;
-        }
-        @Override
-        public void run(Selectable selectable) {
-            if (reactor.children.remove(child)) {
-                --reactor.selectables;
-                child.free();
-            }
-        }
-    }
-
-    @Override
-    public Selectable selectable() {
-        return selectable(null);
-    }
-
-    public SelectableImpl selectable(ReactorChild child) {
-        SelectableImpl result = new SelectableImpl();
-        result.setCollector(collector);
-        collector.put(Type.SELECTABLE_INIT, result);
-        result.setReactor(this);
-        children.add(child == null ? result : child);
-        result.onRelease(new ReleaseCallback(this, child == null ? result : child));
-        ++selectables;
-        return result;
-    }
-
-    @Override
-    public void update(Selectable selectable) {
-        SelectableImpl selectableImpl = (SelectableImpl)selectable;
-        if (!selectableImpl.isTerminated()) {
-            if (selectableImpl.isTerminal()) {
-                selectableImpl.terminated();
-                collector.put(Type.SELECTABLE_FINAL, selectable);
-            } else {
-                collector.put(Type.SELECTABLE_UPDATED, selectable);
-            }
-        }
-    }
-
-    // pn_event_handler
-    private Handler eventHandler(Event event) {
-        Handler result;
-        if (event.getLink() != null) {
-            result = BaseHandler.getHandler(event.getLink());
-            if (result != null) return result;
-        }
-        if (event.getSession() != null) {
-            result = BaseHandler.getHandler(event.getSession());
-            if (result != null) return result;
-        }
-        if (event.getConnection() != null) {
-            result = BaseHandler.getHandler(event.getConnection());
-            if (result != null) return result;
-        }
-
-        if (event.getTask() != null) {
-            result = BaseHandler.getHandler(event.getTask());
-            if (result != null) return result;
-        }
-
-        if (event.getSelectable() != null) {
-            result = BaseHandler.getHandler(event.getSelectable());
-            if (result != null) return result;
-        }
-
-        return handler;
-    }
-
-
-    @Override
-    public void yield() {
-        yield = true;
-    }
-
-    @Override
-    public boolean quiesced() {
-        Event event = collector.peek();
-        if (event == null) return true;
-        if (collector.more()) return false;
-        return event.getEventType() == Type.REACTOR_QUIESCED;
-    }
-
-    @Override
-    public boolean process() throws HandlerException {
-        mark();
-        EventType previous = null;
-        while (true) {
-            Event event = collector.peek();
-            if (event != null) {
-                if (yield) {
-                    yield = false;
-                    return true;
-                }
-                Handler handler = eventHandler(event);
-                dispatch(event, handler);
-                dispatch(event, global);
-
-                if (event.getEventType() == Type.CONNECTION_FINAL) {
-                    children.remove(event.getConnection());
-                }
-                this.previous = event.getEventType();
-                previous = this.previous;
-                collector.pop();
-
-            } else {
-                if (!stop && more()) {
-                    if (previous != Type.REACTOR_QUIESCED && this.previous != Type.REACTOR_FINAL) {
-                        collector.put(Type.REACTOR_QUIESCED, this);
-                    } else {
-                        return true;
-                    }
-                } else {
-                    if (selectable != null) {
-                        selectable.terminate();
-                        update(selectable);
-                        selectable = null;
-                    } else {
-                        collector.put(Type.REACTOR_FINAL, this);
-                        return false;
-                    }
-                }
-            }
-        }
-    }
-
-    private void dispatch(Event event, Handler handler) {
-        ROOT.set(event, handler);
-        event.dispatch(handler);
-    }
-
-    @Override
-    public void wakeup() {
-        try {
-            wakeup.sink().write(ByteBuffer.allocate(1));
-        } catch(ClosedChannelException channelClosedException) {
-            // Ignore - pipe already closed by reactor being shutdown.
-        } catch(IOException ioException) {
-            throw new ReactorInternalException(ioException);
-        }
-    }
-
-    @Override
-    public void start() {
-        collector.put(Type.REACTOR_INIT, this);
-        selectable = timerSelectable();
-    }
-
-    @Override
-    public void stop() throws HandlerException {
-        stop = true;
-    }
-
-    private boolean more() {
-        return timer.tasks() > 0 || selectables > 1;
-    }
-
-    @Override
-    public void run() throws HandlerException {
-        setTimeout(3141);
-        start();
-        while(process()) {}
-        stop();
-        process();
-        collector = null;
-    }
-
-    // pn_reactor_schedule from reactor.c
-    @Override
-    public Task schedule(int delay, Handler handler) {
-        Task task = timer.schedule(now + delay);
-        ((TaskImpl)task).setReactor(this);
-        BaseHandler.setHandler(task, handler);
-        if (selectable != null) {
-            selectable.setDeadline(timer.deadline());
-            update(selectable);
-        }
-        return task;
-    }
-
-    private void expireSelectable(Selectable selectable) {
-        ReactorImpl reactor = (ReactorImpl) selectable.getReactor();
-        reactor.timer.tick(reactor.now);
-        selectable.setDeadline(reactor.timer.deadline());
-        reactor.update(selectable);
-    }
-
-    private class TimerReadable implements Callback {
-
-        @Override
-        public void run(Selectable selectable) {
-            try {
-                wakeup.source().read(ByteBuffer.allocate(64));
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-            expireSelectable(selectable);
-        }
-
-    }
-
-    private class TimerExpired implements Callback {
-        @Override
-        public void run(Selectable selectable) {
-            expireSelectable(selectable);
-        }
-    }
-
-
-    // pni_timer_finalize from reactor.c
-    private static class TimerFree implements Callback {
-        @Override
-        public void run(Selectable selectable) {
-            try {
-                selectable.getChannel().close();
-            } catch(IOException ioException) {
-                // Ignore
-            }
-        }
-    }
-
-    private Selectable timerSelectable() {
-        Selectable sel = selectable();
-        sel.setChannel(wakeup.source());
-        sel.onReadable(new TimerReadable());
-        sel.onExpired(new TimerExpired());
-        sel.onFree(new TimerFree());
-        sel.setReading(true);
-        sel.setDeadline(timer.deadline());
-        update(sel);
-        return sel;
-    }
-
-    protected Selector getSelector() {
-        return selector;
-    }
-
-    protected void setSelector(Selector selector) {
-        this.selector = selector;
-    }
-
-    // pn_reactor_connection from connection.c
-    @Override
-    public Connection connection(Handler handler) {
-        Connection connection = Proton.connection();
-        BaseHandler.setHandler(connection, handler);
-        connection.collect(collector);
-        children.add(connection);
-        ((ConnectionImpl)connection).setReactor(this);
-        return connection;
-    }
-
-    @Override
-    public Connection connectionToHost(String host, int port, Handler handler) {
-        Connection connection = connection(handler);
-        setConnectionHost(connection, host, port);
-        return connection;
-    }
-
-    @Override
-    public String getConnectionAddress(Connection connection) {
-        Record r = connection.attachments();
-        Address addr = r.get(CONNECTION_PEER_ADDRESS_KEY, Address.class);
-        if (addr != null) {
-            StringBuilder sb = new StringBuilder(addr.getHost());
-            if (addr.getPort() != null)
-                sb.append(":" + addr.getPort());
-            return sb.toString();
-        }
-        return null;
-    }
-
-    @Override
-    public void setConnectionHost(Connection connection,
-                                  String host, int port) {
-        Record r = connection.attachments();
-        // cannot set the address on an incoming connection
-        if (r.get(AcceptorImpl.CONNECTION_ACCEPTOR_KEY, Acceptor.class) == null) {
-            Address addr = new Address();
-            addr.setHost(host);
-            if (port == 0) {
-                port = 5672;
-            }
-            addr.setPort(Integer.toString(port));
-            r.set(CONNECTION_PEER_ADDRESS_KEY, Address.class, addr);
-        } else {
-            throw new IllegalStateException("Cannot set the host address on an incoming Connection");
-        }
-    }
-
-    @Override
-    public Acceptor acceptor(String host, int port) throws IOException {
-        return this.acceptor(host, port, null);
-    }
-
-    @Override
-    public Acceptor acceptor(String host, int port, Handler handler) throws IOException {
-        return new AcceptorImpl(this, host, port, handler);
-    }
-
-    public IO getIO() {
-        return io;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java
deleted file mode 100644
index 6dde424..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-/**
- * Unchecked exception raised by the reactor when it hits an internal error
- * condition. It plays the role that an assertion failure plays in the
- * proton-c reactor implementation.
- */
-class ReactorInternalException extends RuntimeException {
-
-    private static final long serialVersionUID = 8979674526584642454L;
-
-    /**
-     * @param msg description of the internal error
-     * @param cause the underlying failure
-     */
-    protected ReactorInternalException(String msg, Throwable cause) {
-        super(msg, cause);
-    }
-
-    /**
-     * @param msg description of the internal error
-     */
-    protected ReactorInternalException(String msg) {
-        super(msg);
-    }
-
-    /**
-     * @param cause the underlying failure
-     */
-    protected ReactorInternalException(Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
deleted file mode 100644
index df4e6cc..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.nio.channels.SelectableChannel;
-
-import org.apache.qpid.proton.engine.Collector;
-import org.apache.qpid.proton.engine.Event.Type;
-import org.apache.qpid.proton.engine.Record;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.engine.impl.CollectorImpl;
-import org.apache.qpid.proton.engine.impl.RecordImpl;
-import org.apache.qpid.proton.reactor.Reactor;
-import org.apache.qpid.proton.reactor.Selectable;
-
-/**
- * {@link Selectable} implementation that stores the read/write interest
- * flags, deadline, channel and attachments, and runs the registered
- * callback (if any) when one of the notification methods is invoked.
- */
-public class SelectableImpl implements Selectable {
-
-    /** Callback that puts a fixed event type for the selectable onto a collector. */
-    private static final class EventCallback implements Callback {
-        private final CollectorImpl target;
-        private final Type eventType;
-
-        EventCallback(CollectorImpl target, Type eventType) {
-            this.target = target;
-            this.eventType = eventType;
-        }
-
-        @Override
-        public void run(Selectable selectable) {
-            target.put(eventType, selectable);
-        }
-    }
-
-    private Callback readable;
-    private Callback writable;
-    private Callback error;
-    private Callback expire;
-    private Callback release;
-    private Callback free;
-
-    private boolean reading = false;
-    private boolean writing = false;
-    private long deadline = 0;
-    private SelectableChannel channel;
-    private Record attachments = new RecordImpl();
-    private boolean registered;
-    private Reactor reactor;
-    private Transport transport;
-    private boolean terminal;
-    private boolean terminated;
-
-    /** Runs the given callback against this selectable, doing nothing when it is unset. */
-    private void fire(Callback callback) {
-        if (callback == null) {
-            return;
-        }
-        callback.run(this);
-    }
-
-    // ---- interest flags and deadline -------------------------------------
-
-    @Override
-    public boolean isReading() {
-        return this.reading;
-    }
-
-    @Override
-    public void setReading(boolean reading) {
-        this.reading = reading;
-    }
-
-    @Override
-    public boolean isWriting() {
-        return this.writing;
-    }
-
-    @Override
-    public void setWriting(boolean writing) {
-        this.writing = writing;
-    }
-
-    @Override
-    public long getDeadline() {
-        return this.deadline;
-    }
-
-    @Override
-    public void setDeadline(long deadline) {
-        this.deadline = deadline;
-    }
-
-    // ---- callback registration -------------------------------------------
-
-    @Override
-    public void onReadable(Callback runnable) {
-        readable = runnable;
-    }
-
-    @Override
-    public void onWritable(Callback runnable) {
-        writable = runnable;
-    }
-
-    @Override
-    public void onExpired(Callback runnable) {
-        expire = runnable;
-    }
-
-    @Override
-    public void onError(Callback runnable) {
-        error = runnable;
-    }
-
-    @Override
-    public void onRelease(Callback runnable) {
-        release = runnable;
-    }
-
-    @Override
-    public void onFree(Callback runnable) {
-        free = runnable;
-    }
-
-    // ---- notifications: each runs the matching callback when one is set ---
-
-    @Override
-    public void readable() {
-        fire(readable);
-    }
-
-    @Override
-    public void writeable() {
-        fire(writable);
-    }
-
-    @Override
-    public void expired() {
-        fire(expire);
-    }
-
-    @Override
-    public void error() {
-        fire(error);
-    }
-
-    @Override
-    public void release() {
-        fire(release);
-    }
-
-    @Override
-    public void free() {
-        fire(free);
-    }
-
-    // ---- channel and registration state ----------------------------------
-
-    @Override
-    public void setChannel(SelectableChannel channel) {
-        this.channel = channel;
-    }
-
-    @Override
-    public SelectableChannel getChannel() {
-        return this.channel;
-    }
-
-    @Override
-    public boolean isRegistered() {
-        return this.registered;
-    }
-
-    @Override
-    public void setRegistered(boolean registered) {
-        this.registered = registered;
-    }
-
-    /**
-     * Replaces the readable, writable, expired and error callbacks with ones
-     * that put the corresponding {@code SELECTABLE_*} event onto the given
-     * collector. The release and free callbacks are left untouched.
-     *
-     * @param collector the collector to receive events; cast to {@link CollectorImpl}
-     */
-    @Override
-    public void setCollector(final Collector collector) {
-        // Cast up front so an unsupported collector fails before any callback is replaced.
-        final CollectorImpl sink = (CollectorImpl)collector;
-
-        onReadable(new EventCallback(sink, Type.SELECTABLE_READABLE));
-        onWritable(new EventCallback(sink, Type.SELECTABLE_WRITABLE));
-        onExpired(new EventCallback(sink, Type.SELECTABLE_EXPIRED));
-        onError(new EventCallback(sink, Type.SELECTABLE_ERROR));
-    }
-
-    // ---- reactor / transport association ---------------------------------
-
-    @Override
-    public Reactor getReactor() {
-        return this.reactor;
-    }
-
-    protected void setReactor(Reactor reactor) {
-        this.reactor = reactor;
-    }
-
-    protected Transport getTransport() {
-        return this.transport;
-    }
-
-    protected void setTransport(Transport transport) {
-        this.transport = transport;
-    }
-
-    // ---- termination flags -----------------------------------------------
-
-    /** Marks this selectable as terminal. */
-    @Override
-    public void terminate() {
-        this.terminal = true;
-    }
-
-    @Override
-    public boolean isTerminal() {
-        return this.terminal;
-    }
-
-    /** Records that this selectable has been terminated. */
-    public void terminated() {
-        this.terminated = true;
-    }
-
-    public boolean isTerminated() {
-        return this.terminated;
-    }
-
-    @Override
-    public Record attachments() {
-        return this.attachments;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
deleted file mode 100644
index b4efb39..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.HashSet;
-import java.util.Iterator;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.reactor.Selectable;
-import org.apache.qpid.proton.reactor.Selector;
-
-/**
- * {@link Selector} implementation backed by a {@code java.nio.channels.Selector}.
- * Each call to {@link #select(long)} recomputes the sets of readable,
- * writeable, expired and errored selectables, which are then exposed
- * through the corresponding iterator methods.
- */
-class SelectorImpl implements Selector {
-
-    private final java.nio.channels.Selector selector;
-    private final HashSet<Selectable> selectables = new HashSet<Selectable>();
-    private final HashSet<Selectable> readable = new HashSet<Selectable>();
-    private final HashSet<Selectable> writeable = new HashSet<Selectable>();
-    private final HashSet<Selectable> expired = new HashSet<Selectable>();
-    private final HashSet<Selectable> error = new HashSet<Selectable>();
-
-    /**
-     * @param io source of the underlying NIO selector
-     * @throws IOException if the selector cannot be obtained
-     */
-    protected SelectorImpl(IO io) throws IOException {
-        selector = io.selector();
-    }
-
-    /**
-     * Starts tracking a selectable. When it has a channel, the channel is
-     * switched to non-blocking mode and registered with the selector with the
-     * selectable attached to the key; the interest set is then brought up to
-     * date via {@link #update(Selectable)}.
-     */
-    @Override
-    public void add(Selectable selectable) throws IOException {
-        // The selectable's channel can be 'null' - if this is the case it can only ever receive expiry events.
-        if (selectable.getChannel() != null) {
-            selectable.getChannel().configureBlocking(false);
-            SelectionKey key = selectable.getChannel().register(selector, 0);
-            key.attach(selectable);
-        }
-        selectables.add(selectable);
-        update(selectable);
-    }
-
-    /**
-     * Recomputes the interest set of the selectable's selection key. A socket
-     * with a pending connection is only interested in OP_CONNECT; otherwise
-     * reading maps to OP_ACCEPT for server sockets and OP_READ for everything
-     * else, and writing maps to OP_WRITE. Selectables without a channel are
-     * left alone.
-     */
-    @Override
-    public void update(Selectable selectable) {
-        if (selectable.getChannel() != null) {
-            int interestedOps = 0;
-            if (selectable.getChannel() instanceof SocketChannel &&
-                    ((SocketChannel)selectable.getChannel()).isConnectionPending()) {
-                interestedOps |= SelectionKey.OP_CONNECT;
-            } else {
-                if (selectable.isReading()) {
-                    if (selectable.getChannel() instanceof ServerSocketChannel) {
-                        interestedOps |= SelectionKey.OP_ACCEPT;
-                    } else {
-                        interestedOps |= SelectionKey.OP_READ;
-                    }
-                }
-                if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE;
-            }
-            SelectionKey key = selectable.getChannel().keyFor(selector);
-            key.interestOps(interestedOps);
-        }
-    }
-
-    /**
-     * Stops tracking a selectable, cancelling its selection key (if it has
-     * one) and detaching the selectable from the key.
-     */
-    @Override
-    public void remove(Selectable selectable) {
-        if (selectable.getChannel() != null) {
-            SelectionKey key = selectable.getChannel().keyFor(selector);
-            if (key != null) {
-                key.cancel();
-                key.attach(null);
-            }
-        }
-        selectables.remove(selectable);
-    }
-
-    /**
-     * Waits for I/O readiness or deadline expiry and rebuilds the readable,
-     * writeable, expired and error sets.
-     *
-     * @param timeout maximum time to block, in milliseconds. A positive value
-     *        is shortened to the earliest positive deadline among the tracked
-     *        selectables; a value that is zero or negative (or becomes zero
-     *        because a deadline has already passed) polls without blocking.
-     * @throws IOException if the underlying selector fails
-     */
-    @Override
-    public void select(long timeout) throws IOException {
-
-        long now = System.currentTimeMillis();
-        if (timeout > 0) {
-            long deadline = 0;
-            // XXX: Note: this differs from the C code which requires a call to update() to make deadline changes take effect
-            for (Selectable selectable : selectables) {
-                long d = selectable.getDeadline();
-                if (d > 0) {
-                    deadline = (deadline == 0) ? d : Math.min(deadline,  d);
-                }
-            }
-
-            if (deadline > 0) {
-                long delta = deadline - now;
-                if (delta < 0) {
-                    timeout = 0;
-                } else if (delta < timeout) {
-                    timeout = delta;
-                }
-            }
-        }
-
-        error.clear();
-
-        long awoken = 0;
-        if (timeout > 0) {
-            long remainingTimeout = timeout;
-            while(remainingTimeout > 0) {
-                selector.select(remainingTimeout);
-                awoken = System.currentTimeMillis();
-
-                // Connection completion is handled here and the key removed from the
-                // selected set, so it is never reported as readable/writeable below.
-                for (Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); iterator.hasNext();) {
-                    SelectionKey key = iterator.next();
-                    if (key.isConnectable()) {
-                        try {
-                            ((SocketChannel)key.channel()).finishConnect();
-                            update((Selectable)key.attachment());
-                        } catch(IOException ioException) {
-                            SelectableImpl selectable = (SelectableImpl)key.attachment();
-                            ErrorCondition condition = new ErrorCondition();
-                            condition.setCondition(Symbol.getSymbol("proton:io"));
-                            condition.setDescription(ioException.getMessage());
-                            Transport transport = selectable.getTransport();
-                            if (transport != null) {
-                                transport.setCondition(condition);
-                                transport.close_tail();
-                                transport.close_head();
-                                transport.pop(Math.max(0, transport.pending())); // Force generation of TRANSPORT_HEAD_CLOSE (not in C code)
-                            }
-                            error.add(selectable);
-                        }
-                        iterator.remove();
-                    }
-                }
-                if (!selector.selectedKeys().isEmpty()) {
-                    break;
-                }
-                // 'awoken - now' is the total time elapsed since this call began, so it
-                // must be taken off the original timeout. Subtracting it from the already
-                // reduced remainder would count earlier waits more than once and make
-                // the call return before the timeout.
-                remainingTimeout = timeout - (awoken - now);
-            }
-        } else {
-            selector.selectNow();
-            awoken = System.currentTimeMillis();
-        }
-
-        readable.clear();
-        writeable.clear();
-        expired.clear();
-        for (SelectionKey key : selector.selectedKeys()) {
-            Selectable selectable = (Selectable)key.attachment();
-            if (key.isReadable()) readable.add(selectable);
-            // An acceptable server socket is reported through the readable set.
-            if (key.isAcceptable()) readable.add(selectable);
-            if (key.isWritable()) writeable.add(selectable);
-        }
-        selector.selectedKeys().clear();
-        // XXX: Note: this is different to the C code which evaluates expiry at the point the selectable is iterated over.
-        for (Selectable selectable : selectables) {
-            long deadline = selectable.getDeadline();
-            if (deadline > 0 && awoken >= deadline) {
-                expired.add(selectable);
-            }
-        }
-    }
-
-    /** Iterates over the selectables found readable (or acceptable) by the last {@link #select(long)}. */
-    @Override
-    public Iterator<Selectable> readable() {
-        return readable.iterator();
-    }
-
-    /** Iterates over the selectables found writable by the last {@link #select(long)}. */
-    @Override
-    public Iterator<Selectable> writeable() {
-        return writeable.iterator();
-    }
-
-    /** Iterates over the selectables whose deadline had passed at the last {@link #select(long)}. */
-    @Override
-    public Iterator<Selectable> expired() {
-        return expired.iterator();
-    }
-
-    /** Iterates over the selectables whose connection attempt failed during the last {@link #select(long)}. */
-    @Override
-    public Iterator<Selectable> error() {
-        return error.iterator();
-    }
-
-    /** Closes the underlying selector; a failure to close is deliberately ignored. */
-    @Override
-    public void free() {
-        try {
-            selector.close();
-        } catch(IOException ioException) {
-            // Ignore
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
deleted file mode 100644
index 11bb6b8..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.qpid.proton.engine.Record;
-import org.apache.qpid.proton.engine.impl.RecordImpl;
-import org.apache.qpid.proton.reactor.Reactor;
-import org.apache.qpid.proton.reactor.Task;
-
-/**
- * {@link Task} implementation ordered by deadline. Tasks that share a
- * deadline are ordered by a creation sequence number.
- */
-public class TaskImpl implements Task, Comparable<TaskImpl> {
-    // Shared by all tasks: a per-instance counter would hand every task the
-    // value 0, leaving tasks with equal deadlines indistinguishable in compareTo().
-    private static final AtomicInteger count = new AtomicInteger();
-
-    private final long deadline;
-    private final int counter;
-    private boolean cancelled = false;
-    private Record attachments = new RecordImpl();
-    private Reactor reactor;
-
-    /**
-     * @param deadline the deadline returned by {@link #deadline()} and used for ordering
-     */
-    public TaskImpl(long deadline) {
-        this.deadline = deadline;
-        this.counter = count.getAndIncrement();
-    }
-
-    /**
-     * Orders tasks by deadline, earliest first, and then by creation
-     * sequence for tasks with the same deadline.
-     */
-    @Override
-    public int compareTo(TaskImpl other) {
-        int result;
-        if (deadline < other.deadline) {
-            result = -1;
-        } else if (deadline > other.deadline) {
-            result = 1;
-        } else {
-            // Difference (rather than a direct comparison) keeps the relative
-            // order of nearby sequence numbers correct if the counter wraps.
-            result = counter - other.counter;
-        }
-        return result;
-    }
-
-    /** Returns the deadline this task was created with. */
-    @Override
-    public long deadline() {
-        return deadline;
-    }
-
-    /** Returns whether {@link #cancel()} has been called on this task. */
-    public boolean isCancelled() {
-        return cancelled;
-    }
-
-    /** Marks this task as cancelled. */
-    @Override
-    public void cancel() {
-        cancelled = true;
-    }
-
-    public void setReactor(Reactor reactor) {
-        this.reactor = reactor;
-    }
-
-    @Override
-    public Reactor getReactor() {
-        return reactor;
-    }
-
-    @Override
-    public Record attachments() {
-        return attachments;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
deleted file mode 100644
index b8df19d..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.reactor.impl;
-
-import java.util.PriorityQueue;
-
-import org.apache.qpid.proton.engine.Collector;
-import org.apache.qpid.proton.engine.Event.Type;
-import org.apache.qpid.proton.engine.impl.CollectorImpl;
-import org.apache.qpid.proton.reactor.Task;
-
-/**
- * Keeps scheduled tasks in deadline order and reports those that have
- * come due to a collector as {@code TIMER_TASK} events.
- */
-public class Timer {
-
-    private CollectorImpl collector;
-    private PriorityQueue<TaskImpl> tasks = new PriorityQueue<TaskImpl>();
-
-    /**
-     * @param collector receives the timer events; cast to {@link CollectorImpl}
-     */
-    public Timer(Collector collector) {
-        this.collector = (CollectorImpl)collector;
-    }
-
-    /** Creates a task with the given deadline, queues it and returns it. */
-    Task schedule(long deadline) {
-        TaskImpl scheduled = new TaskImpl(deadline);
-        tasks.add(scheduled);
-        return scheduled;
-    }
-
-    /**
-     * Returns the deadline of the task at the head of the queue once leading
-     * cancelled tasks have been dropped, or 0 when no task remains.
-     */
-    long deadline() {
-        flushCancelled();
-        Task head = tasks.peek();
-        if (head == null) {
-            return 0;
-        }
-        return head.deadline();
-    }
-
-    /** Drops cancelled tasks from the head of the queue until a live one (or nothing) is left there. */
-    private void flushCancelled() {
-        TaskImpl head = tasks.peek();
-        while (head != null && head.isCancelled()) {
-            tasks.poll();
-            head = tasks.peek();
-        }
-    }
-
-    /**
-     * Removes every task at the head of the queue whose deadline is at or
-     * before {@code now}, putting a {@code TIMER_TASK} event on the collector
-     * for each one that has not been cancelled.
-     */
-    void tick(long now) {
-        TaskImpl head = tasks.peek();
-        while (head != null && now >= head.deadline()) {
-            tasks.poll();
-            if (!head.isCancelled()) {
-                collector.put(Type.TIMER_TASK, head);
-            }
-            head = tasks.peek();
-        }
-    }
-
-    /** Returns the number of queued tasks after dropping leading cancelled ones. */
-    int tasks() {
-        flushCancelled();
-        return tasks.size();
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.codec.DataFactory
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.codec.DataFactory b/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.codec.DataFactory
deleted file mode 100644
index 46a716b..0000000
--- a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.codec.DataFactory
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.qpid.proton.codec.impl.DataFactoryImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.driver.DriverFactory
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.driver.DriverFactory b/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.driver.DriverFactory
deleted file mode 100644
index 00e7a60..0000000
--- a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.driver.DriverFactory
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.qpid.proton.driver.impl.DriverFactoryImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.engine.EngineFactory
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.engine.EngineFactory b/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.engine.EngineFactory
deleted file mode 100644
index 33f9865..0000000
--- a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.engine.EngineFactory
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.qpid.proton.engine.impl.EngineFactoryImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.message.MessageFactory
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.message.MessageFactory b/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.message.MessageFactory
deleted file mode 100644
index 99eb726..0000000
--- a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.message.MessageFactory
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.qpid.proton.message.impl.MessageFactoryImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.messenger.MessengerFactory
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.messenger.MessengerFactory b/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.messenger.MessengerFactory
deleted file mode 100644
index d0beeb4..0000000
--- a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.messenger.MessengerFactory
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.qpid.proton.messenger.impl.MessengerFactoryImpl
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message