Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 60176200C1B for ; Mon, 9 Jan 2017 16:07:19 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 5ED77160B4C; Mon, 9 Jan 2017 15:07:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 19F8B160B54 for ; Mon, 9 Jan 2017 16:07:15 +0100 (CET) Received: (qmail 93519 invoked by uid 500); 9 Jan 2017 15:07:15 -0000 Mailing-List: contact commits-help@qpid.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@qpid.apache.org Delivered-To: mailing list commits@qpid.apache.org Received: (qmail 92862 invoked by uid 99); 9 Jan 2017 15:07:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Jan 2017 15:07:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7454FDFCDF; Mon, 9 Jan 2017 15:07:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: robbie@apache.org To: commits@qpid.apache.org Date: Mon, 09 Jan 2017 15:07:21 -0000 Message-Id: <0541f3190f0d49e8b33c41fdcbcd0468@git.apache.org> In-Reply-To: <479f6bdf5cd14350bccbc3ec5acb0920@git.apache.org> References: <479f6bdf5cd14350bccbc3ec5acb0920@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [08/30] qpid-proton git commit: PROTON-1385: remove proton-j from the existing repo, it now has its own repo at: https://git-wip-us.apache.org/repos/asf/qpid-proton-j.git archived-at: Mon, 09 Jan 2017 15:07:19 -0000 http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java deleted file mode 100644 index 146ee09..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/ReactorChild.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.proton.reactor; - -/** - * Interface used to identify classes that can be a child of a reactor. - */ -public interface ReactorChild { - - /** Frees any resources associated with this child. */ - void free(); -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java deleted file mode 100644 index e91a0ee..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selectable.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.proton.reactor; - -import java.nio.channels.SelectableChannel; - -import org.apache.qpid.proton.engine.Collector; -import org.apache.qpid.proton.engine.Extendable; - -/** - * An entity that can be multiplexed using a {@link Selector}. - *

- * Every selectable is associated with exactly one {@link SelectableChannel}. - * Selectables may be interested in three kinds of events: read events, write - * events, and timer events. A selectable will express its interest in these - * events through the {@link #isReading()}, {@link #isWriting()}, and - * {@link #getDeadline()} methods. - *

- * When a read, write, or timer event occurs, the selectable must be notified by - * calling {@link #readable()}, {@link #writeable()}, or {@link #expired()} as - * appropriate. - * - * Once a selectable reaches a terminal state (see {@link #isTerminal()}, it - * will never be interested in events of any kind. When this occurs it should be - * removed from the Selector and discarded using {@link #free()}. - */ -public interface Selectable extends ReactorChild, Extendable { - - /** - * A callback that can be passed to the various "on" methods of the - * selectable - to allow code to be run when the selectable becomes ready - * for the associated operation. - */ - interface Callback { - void run(Selectable selectable); - } - - /** - * @return true if the selectable is interested in receiving - * notification (via the {@link #readable()} method that indicate - * that the associated {@link SelectableChannel} has data ready - * to be read from it. - */ - boolean isReading(); - - /** - * @return true if the selectable is interested in receiving - * notifications (via the {@link #writeable()} method that indicate - * that the associated {@link SelectableChannel} is ready to be - * written to. - */ - boolean isWriting(); - - /** - * @return a deadline after which this selectable can expect to receive - * a notification (via the {@link #expired()} method that indicates - * that the deadline has past. The deadline is expressed in the - * same format as {@link System#currentTimeMillis()}. Returning - * a deadline of zero (or a negative number) indicates that the - * selectable does not wish to be notified of expiry. - */ - long getDeadline(); - - /** - * Sets the value that will be returned by {@link #isReading()}. - * @param reading - */ - void setReading(boolean reading); - - /** - * Sets the value that will be returned by {@link #isWriting()}. - * @param writing - */ - void setWriting(boolean writing); - - /** - * Sets the value that will be returned by {@link #getDeadline()}. - * @param deadline - */ - void setDeadline(long deadline); - - /** - * Registers a callback that will be run when the selectable becomes ready - * for reading. - * @param runnable the callback to register. Any previously registered - * callback will be replaced. - */ - void onReadable(Callback runnable); - - /** - * Registers a callback that will be run when the selectable becomes ready - * for writing. - * @param runnable the callback to register. Any previously registered - * callback will be replaced. - */ - void onWritable(Callback runnable); - - /** - * Registers a callback that will be run when the selectable expires. - * @param runnable the callback to register. Any previously registered - * callback will be replaced. - */ - void onExpired(Callback runnable); - - /** - * Registers a callback that will be run when the selectable is notified of - * an error. - * @param runnable the callback to register. Any previously registered - * callback will be replaced. - */ - void onError(Callback runnable); - - /** - * Registers a callback that will be run when the selectable is notified - * that it has been released. - * @param runnable the callback to register. Any previously registered - * callback will be replaced. - */ - void onRelease(Callback runnable); - - /** - * Registers a callback that will be run when the selectable is notified - * that it has been free'd. - * @param runnable the callback to register. Any previously registered - * callback will be replaced. - */ - void onFree(Callback runnable); - - /** - * Notify the selectable that the underlying {@link SelectableChannel} is - * ready for a read operation. - */ - void readable(); - - /** - * Notify the selectable that the underlying {@link SelectableChannel} is - * ready for a write operation. - */ - void writeable(); - - /** Notify the selectable that it has expired. */ - void expired(); - - /** Notify the selectable that an error has occurred. */ - void error(); - - /** Notify the selectable that it has been released. */ - void release(); - - /** Notify the selectable that it has been free'd. */ - @Override - void free(); - - /** - * Associates a {@link SelectableChannel} with this selector. - * @param channel - */ - void setChannel(SelectableChannel channel); // This is the equivalent to pn_selectable_set_fd(...) - - /** @return the {@link SelectableChannel} associated with this selector. */ - SelectableChannel getChannel(); // This is the equivalent to pn_selectable_get_fd(...) - - /** - * Check if a selectable is registered. This can be used for tracking - * whether a given selectable has been registerd with an external event - * loop. - *

- * Note: the reactor code, currently, does not use this flag. - * @return trueif the selectable is registered. - */ - boolean isRegistered(); // XXX: unused in C reactor code - - /** - * Set the registered flag for a selectable. - *

- * Note: the reactor code, currently, does not use this flag. - * @param registered the value returned by {@link #isRegistered()} - */ - void setRegistered(boolean registered); // XXX: unused in C reactor code - - /** - * Configure a selectable with a set of callbacks that emit readable, - * writable, and expired events into the supplied collector. - * @param collector - */ - void setCollector(final Collector collector); - - /** @return the reactor to which this selectable is a child. */ - Reactor getReactor() ; - - /** - * Terminates the selectable. Once a selectable reaches a terminal state - * it will never be interested in events of any kind. - */ - public void terminate() ; - - /** - * @return true if the selectable has reached a terminal state. - */ - boolean isTerminal(); - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java deleted file mode 100644 index 4228a8d..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Selector.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.proton.reactor; - -import java.io.IOException; -import java.util.Iterator; - -/** - * A multiplexor of instances of {@link Selectable}. - *

- * Many instances of Selectable can be added to a selector, and - * the {@link #select(long)} method used to block the calling thread until - * one of the Selectables becomes read to perform an operation. - *

- * This class is not thread safe, so only one thread should be manipulating the - * contents of the selector, or running the {@link #select(long)} method at - * any given time. - */ -public interface Selector { - - /** - * Adds a selectable to the selector. - * @param selectable - * @throws IOException - */ - void add(Selectable selectable) throws IOException; - - /** - * Updates the selector to reflect any changes interest by the specified - * selectable. This is achieved by calling the - * {@link Selectable#isReading()} and {@link Selectable#isWriting()} - * methods. - * @param selectable - */ - void update(Selectable selectable); - - /** - * Removes a selectable from the selector. - * @param selectable - */ - void remove(Selectable selectable); - - /** - * Waits for the specified timeout period for one or more selectables to - * become ready for an operation. Selectables that become ready are - * returned by the {@link #readable()}, {@link #writeable()}, - * {@link #expired()}, or {@link #error()} methods. - * - * @param timeout the maximum number of milliseconds to block the calling - * thread waiting for a selectable to become ready for an - * operation. The value zero is interpreted as check but - * don't block. - * @throws IOException - */ - void select(long timeout) throws IOException; - - /** - * @return the selectables that have become readable since the last call - * to {@link #select(long)}. Calling select clears - * any previous values in this set before adding new values - * corresponding to those selectables that have become readable. - */ - Iterator readable(); - - /** - * @return the selectables that have become writable since the last call - * to {@link #select(long)}. Calling select clears - * any previous values in this set before adding new values - * corresponding to those selectables that have become writable. - */ - Iterator writeable(); - - /** - * @return the selectables that have expired since the last call - * to {@link #select(long)}. Calling select clears - * any previous values in this set before adding new values - * corresponding to those selectables that have now expired. - */ - Iterator expired(); - - /** - * @return the selectables that have encountered an error since the last - * call to {@link #select(long)}. Calling select - * clears any previous values in this set before adding new values - * corresponding to those selectables that have encountered an - * error. - */ - Iterator error() ; - - /** Frees the resources used by this selector. */ - void free(); -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java deleted file mode 100644 index 7fb5964..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.proton.reactor; - -import org.apache.qpid.proton.engine.Event.Type; -import org.apache.qpid.proton.engine.Extendable; -import org.apache.qpid.proton.engine.Handler; - -/** - * Represents work scheduled with a {@link Reactor} for execution at - * some point in the future. - *

- * Tasks are created using the {@link Reactor#schedule(int, Handler)} - * method. - */ -public interface Task extends Extendable { - - /** - * @return the deadline at which the handler associated with the scheduled - * task should be delivered a {@link Type#TIMER_TASK} event. - */ - long deadline(); - - /** @return the reactor that created this task. */ - Reactor getReactor(); - - /** - * Cancel the execution of this task. No-op if invoked after the task was already executed. - */ - void cancel(); -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java deleted file mode 100644 index c5abbd8..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.proton.reactor.impl; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; - -import org.apache.qpid.proton.Proton; -import org.apache.qpid.proton.engine.BaseHandler; -import org.apache.qpid.proton.engine.Connection; -import org.apache.qpid.proton.engine.Handler; -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.engine.Sasl; -import org.apache.qpid.proton.engine.Sasl.SaslOutcome; -import org.apache.qpid.proton.engine.Transport; -import org.apache.qpid.proton.engine.impl.RecordImpl; -import org.apache.qpid.proton.reactor.Acceptor; -import org.apache.qpid.proton.reactor.Reactor; -import org.apache.qpid.proton.reactor.impl.ReactorImpl; -import org.apache.qpid.proton.reactor.Selectable; -import org.apache.qpid.proton.reactor.Selectable.Callback; -import org.apache.qpid.proton.messenger.impl.Address; - -@SuppressWarnings("deprecation") -public class AcceptorImpl implements Acceptor { - - private Record attachments = new RecordImpl(); - private final SelectableImpl sel; - protected static final String CONNECTION_ACCEPTOR_KEY = "pn_reactor_connection_acceptor"; - - private class AcceptorReadable implements Callback { - @Override - public void run(Selectable selectable) { - Reactor reactor = selectable.getReactor(); - try { - SocketChannel socketChannel = ((ServerSocketChannel)selectable.getChannel()).accept(); - if (socketChannel == null) { - throw new ReactorInternalException("Selectable readable, but no socket to accept"); - } - Handler handler = BaseHandler.getHandler(AcceptorImpl.this); - if (handler == null) { - handler = reactor.getHandler(); - } - Connection conn = reactor.connection(handler); - Record conn_recs = conn.attachments(); - conn_recs.set(CONNECTION_ACCEPTOR_KEY, Acceptor.class, AcceptorImpl.this); - InetSocketAddress peerAddr = (InetSocketAddress)socketChannel.getRemoteAddress(); - if (peerAddr != null) { - Address addr = new Address(); - addr.setHost(peerAddr.getHostString()); - addr.setPort(Integer.toString(peerAddr.getPort())); - conn_recs.set(ReactorImpl.CONNECTION_PEER_ADDRESS_KEY, Address.class, addr); - } - Transport trans = Proton.transport(); - Sasl sasl = trans.sasl(); - sasl.server(); - sasl.setMechanisms("ANONYMOUS"); - sasl.done(SaslOutcome.PN_SASL_OK); - trans.bind(conn); - IOHandler.selectableTransport(reactor, socketChannel.socket(), trans); - } catch(IOException ioException) { - sel.error(); - } - } - } - - private static class AcceptorFree implements Callback { - @Override - public void run(Selectable selectable) { - try { - if (selectable.getChannel() != null) { - selectable.getChannel().close(); - } - } catch(IOException ioException) { - // Ignore - as we can't make the channel any more closed... - } - } - } - - protected AcceptorImpl(Reactor reactor, String host, int port, Handler handler) throws IOException { - ServerSocketChannel ssc = ((ReactorImpl)reactor).getIO().serverSocketChannel(); - ssc.bind(new InetSocketAddress(host, port)); - sel = ((ReactorImpl)reactor).selectable(this); - sel.setChannel(ssc); - sel.onReadable(new AcceptorReadable()); - sel.onFree(new AcceptorFree()); - sel.setReactor(reactor); - BaseHandler.setHandler(this, handler); - sel.setReading(true); - reactor.update(sel); - } - - @Override - public void close() { - if (!sel.isTerminal()) { - Reactor reactor = sel.getReactor(); - try { - sel.getChannel().close(); - } catch(IOException ioException) { - // Ignore. - } - sel.setChannel(null); - sel.terminate(); - reactor.update(sel); - } - } - - // Used for unit tests, where acceptor is bound to an ephemeral port - public int getPortNumber() throws IOException { - ServerSocketChannel ssc = (ServerSocketChannel)sel.getChannel(); - return ((InetSocketAddress)ssc.getLocalAddress()).getPort(); - } - - @Override - public void free() { - sel.free(); - } - - @Override - public Record attachments() { - return attachments; - } - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java deleted file mode 100644 index 1028ae8..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.proton.reactor.impl; - -import java.io.IOException; -import java.nio.channels.Pipe; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; - -// Java equivalent to pn_io. -// This is, currently, in the reactor.impl package because it is not -// used elsewhere in the proton-j codebase. Instead it is present to -// facilitate mocking of various Java I/O related resources so that -// the unit tests can check for leaks. -public interface IO { - - Pipe pipe() throws IOException; - - Selector selector() throws IOException; - - ServerSocketChannel serverSocketChannel() throws IOException; - - SocketChannel socketChannel() throws IOException; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java deleted file mode 100644 index 2dd7e1a..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java +++ /dev/null @@ -1,392 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.proton.reactor.impl; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.nio.channels.Channel; -import java.nio.channels.SocketChannel; -import java.util.Iterator; - -import org.apache.qpid.proton.Proton; -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.BaseHandler; -import org.apache.qpid.proton.engine.Connection; -import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.Event; -import org.apache.qpid.proton.engine.Sasl; -import org.apache.qpid.proton.engine.Transport; -import org.apache.qpid.proton.engine.impl.TransportImpl; -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.reactor.Reactor; -import org.apache.qpid.proton.reactor.Selectable; -import org.apache.qpid.proton.reactor.Selectable.Callback; -import org.apache.qpid.proton.reactor.Selector; -import org.apache.qpid.proton.reactor.Acceptor; -import org.apache.qpid.proton.reactor.impl.AcceptorImpl; -import org.apache.qpid.proton.messenger.impl.Address; - -@SuppressWarnings("deprecation") -public class IOHandler extends BaseHandler { - - // pni_handle_quiesced from connection.c - private void handleQuiesced(Reactor reactor, Selector selector) throws IOException { - // check if we are still quiesced, other handlers of - // PN_REACTOR_QUIESCED could have produced more events to process - if (!reactor.quiesced()) return; - selector.select(reactor.getTimeout()); - reactor.mark(); - Iterator selectables = selector.readable(); - while(selectables.hasNext()) { - selectables.next().readable(); - } - selectables = selector.writeable(); - while(selectables.hasNext()) { - selectables.next().writeable(); - } - selectables = selector.expired(); - while(selectables.hasNext()) { - selectables.next().expired(); - } - selectables = selector.error(); - while(selectables.hasNext()) { - selectables.next().error(); - } - reactor.yield(); - } - - // pni_handle_open(...) from connection.c - private void handleOpen(Reactor reactor, Event event) { - Connection connection = event.getConnection(); - if (connection.getRemoteState() != EndpointState.UNINITIALIZED) { - return; - } - // Outgoing Reactor connections set the virtual host automatically using the - // following rules: - String vhost = connection.getHostname(); - if (vhost == null) { - // setHostname never called, use the host from the connection's - // socket address as the default virtual host: - String conAddr = reactor.getConnectionAddress(connection); - if (conAddr != null) { - Address addr = new Address(conAddr); - connection.setHostname(addr.getHost()); - } - } else if (vhost.isEmpty()) { - // setHostname called explictly with a null string. This allows - // the application to completely avoid sending a virtual host - // name - connection.setHostname(null); - } else { - // setHostname set by application - use it. - } - Transport transport = Proton.transport(); - Sasl sasl = transport.sasl(); - sasl.client(); - sasl.setMechanisms("ANONYMOUS"); - transport.bind(connection); - } - - // pni_handle_bound(...) from connection.c - // If this connection is an outgoing connection - not an incoming - // connection created by the Acceptor - create a socket connection to - // the peer address. - private void handleBound(Reactor reactor, Event event) { - Connection connection = event.getConnection(); - Record conn_recs = connection.attachments(); - if (conn_recs.get(AcceptorImpl.CONNECTION_ACCEPTOR_KEY, Acceptor.class) != null) { - // Connection was created via the Acceptor, so the socket already - // exists - return; - } - String url = reactor.getConnectionAddress(connection); - String hostname = connection.getHostname(); - int port = 5672; - - if (url != null) { - Address address = new Address(url); - hostname = address.getHost(); - try { - port = Integer.parseInt(address.getImpliedPort()); - } catch(NumberFormatException nfe) { - throw new IllegalArgumentException("Not a valid host: " + url, nfe); - } - } else if (hostname != null && !hostname.equals("")) { - // Backward compatibility with old code that illegally overloaded - // the connection's hostname - int colonIndex = hostname.indexOf(':'); - if (colonIndex >= 0) { - try { - port = Integer.parseInt(hostname.substring(colonIndex+1)); - } catch(NumberFormatException nfe) { - throw new IllegalArgumentException("Not a valid host: " + hostname, nfe); - } - hostname = hostname.substring(0, colonIndex); - } - } else { - throw new IllegalStateException("No address provided for Connection"); - } - - Transport transport = event.getConnection().getTransport(); - Socket socket = null; // In this case, 'null' is the proton-j equivalent of PN_INVALID_SOCKET - try { - SocketChannel socketChannel = ((ReactorImpl)reactor).getIO().socketChannel(); - socketChannel.configureBlocking(false); - socketChannel.connect(new InetSocketAddress(hostname, port)); - socket = socketChannel.socket(); - } catch(Exception exception) { - ErrorCondition condition = new ErrorCondition(); - condition.setCondition(Symbol.getSymbol("proton:io")); - condition.setDescription(exception.getMessage()); - transport.setCondition(condition); - transport.close_tail(); - transport.close_head(); - transport.pop(Math.max(0, transport.pending())); // Force generation of TRANSPORT_HEAD_CLOSE (not in C code) - } - selectableTransport(reactor, socket, transport); - } - - // pni_connection_capacity from connection.c - private static int capacity(Selectable selectable) { - Transport transport = ((SelectableImpl)selectable).getTransport(); - int capacity = transport.capacity(); - if (capacity < 0) { - if (transport.isClosed()) { - selectable.terminate(); - } - } - return capacity; - } - - // pni_connection_pending from connection.c - private static int pending(Selectable selectable) { - Transport transport = ((SelectableImpl)selectable).getTransport(); - int pending = transport.pending(); - if (pending < 0) { - if (transport.isClosed()) { - selectable.terminate(); - } - } - return pending; - } - - // pni_connection_deadline from connection.c - private static long deadline(SelectableImpl selectable) { - Reactor reactor = selectable.getReactor(); - Transport transport = selectable.getTransport(); - long deadline = transport.tick(reactor.now()); - return deadline; - } - - // pni_connection_update from connection.c - private static void update(Selectable selectable) { - SelectableImpl selectableImpl = (SelectableImpl)selectable; - int c = capacity(selectableImpl); - int p = pending(selectableImpl); - selectable.setReading(c > 0); - selectable.setWriting(p > 0); - selectable.setDeadline(deadline(selectableImpl)); - } - - // pni_connection_readable from connection.c - private static Callback connectionReadable = new Callback() { - @Override - public void run(Selectable selectable) { - Reactor reactor = selectable.getReactor(); - Transport transport = ((SelectableImpl)selectable).getTransport(); - int capacity = transport.capacity(); - if (capacity > 0) { - SocketChannel socketChannel = (SocketChannel)selectable.getChannel(); - try { - int n = socketChannel.read(transport.tail()); - if (n == -1) { - transport.close_tail(); - } else { - transport.process(); - } - } catch (IOException e) { - ErrorCondition condition = new ErrorCondition(); - condition.setCondition(Symbol.getSymbol("proton:io")); - condition.setDescription(e.getMessage()); - transport.setCondition(condition); - transport.close_tail(); - } - } - // (Comment from C code:) occasionally transport events aren't - // generated when expected, so the following hack ensures we - // always update the selector - update(selectable); - reactor.update(selectable); - } - }; - - // pni_connection_writable from connection.c - private static Callback connectionWritable = new Callback() { - @Override - public void run(Selectable selectable) { - Reactor reactor = selectable.getReactor(); - Transport transport = ((SelectableImpl)selectable).getTransport(); - int pending = transport.pending(); - if (pending > 0) { - SocketChannel channel = (SocketChannel)selectable.getChannel(); - try { - int n = channel.write(transport.head()); - if (n < 0) { - transport.close_head(); - } else { - transport.pop(n); - } - } catch(IOException ioException) { - ErrorCondition condition = new ErrorCondition(); - condition.setCondition(Symbol.getSymbol("proton:io")); - condition.setDescription(ioException.getMessage()); - transport.setCondition(condition); - transport.close_head(); - } - } - - int newPending = transport.pending(); - if (newPending != pending) { - update(selectable); - reactor.update(selectable); - } - } - }; - - // pni_connection_error from connection.c - private static Callback connectionError = new Callback() { - @Override - public void run(Selectable selectable) { - Reactor reactor = selectable.getReactor(); - selectable.terminate(); - reactor.update(selectable); - } - }; - - // pni_connection_expired from connection.c - private static Callback connectionExpired = new Callback() { - @Override - public void run(Selectable selectable) { - Reactor reactor = selectable.getReactor(); - Transport transport = ((SelectableImpl)selectable).getTransport(); - long deadline = transport.tick(reactor.now()); - selectable.setDeadline(deadline); - int c = capacity(selectable); - int p = pending(selectable); - selectable.setReading(c > 0); - selectable.setWriting(p > 0); - reactor.update(selectable); - } - }; - - private static Callback connectionFree = new Callback() { - @Override - public void run(Selectable selectable) { - Channel channel = selectable.getChannel(); - if (channel != null) { - try { - channel.close(); - } catch(IOException ioException) { - // Ignore - } - } - } - }; - - // pn_reactor_selectable_transport - // Note the socket argument can, validly be 'null' this is the equivalent of proton-c's PN_INVALID_SOCKET - protected static Selectable selectableTransport(Reactor reactor, Socket socket, Transport transport) { - Selectable selectable = reactor.selectable(); - selectable.setChannel(socket != null ? socket.getChannel() : null); - selectable.onReadable(connectionReadable); - selectable.onWritable(connectionWritable); - selectable.onError(connectionError); - selectable.onExpired(connectionExpired); - selectable.onFree(connectionFree); - ((SelectableImpl)selectable).setTransport(transport); - ((TransportImpl)transport).setSelectable(selectable); - ((TransportImpl)transport).setReactor(reactor); - update(selectable); - reactor.update(selectable); - return selectable; - } - - private void handleTransport(Reactor reactor, Event event) { - TransportImpl transport = (TransportImpl)event.getTransport(); - Selectable selectable = transport.getSelectable(); - if (selectable != null && !selectable.isTerminal()) { - update(selectable); - reactor.update(selectable); - } - } - - @Override - public void onUnhandled(Event event) { - try { - ReactorImpl reactor = (ReactorImpl)event.getReactor(); - Selector selector = reactor.getSelector(); - if (selector == null) { - selector = new SelectorImpl(reactor.getIO()); - reactor.setSelector(selector); - } - - Selectable selectable; - switch(event.getType()) { - case SELECTABLE_INIT: - selectable = event.getSelectable(); - selector.add(selectable); - break; - case SELECTABLE_UPDATED: - selectable = event.getSelectable(); - selector.update(selectable); - break; - case SELECTABLE_FINAL: - selectable = event.getSelectable(); - selector.remove(selectable); - selectable.release(); - break; - case CONNECTION_LOCAL_OPEN: - handleOpen(reactor, event); - break; - case CONNECTION_BOUND: - handleBound(reactor, event); - break; - case TRANSPORT: - handleTransport(reactor, event); - break; - case TRANSPORT_CLOSED: - event.getTransport().unbind(); - break; - case REACTOR_QUIESCED: - handleQuiesced(reactor, selector); - break; - default: - break; - } - } catch(IOException ioException) { - // XXX: Might not be the right exception type, but at least the exception isn't being swallowed - throw new ReactorInternalException(ioException); - } - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java deleted file mode 100644 index 6376b16..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.proton.reactor.impl; - -import java.io.IOException; -import java.nio.channels.Pipe; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; - -public class IOImpl implements IO { - - @Override - public Pipe pipe() throws IOException { - return Pipe.open(); - } - - @Override - public Selector selector() throws IOException { - return Selector.open(); - } - - @Override - public ServerSocketChannel serverSocketChannel() throws IOException { - return ServerSocketChannel.open(); - } - - @Override - public SocketChannel socketChannel() throws IOException { - return SocketChannel.open(); - } - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java deleted file mode 100644 index 30c8df9..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java +++ /dev/null @@ -1,485 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.proton.reactor.impl; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.Pipe; -import java.util.HashSet; -import java.util.Set; - -import org.apache.qpid.proton.Proton; -import org.apache.qpid.proton.engine.BaseHandler; -import org.apache.qpid.proton.engine.Collector; -import org.apache.qpid.proton.engine.Connection; -import org.apache.qpid.proton.engine.Event; -import org.apache.qpid.proton.engine.Event.Type; -import org.apache.qpid.proton.engine.EventType; -import org.apache.qpid.proton.engine.Extendable; -import org.apache.qpid.proton.engine.ExtendableAccessor; -import org.apache.qpid.proton.engine.Handler; -import org.apache.qpid.proton.engine.HandlerException; -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.engine.impl.CollectorImpl; -import org.apache.qpid.proton.engine.impl.ConnectionImpl; -import org.apache.qpid.proton.engine.impl.RecordImpl; -import org.apache.qpid.proton.reactor.Acceptor; -import org.apache.qpid.proton.reactor.impl.AcceptorImpl; -import org.apache.qpid.proton.reactor.Reactor; -import org.apache.qpid.proton.reactor.ReactorChild; -import org.apache.qpid.proton.reactor.Selectable; -import org.apache.qpid.proton.reactor.Selectable.Callback; -import org.apache.qpid.proton.reactor.Selector; -import org.apache.qpid.proton.reactor.Task; -import org.apache.qpid.proton.messenger.impl.Address; - -@SuppressWarnings("deprecation") -public class ReactorImpl implements Reactor, Extendable { - public static final ExtendableAccessor ROOT = new ExtendableAccessor<>(Handler.class); - - private CollectorImpl collector; - private long now; - private long timeout; - private Handler global; - private Handler handler; - private Set children; - private int selectables; - private boolean yield; - private boolean stop; - private Selectable selectable; - private EventType previous; - private Timer timer; - private final Pipe wakeup; - private Selector selector; - private Record attachments; - private final IO io; - protected static final String CONNECTION_PEER_ADDRESS_KEY = "pn_reactor_connection_peer_address"; - - @Override - public long mark() { - now = System.currentTimeMillis(); - return now; - } - - @Override - public long now() { - return now; - } - - protected ReactorImpl(IO io) throws IOException { - collector = (CollectorImpl)Proton.collector(); - global = new IOHandler(); - handler = new BaseHandler(); - children = new HashSet(); - selectables = 0; - timer = new Timer(collector); - this.io = io; - wakeup = this.io.pipe(); - mark(); - attachments = new RecordImpl(); - } - - public ReactorImpl() throws IOException { - this(new IOImpl()); - } - - @Override - public void free() { - if (wakeup.source().isOpen()) { - try { - wakeup.source().close(); - } catch(IOException e) { - // Ignore. - } - } - if (wakeup.sink().isOpen()) { - try { - wakeup.sink().close(); - } catch(IOException e) { - // Ignore - } - } - - if (selector != null) { - selector.free(); - } - - for (ReactorChild child : children) { - child.free(); - } - } - - @Override - public Record attachments() { - return attachments; - } - - @Override - public long getTimeout() { - return timeout; - } - - @Override - public void setTimeout(long timeout) { - this.timeout = timeout; - } - - @Override - public Handler getGlobalHandler() { - return global; - } - - @Override - public void setGlobalHandler(Handler handler) { - global = handler; - } - - @Override - public Handler getHandler() { - return handler; - } - - @Override - public void setHandler(Handler handler) { - this.handler = handler; - } - - @Override - public Set children() { - return children; - } - - @Override - public Collector collector() { - return collector; - } - - private class ReleaseCallback implements Callback { - private final ReactorImpl reactor; - private final ReactorChild child; - public ReleaseCallback(ReactorImpl reactor, ReactorChild child) { - this.reactor = reactor; - this.child = child; - } - @Override - public void run(Selectable selectable) { - if (reactor.children.remove(child)) { - --reactor.selectables; - child.free(); - } - } - } - - @Override - public Selectable selectable() { - return selectable(null); - } - - public SelectableImpl selectable(ReactorChild child) { - SelectableImpl result = new SelectableImpl(); - result.setCollector(collector); - collector.put(Type.SELECTABLE_INIT, result); - result.setReactor(this); - children.add(child == null ? result : child); - result.onRelease(new ReleaseCallback(this, child == null ? result : child)); - ++selectables; - return result; - } - - @Override - public void update(Selectable selectable) { - SelectableImpl selectableImpl = (SelectableImpl)selectable; - if (!selectableImpl.isTerminated()) { - if (selectableImpl.isTerminal()) { - selectableImpl.terminated(); - collector.put(Type.SELECTABLE_FINAL, selectable); - } else { - collector.put(Type.SELECTABLE_UPDATED, selectable); - } - } - } - - // pn_event_handler - private Handler eventHandler(Event event) { - Handler result; - if (event.getLink() != null) { - result = BaseHandler.getHandler(event.getLink()); - if (result != null) return result; - } - if (event.getSession() != null) { - result = BaseHandler.getHandler(event.getSession()); - if (result != null) return result; - } - if (event.getConnection() != null) { - result = BaseHandler.getHandler(event.getConnection()); - if (result != null) return result; - } - - if (event.getTask() != null) { - result = BaseHandler.getHandler(event.getTask()); - if (result != null) return result; - } - - if (event.getSelectable() != null) { - result = BaseHandler.getHandler(event.getSelectable()); - if (result != null) return result; - } - - return handler; - } - - - @Override - public void yield() { - yield = true; - } - - @Override - public boolean quiesced() { - Event event = collector.peek(); - if (event == null) return true; - if (collector.more()) return false; - return event.getEventType() == Type.REACTOR_QUIESCED; - } - - @Override - public boolean process() throws HandlerException { - mark(); - EventType previous = null; - while (true) { - Event event = collector.peek(); - if (event != null) { - if (yield) { - yield = false; - return true; - } - Handler handler = eventHandler(event); - dispatch(event, handler); - dispatch(event, global); - - if (event.getEventType() == Type.CONNECTION_FINAL) { - children.remove(event.getConnection()); - } - this.previous = event.getEventType(); - previous = this.previous; - collector.pop(); - - } else { - if (!stop && more()) { - if (previous != Type.REACTOR_QUIESCED && this.previous != Type.REACTOR_FINAL) { - collector.put(Type.REACTOR_QUIESCED, this); - } else { - return true; - } - } else { - if (selectable != null) { - selectable.terminate(); - update(selectable); - selectable = null; - } else { - collector.put(Type.REACTOR_FINAL, this); - return false; - } - } - } - } - } - - private void dispatch(Event event, Handler handler) { - ROOT.set(event, handler); - event.dispatch(handler); - } - - @Override - public void wakeup() { - try { - wakeup.sink().write(ByteBuffer.allocate(1)); - } catch(ClosedChannelException channelClosedException) { - // Ignore - pipe already closed by reactor being shutdown. - } catch(IOException ioException) { - throw new ReactorInternalException(ioException); - } - } - - @Override - public void start() { - collector.put(Type.REACTOR_INIT, this); - selectable = timerSelectable(); - } - - @Override - public void stop() throws HandlerException { - stop = true; - } - - private boolean more() { - return timer.tasks() > 0 || selectables > 1; - } - - @Override - public void run() throws HandlerException { - setTimeout(3141); - start(); - while(process()) {} - stop(); - process(); - collector = null; - } - - // pn_reactor_schedule from reactor.c - @Override - public Task schedule(int delay, Handler handler) { - Task task = timer.schedule(now + delay); - ((TaskImpl)task).setReactor(this); - BaseHandler.setHandler(task, handler); - if (selectable != null) { - selectable.setDeadline(timer.deadline()); - update(selectable); - } - return task; - } - - private void expireSelectable(Selectable selectable) { - ReactorImpl reactor = (ReactorImpl) selectable.getReactor(); - reactor.timer.tick(reactor.now); - selectable.setDeadline(reactor.timer.deadline()); - reactor.update(selectable); - } - - private class TimerReadable implements Callback { - - @Override - public void run(Selectable selectable) { - try { - wakeup.source().read(ByteBuffer.allocate(64)); - } catch (IOException e) { - throw new RuntimeException(e); - } - expireSelectable(selectable); - } - - } - - private class TimerExpired implements Callback { - @Override - public void run(Selectable selectable) { - expireSelectable(selectable); - } - } - - - // pni_timer_finalize from reactor.c - private static class TimerFree implements Callback { - @Override - public void run(Selectable selectable) { - try { - selectable.getChannel().close(); - } catch(IOException ioException) { - // Ignore - } - } - } - - private Selectable timerSelectable() { - Selectable sel = selectable(); - sel.setChannel(wakeup.source()); - sel.onReadable(new TimerReadable()); - sel.onExpired(new TimerExpired()); - sel.onFree(new TimerFree()); - sel.setReading(true); - sel.setDeadline(timer.deadline()); - update(sel); - return sel; - } - - protected Selector getSelector() { - return selector; - } - - protected void setSelector(Selector selector) { - this.selector = selector; - } - - // pn_reactor_connection from connection.c - @Override - public Connection connection(Handler handler) { - Connection connection = Proton.connection(); - BaseHandler.setHandler(connection, handler); - connection.collect(collector); - children.add(connection); - ((ConnectionImpl)connection).setReactor(this); - return connection; - } - - @Override - public Connection connectionToHost(String host, int port, Handler handler) { - Connection connection = connection(handler); - setConnectionHost(connection, host, port); - return connection; - } - - @Override - public String getConnectionAddress(Connection connection) { - Record r = connection.attachments(); - Address addr = r.get(CONNECTION_PEER_ADDRESS_KEY, Address.class); - if (addr != null) { - StringBuilder sb = new StringBuilder(addr.getHost()); - if (addr.getPort() != null) - sb.append(":" + addr.getPort()); - return sb.toString(); - } - return null; - } - - @Override - public void setConnectionHost(Connection connection, - String host, int port) { - Record r = connection.attachments(); - // cannot set the address on an incoming connection - if (r.get(AcceptorImpl.CONNECTION_ACCEPTOR_KEY, Acceptor.class) == null) { - Address addr = new Address(); - addr.setHost(host); - if (port == 0) { - port = 5672; - } - addr.setPort(Integer.toString(port)); - r.set(CONNECTION_PEER_ADDRESS_KEY, Address.class, addr); - } else { - throw new IllegalStateException("Cannot set the host address on an incoming Connection"); - } - } - - @Override - public Acceptor acceptor(String host, int port) throws IOException { - return this.acceptor(host, port, null); - } - - @Override - public Acceptor acceptor(String host, int port, Handler handler) throws IOException { - return new AcceptorImpl(this, host, port, handler); - } - - public IO getIO() { - return io; - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java deleted file mode 100644 index 6dde424..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorInternalException.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.proton.reactor.impl; - -/** - * Thrown by the reactor when it encounters an internal error condition. - * This is analogous to an assertion failure in the proton-c reactor - * implementation. - */ -class ReactorInternalException extends RuntimeException { - - private static final long serialVersionUID = 8979674526584642454L; - - protected ReactorInternalException(String msg) { - super(msg); - } - - protected ReactorInternalException(Throwable cause) { - super(cause); - } - - protected ReactorInternalException(String msg, Throwable cause) { - super(msg, cause); - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java deleted file mode 100644 index df4e6cc..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectableImpl.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.proton.reactor.impl; - -import java.nio.channels.SelectableChannel; - -import org.apache.qpid.proton.engine.Collector; -import org.apache.qpid.proton.engine.Event.Type; -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.engine.Transport; -import org.apache.qpid.proton.engine.impl.CollectorImpl; -import org.apache.qpid.proton.engine.impl.RecordImpl; -import org.apache.qpid.proton.reactor.Reactor; -import org.apache.qpid.proton.reactor.Selectable; - -public class SelectableImpl implements Selectable { - - private Callback readable; - private Callback writable; - private Callback error; - private Callback expire; - private Callback release; - private Callback free; - - private boolean reading = false; - private boolean writing = false; - private long deadline = 0; - private SelectableChannel channel; - private Record attachments = new RecordImpl(); - private boolean registered; - private Reactor reactor; - private Transport transport; - private boolean terminal; - private boolean terminated; - - @Override - public boolean isReading() { - return reading; - } - - @Override - public boolean isWriting() { - return writing; - } - - @Override - public long getDeadline() { - return deadline; - } - - @Override - public void setReading(boolean reading) { - this.reading = reading; - } - - @Override - public void setWriting(boolean writing) { - this.writing = writing; - } - - @Override - public void setDeadline(long deadline) { - this.deadline = deadline; - } - - @Override - public void onReadable(Callback runnable) { - this.readable = runnable; - } - - @Override - public void onWritable(Callback runnable) { - this.writable = runnable; - } - - @Override - public void onExpired(Callback runnable) { - this.expire = runnable; - } - - @Override - public void onError(Callback runnable) { - this.error = runnable; - } - - @Override - public void onRelease(Callback runnable) { - this.release = runnable; - } - - @Override - public void onFree(Callback runnable) { - this.free = runnable; - } - - @Override - public void readable() { - if (readable != null) { - readable.run(this); - } - } - - @Override - public void writeable() { - if (writable != null) { - writable.run(this); - } - } - - @Override - public void expired() { - if (expire != null) { - expire.run(this); - } - } - - @Override - public void error() { - if (error != null) { - error.run(this); - } - } - - @Override - public void release() { - if (release != null) { - release.run(this); - } - } - - @Override - public void free() { - if (free != null) { - free.run(this); - } - } - - @Override - public void setChannel(SelectableChannel channel) { - this.channel = channel; - } - - @Override - public SelectableChannel getChannel() { - return channel; - } - - @Override - public boolean isRegistered() { - return registered; - } - - @Override - public void setRegistered(boolean registered) { - this.registered = registered; - } - - @Override - public void setCollector(final Collector collector) { - final CollectorImpl collectorImpl = (CollectorImpl)collector; - - onReadable(new Callback() { - @Override - public void run(Selectable selectable) { - collectorImpl.put(Type.SELECTABLE_READABLE, selectable); - } - }); - onWritable(new Callback() { - @Override - public void run(Selectable selectable) { - collectorImpl.put(Type.SELECTABLE_WRITABLE, selectable); - } - }); - onExpired(new Callback() { - @Override - public void run(Selectable selectable) { - collectorImpl.put(Type.SELECTABLE_EXPIRED, selectable); - } - }); - onError(new Callback() { - @Override - public void run(Selectable selectable) { - collectorImpl.put(Type.SELECTABLE_ERROR, selectable); - } - }); - } - - @Override - public Reactor getReactor() { - return reactor; - } - - @Override - public void terminate() { - terminal = true; - } - - @Override - public boolean isTerminal() { - return terminal; - } - - protected Transport getTransport() { - return transport; - } - - protected void setTransport(Transport transport) { - this.transport = transport; - } - - protected void setReactor(Reactor reactor) { - this.reactor = reactor; - } - - @Override - public Record attachments() { - return attachments; - } - - public boolean isTerminated() { - return terminated; - } - - public void terminated() { - terminated = true; - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java deleted file mode 100644 index b4efb39..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.proton.reactor.impl; - -import java.io.IOException; -import java.nio.channels.SelectionKey; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.HashSet; -import java.util.Iterator; - -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.Transport; -import org.apache.qpid.proton.reactor.Selectable; -import org.apache.qpid.proton.reactor.Selector; - -class SelectorImpl implements Selector { - - private final java.nio.channels.Selector selector; - private final HashSet selectables = new HashSet(); - private final HashSet readable = new HashSet(); - private final HashSet writeable = new HashSet(); - private final HashSet expired = new HashSet(); - private final HashSet error = new HashSet(); - - protected SelectorImpl(IO io) throws IOException { - selector = io.selector(); - } - - @Override - public void add(Selectable selectable) throws IOException { - // Selectable can be 'null' - if this is the case it can only ever receive expiry events. - if (selectable.getChannel() != null) { - selectable.getChannel().configureBlocking(false); - SelectionKey key = selectable.getChannel().register(selector, 0); - key.attach(selectable); - } - selectables.add(selectable); - update(selectable); - } - - @Override - public void update(Selectable selectable) { - if (selectable.getChannel() != null) { - int interestedOps = 0; - if (selectable.getChannel() instanceof SocketChannel && - ((SocketChannel)selectable.getChannel()).isConnectionPending()) { - interestedOps |= SelectionKey.OP_CONNECT; - } else { - if (selectable.isReading()) { - if (selectable.getChannel() instanceof ServerSocketChannel) { - interestedOps |= SelectionKey.OP_ACCEPT; - } else { - interestedOps |= SelectionKey.OP_READ; - } - } - if (selectable.isWriting()) interestedOps |= SelectionKey.OP_WRITE; - } - SelectionKey key = selectable.getChannel().keyFor(selector); - key.interestOps(interestedOps); - } - } - - @Override - public void remove(Selectable selectable) { - if (selectable.getChannel() != null) { - SelectionKey key = selectable.getChannel().keyFor(selector); - if (key != null) { - key.cancel(); - key.attach(null); - } - } - selectables.remove(selectable); - } - - @Override - public void select(long timeout) throws IOException { - - long now = System.currentTimeMillis(); - if (timeout > 0) { - long deadline = 0; - // XXX: Note: this differs from the C code which requires a call to update() to make deadline changes take affect - for (Selectable selectable : selectables) { - long d = selectable.getDeadline(); - if (d > 0) { - deadline = (deadline == 0) ? d : Math.min(deadline, d); - } - } - - if (deadline > 0) { - long delta = deadline - now; - if (delta < 0) { - timeout = 0; - } else if (delta < timeout) { - timeout = delta; - } - } - } - - error.clear(); - - long awoken = 0; - if (timeout > 0) { - long remainingTimeout = timeout; - while(remainingTimeout > 0) { - selector.select(remainingTimeout); - awoken = System.currentTimeMillis(); - - for (Iterator iterator = selector.selectedKeys().iterator(); iterator.hasNext();) { - SelectionKey key = iterator.next(); - if (key.isConnectable()) { - try { - ((SocketChannel)key.channel()).finishConnect(); - update((Selectable)key.attachment()); - } catch(IOException ioException) { - SelectableImpl selectable = (SelectableImpl)key.attachment(); - ErrorCondition condition = new ErrorCondition(); - condition.setCondition(Symbol.getSymbol("proton:io")); - condition.setDescription(ioException.getMessage()); - Transport transport = selectable.getTransport(); - if (transport != null) { - transport.setCondition(condition); - transport.close_tail(); - transport.close_head(); - transport.pop(Math.max(0, transport.pending())); // Force generation of TRANSPORT_HEAD_CLOSE (not in C code) - } - error.add(selectable); - } - iterator.remove(); - } - } - if (!selector.selectedKeys().isEmpty()) { - break; - } - remainingTimeout = remainingTimeout - (awoken - now); - } - } else { - selector.selectNow(); - awoken = System.currentTimeMillis(); - } - - readable.clear(); - writeable.clear(); - expired.clear(); - for (SelectionKey key : selector.selectedKeys()) { - Selectable selectable = (Selectable)key.attachment(); - if (key.isReadable()) readable.add(selectable); - if (key.isAcceptable()) readable.add(selectable); - if (key.isWritable()) writeable.add(selectable); - } - selector.selectedKeys().clear(); - // XXX: Note: this is different to the C code which evaluates expiry at the point the selectable is iterated over. - for (Selectable selectable : selectables) { - long deadline = selectable.getDeadline(); - if (deadline > 0 && awoken >= deadline) { - expired.add(selectable); - } - } - } - - @Override - public Iterator readable() { - return readable.iterator(); - } - - @Override - public Iterator writeable() { - return writeable.iterator(); - } - - @Override - public Iterator expired() { - return expired.iterator(); - } - - @Override - public Iterator error() { - return error.iterator(); - } - - @Override - public void free() { - try { - selector.close(); - } catch(IOException ioException) { - // Ignore - } - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java deleted file mode 100644 index 11bb6b8..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.proton.reactor.impl; - -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.engine.impl.RecordImpl; -import org.apache.qpid.proton.reactor.Reactor; -import org.apache.qpid.proton.reactor.Task; - -public class TaskImpl implements Task, Comparable { - private final long deadline; - private final int counter; - private boolean cancelled = false; - private final AtomicInteger count = new AtomicInteger(); - private Record attachments = new RecordImpl(); - private Reactor reactor; - - public TaskImpl(long deadline) { - this.deadline = deadline; - this.counter = count.getAndIncrement(); - } - - @Override - public int compareTo(TaskImpl other) { - int result; - if (deadline < other.deadline) { - result = -1; - } else if (deadline > other.deadline) { - result = 1; - } else { - result = counter - other.counter; - } - return result; - } - - @Override - public long deadline() { - return deadline; - } - - public boolean isCancelled() { - return cancelled; - } - - @Override - public void cancel() { - cancelled = true; - } - - public void setReactor(Reactor reactor) { - this.reactor = reactor; - } - - @Override - public Reactor getReactor() { - return reactor; - } - - @Override - public Record attachments() { - return attachments; - } - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java deleted file mode 100644 index b8df19d..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.proton.reactor.impl; - -import java.util.PriorityQueue; - -import org.apache.qpid.proton.engine.Collector; -import org.apache.qpid.proton.engine.Event.Type; -import org.apache.qpid.proton.engine.impl.CollectorImpl; -import org.apache.qpid.proton.reactor.Task; - -public class Timer { - - private CollectorImpl collector; - private PriorityQueue tasks = new PriorityQueue(); - - public Timer(Collector collector) { - this.collector = (CollectorImpl)collector; - } - - Task schedule(long deadline) { - TaskImpl task = new TaskImpl(deadline); - tasks.add(task); - return task; - } - - long deadline() { - flushCancelled(); - if (tasks.size() > 0) { - Task task = tasks.peek(); - return task.deadline(); - } else { - return 0; - } - } - - private void flushCancelled() { - while (!tasks.isEmpty()) { - TaskImpl task = tasks.peek(); - if (task.isCancelled()) - tasks.poll(); - else - break; - } - } - - void tick(long now) { - while(!tasks.isEmpty()) { - TaskImpl task = tasks.peek(); - if (now >= task.deadline()) { - tasks.poll(); - if (!task.isCancelled()) - collector.put(Type.TIMER_TASK, task); - } else { - break; - } - } - } - - int tasks() { - flushCancelled(); - return tasks.size(); - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.codec.DataFactory ---------------------------------------------------------------------- diff --git a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.codec.DataFactory b/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.codec.DataFactory deleted file mode 100644 index 46a716b..0000000 --- a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.codec.DataFactory +++ /dev/null @@ -1 +0,0 @@ -org.apache.qpid.proton.codec.impl.DataFactoryImpl \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.driver.DriverFactory ---------------------------------------------------------------------- diff --git a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.driver.DriverFactory b/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.driver.DriverFactory deleted file mode 100644 index 00e7a60..0000000 --- a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.driver.DriverFactory +++ /dev/null @@ -1 +0,0 @@ -org.apache.qpid.proton.driver.impl.DriverFactoryImpl \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.engine.EngineFactory ---------------------------------------------------------------------- diff --git a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.engine.EngineFactory b/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.engine.EngineFactory deleted file mode 100644 index 33f9865..0000000 --- a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.engine.EngineFactory +++ /dev/null @@ -1 +0,0 @@ -org.apache.qpid.proton.engine.impl.EngineFactoryImpl \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.message.MessageFactory ---------------------------------------------------------------------- diff --git a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.message.MessageFactory b/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.message.MessageFactory deleted file mode 100644 index 99eb726..0000000 --- a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.message.MessageFactory +++ /dev/null @@ -1 +0,0 @@ -org.apache.qpid.proton.message.impl.MessageFactoryImpl \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.messenger.MessengerFactory ---------------------------------------------------------------------- diff --git a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.messenger.MessengerFactory b/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.messenger.MessengerFactory deleted file mode 100644 index d0beeb4..0000000 --- a/proton-j/src/main/resources/META-INF/services/org.apache.qpid.proton.messenger.MessengerFactory +++ /dev/null @@ -1 +0,0 @@ -org.apache.qpid.proton.messenger.impl.MessengerFactoryImpl \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org For additional commands, e-mail: commits-help@qpid.apache.org