hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject [1/7] httpcomponents-core git commit: Rewrite of I/O reactor internal channel management; more efficient handling of outgoing connection requests
Date Thu, 15 Jun 2017 05:59:32 GMT
Repository: httpcomponents-core
Updated Branches:
  refs/heads/master 12065472a -> 8b5b5ba3e


http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalIOSession.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalIOSession.java
deleted file mode 100644
index f938e1d..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalIOSession.java
+++ /dev/null
@@ -1,330 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.reactor;
-
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.channels.ByteChannel;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.hc.core5.annotation.Contract;
-import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.function.Callback;
-import org.apache.hc.core5.io.ShutdownType;
-import org.apache.hc.core5.net.NamedEndpoint;
-import org.apache.hc.core5.reactor.ssl.SSLBufferManagement;
-import org.apache.hc.core5.reactor.ssl.SSLIOSession;
-import org.apache.hc.core5.reactor.ssl.SSLMode;
-import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
-import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
-import org.apache.hc.core5.reactor.ssl.TlsDetails;
-import org.apache.hc.core5.util.Asserts;
-
-/**
- * @since 5.0
- */
-@Contract(threading = ThreadingBehavior.SAFE)
-class InternalIOSession implements TlsCapableIOSession {
-
-    private final NamedEndpoint namedEndpoint;
-    private final IOSession ioSession;
-    private final AtomicReference<SSLIOSession> tlsSessionRef;
-    private final Queue<InternalIOSession> closedSessions;
-    private final AtomicBoolean connected;
-    private final AtomicBoolean closed;
-
-    private volatile long lastAccessTime;
-
-    InternalIOSession(
-            final NamedEndpoint namedEndpoint,
-            final IOSession ioSession,
-            final Queue<InternalIOSession> closedSessions) {
-        this.namedEndpoint = namedEndpoint;
-        this.ioSession = ioSession;
-        this.closedSessions = closedSessions;
-        this.tlsSessionRef = new AtomicReference<>(null);
-        this.connected = new AtomicBoolean(false);
-        this.closed = new AtomicBoolean(false);
-        updateAccessTime();
-    }
-
-    @Override
-    public String getId() {
-        return ioSession.getId();
-    }
-
-    void updateAccessTime() {
-        this.lastAccessTime = System.currentTimeMillis();
-    }
-
-    long getLastAccessTime() {
-        return lastAccessTime;
-    }
-
-    private IOSession getSessionImpl() {
-        final SSLIOSession tlsSession = tlsSessionRef.get();
-        if (tlsSession != null) {
-            return tlsSession;
-        } else {
-            return ioSession;
-        }
-    }
-
-    private IOEventHandler getEventHandler() {
-        final IOEventHandler handler = ioSession.getHandler();
-        Asserts.notNull(handler, "IO event handler");
-        return handler;
-    }
-
-    void onConnected() throws IOException {
-        final SSLIOSession tlsSession = tlsSessionRef.get();
-        if (tlsSession != null) {
-            if (!tlsSession.isInitialized()) {
-                tlsSession.initialize();
-            }
-        } else {
-            if (connected.compareAndSet(false, true)) {
-                final IOEventHandler handler = getEventHandler();
-                handler.connected(this);
-            }
-        }
-    }
-
-    void onInputReady() throws IOException {
-        final SSLIOSession tlsSession = tlsSessionRef.get();
-        if (tlsSession != null) {
-            if (!tlsSession.isInitialized()) {
-                tlsSession.initialize();
-            }
-            if (tlsSession.isAppInputReady()) {
-                do {
-                    final IOEventHandler handler = getEventHandler();
-                    handler.inputReady(this);
-                } while (tlsSession.hasInputDate());
-            }
-            tlsSession.inboundTransport();
-        } else {
-            final IOEventHandler handler = getEventHandler();
-            handler.inputReady(this);
-        }
-    }
-
-    void onOutputReady() throws IOException {
-        final SSLIOSession tlsSession = tlsSessionRef.get();
-        if (tlsSession != null) {
-            if (!tlsSession.isInitialized()) {
-                tlsSession.initialize();
-            }
-            if (tlsSession.isAppOutputReady()) {
-                final IOEventHandler handler = getEventHandler();
-                handler.outputReady(this);
-            }
-            tlsSession.outboundTransport();
-        } else {
-            final IOEventHandler handler = getEventHandler();
-            handler.outputReady(this);
-        }
-    }
-
-    void onTimeout() throws IOException {
-        final IOEventHandler handler = getEventHandler();
-        handler.timeout(this);
-        final SSLIOSession tlsSession = tlsSessionRef.get();
-        if (tlsSession != null) {
-            if (tlsSession.isOutboundDone() && !tlsSession.isInboundDone()) {
-                // The session failed to terminate cleanly
-                tlsSession.shutdown(ShutdownType.IMMEDIATE);
-            }
-        }
-    }
-
-    void onException(final Exception cause) {
-        final IOEventHandler handler = getEventHandler();
-        try {
-            handler.exception(this, cause);
-        } finally {
-            shutdown(ShutdownType.IMMEDIATE);
-        }
-    }
-
-    void onDisconnected() {
-        final IOEventHandler handler = getEventHandler();
-        handler.disconnected(this);
-    }
-
-    @Override
-    public void startTls(
-            final SSLContext sslContext,
-            final SSLBufferManagement sslBufferManagement,
-            final SSLSessionInitializer initializer,
-            final SSLSessionVerifier verifier) {
-        if (!tlsSessionRef.compareAndSet(null, new SSLIOSession(
-                namedEndpoint,
-                ioSession,
-                namedEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
-                sslContext,
-                sslBufferManagement,
-                initializer,
-                verifier,
-                new Callback<SSLIOSession>() {
-
-                    @Override
-                    public void execute(final SSLIOSession sslSession) {
-                        if (connected.compareAndSet(false, true)) {
-                            final IOEventHandler handler = getEventHandler();
-                            try {
-                                handler.connected(InternalIOSession.this);
-                            } catch (final Exception ex) {
-                                handler.exception(InternalIOSession.this, ex);
-                            }
-                        }
-                    }
-
-                }))) {
-            throw new IllegalStateException("TLS already activated");
-        }
-    }
-
-    @Override
-    public TlsDetails getTlsDetails() {
-        final SSLIOSession sslIoSession = tlsSessionRef.get();
-        return sslIoSession != null ? sslIoSession.getTlsDetails() : null;
-    }
-
-    @Override
-    public void close() {
-        if (closed.compareAndSet(false, true)) {
-            try {
-                getSessionImpl().close();
-            } finally {
-                closedSessions.add(this);
-            }
-        }
-    }
-
-    @Override
-    public void shutdown(final ShutdownType shutdownType) {
-        if (closed.compareAndSet(false, true)) {
-            try {
-                getSessionImpl().shutdown(shutdownType);
-            } finally {
-                closedSessions.add(this);
-            }
-        }
-    }
-
-    @Override
-    public int getStatus() {
-        return getSessionImpl().getStatus();
-    }
-
-    @Override
-    public boolean isClosed() {
-        return getSessionImpl().isClosed();
-    }
-
-    @Override
-    public IOEventHandler getHandler() {
-        return ioSession.getHandler();
-    }
-
-    @Override
-    public void setHandler(final IOEventHandler eventHandler) {
-        ioSession.setHandler(eventHandler);
-    }
-
-    @Override
-    public void addLast(final Command command) {
-        getSessionImpl().addLast(command);
-    }
-
-    @Override
-    public void addFirst(final Command command) {
-        getSessionImpl().addFirst(command);
-    }
-
-    @Override
-    public Command getCommand() {
-        return getSessionImpl().getCommand();
-    }
-
-    @Override
-    public ByteChannel channel() {
-        return getSessionImpl().channel();
-    }
-
-    @Override
-    public SocketAddress getRemoteAddress() {
-        return ioSession.getRemoteAddress();
-    }
-
-    @Override
-    public SocketAddress getLocalAddress() {
-        return ioSession.getLocalAddress();
-    }
-
-    @Override
-    public int getEventMask() {
-        return getSessionImpl().getEventMask();
-    }
-
-    @Override
-    public void setEventMask(final int ops) {
-        getSessionImpl().setEventMask(ops);
-    }
-
-    @Override
-    public void setEvent(final int op) {
-        getSessionImpl().setEvent(op);
-    }
-
-    @Override
-    public void clearEvent(final int op) {
-        getSessionImpl().clearEvent(op);
-    }
-
-    @Override
-    public int getSocketTimeout() {
-        return ioSession.getSocketTimeout();
-    }
-
-    @Override
-    public void setSocketTimeout(final int timeout) {
-        ioSession.setSocketTimeout(timeout);
-    }
-
-    @Override
-    public String toString() {
-        return getSessionImpl().toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListeningIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListeningIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListeningIOReactor.java
index 17dcf60..bbd2ecb 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListeningIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ListeningIOReactor.java
@@ -27,58 +27,12 @@
 
 package org.apache.hc.core5.reactor;
 
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.util.Set;
-
 /**
  * ListeningIOReactor represents an I/O reactor capable of listening for
  * incoming connections on one or several ports.
  *
  * @since 4.0
  */
-public interface ListeningIOReactor extends IOReactor {
-
-    /**
-     * Opens a new listener endpoint with the given socket address. Once
-     * the endpoint is fully initialized it starts accepting incoming
-     * connections and propagates I/O activity notifications to the I/O event
-     * dispatcher.
-     * <p>
-     * {@link ListenerEndpoint#waitFor()} can be used to wait for the
-     *  listener to be come ready to accept incoming connections.
-     * <p>
-     * {@link ListenerEndpoint#close()} can be used to shut down
-     * the listener even before it is fully initialized.
-     *
-     * @param address the socket address to listen on.
-     * @return listener endpoint.
-     */
-    ListenerEndpoint listen(SocketAddress address);
-
-    /**
-     * Suspends the I/O reactor preventing it from accepting new connections on
-     * all active endpoints.
-     *
-     * @throws IOException in case of an I/O error.
-     */
-    void pause()
-        throws IOException;
-
-    /**
-     * Resumes the I/O reactor restoring its ability to accept incoming
-     * connections on all active endpoints.
-     *
-     * @throws IOException in case of an I/O error.
-     */
-    void resume()
-        throws IOException;
-
-    /**
-     * Returns a set of endpoints for this I/O reactor.
-     *
-     * @return set of endpoints.
-     */
-    Set<ListenerEndpoint> getEndpoints();
+public interface ListeningIOReactor extends IOReactorService, ConnectionAcceptor {
 
 }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/MultiCoreIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/MultiCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/MultiCoreIOReactor.java
new file mode 100644
index 0000000..8811ed2
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/MultiCoreIOReactor.java
@@ -0,0 +1,137 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hc.core5.io.ShutdownType;
+import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
+
+class MultiCoreIOReactor implements IOReactor {
+
+    private final IOReactor[] ioReactors;
+    private final Thread[] threads;
+    private final AtomicReference<IOReactorStatus> status;
+
+    MultiCoreIOReactor(final IOReactor[] ioReactors, final Thread[] threads) {
+        super();
+        this.ioReactors = ioReactors.clone();
+        this.threads = threads.clone();
+        this.status = new AtomicReference<>(IOReactorStatus.INACTIVE);
+    }
+
+    @Override
+    public IOReactorStatus getStatus() {
+        return this.status.get();
+    }
+
+    /**
+     * Activates all worker I/O reactors.
+     * The I/O main reactor will start reacting to I/O events and triggering
+     * notification methods. The worker I/O reactor in their turn will start
+     * reacting to I/O events and dispatch I/O event notifications to the
+     * {@link IOEventHandler} associated with the given I/O session.
+     */
+    public final void start() {
+        if (this.status.compareAndSet(IOReactorStatus.INACTIVE, IOReactorStatus.ACTIVE))
{
+            for (int i = 0; i < this.threads.length; i++) {
+                this.threads[i].start();
+            }
+        }
+    }
+
+    @Override
+    public final void initiateShutdown() {
+        if (this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN))
{
+            for (int i = 0; i < this.ioReactors.length; i++) {
+                final IOReactor ioReactor = this.ioReactors[i];
+                ioReactor.initiateShutdown();
+            }
+        }
+    }
+
+    @Override
+    public final void awaitShutdown(final TimeValue waitTime) throws InterruptedException
{
+        Args.notNull(waitTime, "Wait time");
+        final long deadline = System.currentTimeMillis() + waitTime.toMillis();
+        long remaining = waitTime.toMillis();
+        for (int i = 0; i < this.ioReactors.length; i++) {
+            final IOReactor ioReactor = this.ioReactors[i];
+            if (ioReactor.getStatus().compareTo(IOReactorStatus.SHUT_DOWN) < 0) {
+                ioReactor.awaitShutdown(TimeValue.of(remaining, TimeUnit.MILLISECONDS));
+                remaining = deadline - System.currentTimeMillis();
+                if (remaining <= 0) {
+                    return;
+                }
+            }
+        }
+        for (int i = 0; i < this.threads.length; i++) {
+            final Thread thread = this.threads[i];
+            thread.join(remaining);
+            remaining = deadline - System.currentTimeMillis();
+            if (remaining <= 0) {
+                return;
+            }
+        }
+    }
+
+    @Override
+    public final void shutdown(final ShutdownType shutdownType) {
+        final IOReactorStatus currentStatus = this.status.get();
+        if (currentStatus == IOReactorStatus.INACTIVE || currentStatus == IOReactorStatus.SHUT_DOWN)
{
+            return;
+        }
+        if (shutdownType == ShutdownType.GRACEFUL) {
+            initiateShutdown();
+            try {
+                awaitShutdown(TimeValue.ofSeconds(5));
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        } else {
+            this.status.compareAndSet(IOReactorStatus.ACTIVE, IOReactorStatus.SHUTTING_DOWN);
+            for (int i = 0; i < this.ioReactors.length; i++) {
+                final IOReactor ioReactor = this.ioReactors[i];
+                ioReactor.shutdown(ShutdownType.IMMEDIATE);
+            }
+            for (int i = 0; i < this.threads.length; i++) {
+                final Thread thread = this.threads[i];
+                thread.interrupt();
+            }
+        }
+        this.status.set(IOReactorStatus.SHUT_DOWN);
+    }
+
+    @Override
+    public final void close() {
+        shutdown(ShutdownType.GRACEFUL);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequest.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequest.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequest.java
index 3923cef..539b218 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequest.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequest.java
@@ -27,7 +27,6 @@
 
 package org.apache.hc.core5.reactor;
 
-import java.io.IOException;
 import java.net.SocketAddress;
 
 import org.apache.hc.core5.concurrent.Cancellable;
@@ -92,14 +91,13 @@ public interface SessionRequest extends Cancellable {
     TlsCapableIOSession getSession();
 
     /**
-     * Returns {@link IOException} instance if the request could not be
-     * successfully executed due to an I/O error or {@code null} if no
+     * Returns {@link Exception} instance if the request could not be
+     * successfully executed due to an error or {@code null} if no
      * error occurred to this point.
      *
-     * @return I/O exception or {@code null} if no error occurred to
-     * this point.
+     * @return Exception or {@code null} if no error occurred to this point.
      */
-    IOException getException();
+    Exception getException();
 
     /**
      * Waits for completion of this session request.

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestHandle.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestHandle.java
b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestHandle.java
deleted file mode 100644
index 113f905..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestHandle.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.reactor;
-
-import org.apache.hc.core5.util.Args;
-
-/**
- * Session request handle class used by I/O reactor implementations to keep
- * a reference to a {@link SessionRequest} along
- * with the time the request was made.
- *
- * @since 4.0
- */
-public class SessionRequestHandle {
-
-    private final SessionRequestImpl sessionRequest;
-    private final long requestTime;
-
-    public SessionRequestHandle(final SessionRequestImpl sessionRequest) {
-        super();
-        Args.notNull(sessionRequest, "Session request");
-        this.sessionRequest = sessionRequest;
-        this.requestTime = System.currentTimeMillis();
-    }
-
-    public SessionRequestImpl getSessionRequest() {
-        return this.sessionRequest;
-    }
-
-    public long getRequestTime() {
-        return this.requestTime;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestImpl.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestImpl.java
index da97537..66cccf6 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestImpl.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestImpl.java
@@ -56,7 +56,7 @@ public class SessionRequestImpl implements SessionRequest {
 
     private volatile int connectTimeout;
     private volatile TlsCapableIOSession session = null;
-    private volatile IOException exception = null;
+    private volatile Exception exception = null;
 
     public SessionRequestImpl(
             final NamedEndpoint remoteEndpoint,
@@ -122,7 +122,7 @@ public class SessionRequestImpl implements SessionRequest {
     }
 
     @Override
-    public IOException getException() {
+    public Exception getException() {
         synchronized (this) {
             return this.exception;
         }
@@ -143,7 +143,7 @@ public class SessionRequestImpl implements SessionRequest {
         }
     }
 
-    public void failed(final IOException exception) {
+    public void failed(final Exception exception) {
         if (exception == null) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
new file mode 100644
index 0000000..fb86462
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
@@ -0,0 +1,337 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.net.NamedEndpoint;
+import org.apache.hc.core5.util.Args;
+
+class SingleCoreIOReactor extends AbstractSingleCoreIOReactor {
+
+    private final IOEventHandlerFactory eventHandlerFactory;
+    private final IOReactorConfig reactorConfig;
+    private final Queue<InternalDataChannel> closedSessions;
+    private final Queue<SocketChannel> channelQueue;
+    private final Queue<SessionRequestImpl> requestQueue;
+    private final AtomicBoolean shutdownInitiated;
+    private final Callback<IOSession> sessionShutdownCallback;
+
+    private volatile long lastTimeoutCheck;
+
+    SingleCoreIOReactor(
+            final Queue<ExceptionEvent> auditLog,
+            final IOEventHandlerFactory eventHandlerFactory,
+            final IOReactorConfig reactorConfig,
+            final Callback<IOSession> sessionShutdownCallback) {
+        super(auditLog);
+        this.eventHandlerFactory = Args.notNull(eventHandlerFactory, "Event handler factory");
+        this.reactorConfig = Args.notNull(reactorConfig, "I/O reactor config");
+        this.sessionShutdownCallback = sessionShutdownCallback;
+        this.shutdownInitiated = new AtomicBoolean(false);
+        this.closedSessions = new ConcurrentLinkedQueue<>();
+        this.channelQueue = new ConcurrentLinkedQueue<>();
+        this.requestQueue = new ConcurrentLinkedQueue<>();
+    }
+
+    void enqueueChannel(final SocketChannel socketChannel) throws IOReactorShutdownException
{
+        Args.notNull(socketChannel, "SocketChannel");
+        if (getStatus().compareTo(IOReactorStatus.ACTIVE) > 0) {
+            throw new IOReactorShutdownException("I/O reactor has been shut down");
+        }
+        this.channelQueue.add(socketChannel);
+        this.selector.wakeup();
+    }
+
+    @Override
+    void doTerminate() {
+        closePendingChannels();
+        closePendingConnectionRequests();
+        processClosedSessions();
+    }
+
+    @Override
+    void doExecute() throws IOException {
+        final long selectTimeout = this.reactorConfig.getSelectInterval();
+        while (!Thread.currentThread().isInterrupted()) {
+
+            final int readyCount = this.selector.select(selectTimeout);
+
+            if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
+                if (this.shutdownInitiated.compareAndSet(false, true)) {
+                    initiateSessionShutdown();
+                }
+                closePendingChannels();
+            }
+            if (getStatus().compareTo(IOReactorStatus.SHUT_DOWN) == 0) {
+                break;
+            }
+
+            // Process selected I/O events
+            if (readyCount > 0) {
+                processEvents(this.selector.selectedKeys());
+            }
+
+            validateActiveChannels();
+
+            // Process closed sessions
+            processClosedSessions();
+
+            // If active process new channels
+            if (getStatus().compareTo(IOReactorStatus.ACTIVE) == 0) {
+                processPendingChannels();
+                processPendingConnectionRequests();
+            }
+
+            // Exit select loop if graceful shutdown has been completed
+            if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) == 0 && this.selector.keys().isEmpty())
{
+                break;
+            }
+            if (getStatus().compareTo(IOReactorStatus.SHUT_DOWN) == 0) {
+                break;
+            }
+        }
+    }
+
+    private void initiateSessionShutdown() {
+        if (this.sessionShutdownCallback != null) {
+            final Set<SelectionKey> keys = this.selector.keys();
+            for (final SelectionKey key : keys) {
+                final InternalChannel channel = (InternalChannel) key.attachment();
+                if (channel instanceof InternalDataChannel) {
+                    this.sessionShutdownCallback.execute((InternalDataChannel) channel);
+                }
+            }
+        }
+    }
+
+    private void validateActiveChannels() {
+        final long currentTime = System.currentTimeMillis();
+        if( (currentTime - this.lastTimeoutCheck) >= this.reactorConfig.getSelectInterval())
{
+            this.lastTimeoutCheck = currentTime;
+            for (final SelectionKey key : this.selector.keys()) {
+                timeoutCheck(key, currentTime);
+            }
+        }
+    }
+
+    private void processEvents(final Set<SelectionKey> selectedKeys) {
+        for (final SelectionKey key : selectedKeys) {
+            final InternalChannel channel = (InternalChannel) key.attachment();
+            channel.handleIOEvent(key.readyOps());
+        }
+        selectedKeys.clear();
+    }
+
+    private void processPendingChannels() throws IOException {
+        SocketChannel socketChannel;
+        while ((socketChannel = this.channelQueue.poll()) != null) {
+            try {
+                prepareSocket(socketChannel.socket());
+                socketChannel.configureBlocking(false);
+            } catch (final IOException ex) {
+                addExceptionEvent(ex);
+                try {
+                    socketChannel.close();
+                } catch (final IOException ex2) {
+                    addExceptionEvent(ex2);
+                }
+                throw ex;
+            }
+            final SelectionKey key;
+            try {
+                key = socketChannel.register(this.selector, SelectionKey.OP_READ);
+            } catch (final ClosedChannelException ex) {
+                return;
+            }
+            final InternalDataChannel dataChannel = new InternalDataChannel(key, socketChannel,
null, closedSessions);
+            dataChannel.setHandler(this.eventHandlerFactory.createHandler(dataChannel, null));
+            dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout().toMillisIntBound());
+            key.attach(dataChannel);
+            dataChannel.handleIOEvent(SelectionKey.OP_CONNECT);
+        }
+    }
+
+    private void processClosedSessions() {
+        for (;;) {
+            final InternalDataChannel dataChannel = this.closedSessions.poll();
+            if (dataChannel == null) {
+                break;
+            }
+            try {
+                dataChannel.disconnected();
+            } catch (final CancelledKeyException ex) {
+                // ignore and move on
+            }
+        }
+    }
+
+    private void timeoutCheck(final SelectionKey key, final long now) {
+        final InternalChannel channel = (InternalChannel) key.attachment();
+        if (channel != null) {
+            channel.checkTimeout(now);
+        }
+    }
+
+    public SessionRequest connect(
+            final NamedEndpoint remoteEndpoint,
+            final SocketAddress remoteAddress,
+            final SocketAddress localAddress,
+            final Object attachment,
+            final SessionRequestCallback callback) throws IOReactorShutdownException {
+        Args.notNull(remoteEndpoint, "Remote endpoint");
+        final SessionRequestImpl sessionRequest = new SessionRequestImpl(
+                remoteEndpoint,
+                remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(),
remoteEndpoint.getPort()),
+                localAddress,
+                attachment,
+                callback);
+
+        this.requestQueue.add(sessionRequest);
+        this.selector.wakeup();
+
+        return sessionRequest;
+    }
+
+    private void prepareSocket(final Socket socket) throws IOException {
+        socket.setTcpNoDelay(this.reactorConfig.isTcpNoDelay());
+        socket.setKeepAlive(this.reactorConfig.isSoKeepalive());
+        if (this.reactorConfig.getSndBufSize() > 0) {
+            socket.setSendBufferSize(this.reactorConfig.getSndBufSize());
+        }
+        if (this.reactorConfig.getRcvBufSize() > 0) {
+            socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
+        }
+        final int linger = this.reactorConfig.getSoLinger().toSecondsIntBound();
+        if (linger >= 0) {
+            socket.setSoLinger(true, linger);
+        }
+    }
+
+    private void validateAddress(final SocketAddress address) throws UnknownHostException
{
+        if (address instanceof InetSocketAddress) {
+            final InetSocketAddress endpoint = (InetSocketAddress) address;
+            if (endpoint.isUnresolved()) {
+                throw new UnknownHostException(endpoint.getHostName());
+            }
+        }
+    }
+
+    private void processPendingConnectionRequests() {
+        SessionRequestImpl sessionRequest;
+        while ((sessionRequest = this.requestQueue.poll()) != null) {
+            if (!sessionRequest.isCompleted()) {
+                final SocketChannel socketChannel;
+                try {
+                    socketChannel = SocketChannel.open();
+                } catch (final IOException ex) {
+                    sessionRequest.failed(ex);
+                    return;
+                }
+                try {
+                    processConnectionRequest(socketChannel, sessionRequest);
+                } catch (final IOException ex) {
+                    try {
+                        socketChannel.close();
+                    } catch (IOException ignore) {
+                    }
+                    sessionRequest.failed(ex);
+                }
+            }
+        }
+    }
+
+    private void processConnectionRequest(final SocketChannel socketChannel, final SessionRequestImpl
sessionRequest) throws IOException {
+        validateAddress(sessionRequest.getLocalAddress());
+        validateAddress(sessionRequest.getRemoteAddress());
+
+        socketChannel.configureBlocking(false);
+        prepareSocket(socketChannel.socket());
+
+        if (sessionRequest.getLocalAddress() != null) {
+            final Socket sock = socketChannel.socket();
+            sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
+            sock.bind(sessionRequest.getLocalAddress());
+        }
+        final boolean connected = socketChannel.connect(sessionRequest.getRemoteAddress());
+        final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT
| SelectionKey.OP_READ);
+        sessionRequest.setKey(key);
+        final InternalChannel channel = new InternalConnectChannel(key, socketChannel, sessionRequest,
new InternalDataChannelFactory() {
+
+            @Override
+            public InternalDataChannel create(
+                    final SelectionKey key,
+                    final SocketChannel socketChannel,
+                    final NamedEndpoint namedEndpoint,
+                    final Object attachment) {
+                final InternalDataChannel dataChannel = new InternalDataChannel(key, socketChannel,
namedEndpoint, closedSessions);
+                dataChannel.setHandler(eventHandlerFactory.createHandler(dataChannel, attachment));
+                dataChannel.setSocketTimeout(reactorConfig.getSoTimeout().toMillisIntBound());
+                return dataChannel;
+            }
+
+        });
+        if (connected) {
+            channel.handleIOEvent(SelectionKey.OP_CONNECT);
+        } else {
+            key.attach(channel);
+        }
+    }
+
+    private void closePendingChannels() {
+        SocketChannel socketChannel;
+        while ((socketChannel = this.channelQueue.poll()) != null) {
+            try {
+                socketChannel.close();
+            } catch (final IOException ex) {
+                addExceptionEvent(ex);
+            }
+        }
+    }
+
+    private void closePendingConnectionRequests() {
+        SessionRequestImpl sessionRequest;
+        while ((sessionRequest = this.requestQueue.poll()) != null) {
+            sessionRequest.cancel();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/a7f77921/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java
b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java
new file mode 100644
index 0000000..533dde7
--- /dev/null
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java
@@ -0,0 +1,235 @@
+/*
+ * ====================================================================
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * ====================================================================
+ *
+ * This software consists of voluntary contributions made by many
+ * individuals on behalf of the Apache Software Foundation.  For more
+ * information on the Apache Software Foundation, please see
+ * <http://www.apache.org/>.
+ *
+ */
+
+package org.apache.hc.core5.reactor;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.SocketAddress;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.Channel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hc.core5.function.Callback;
+
+class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implements ConnectionAcceptor
{
+
+    private final IOReactorConfig reactorConfig;
+    private final Callback<SocketChannel> callback;
+    private final Queue<ListenerEndpointImpl> requestQueue;
+    private final ConcurrentMap<ListenerEndpoint, Boolean> endpoints;
+
+    private final AtomicBoolean paused;
+
+    SingleCoreListeningIOReactor(
+            final Queue<ExceptionEvent> auditLog,
+            final IOReactorConfig ioReactorConfig,
+            final Callback<SocketChannel> callback) {
+        super(auditLog);
+        this.reactorConfig = ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT;
+        this.callback = callback;
+        this.requestQueue = new ConcurrentLinkedQueue<>();
+        this.endpoints = new ConcurrentHashMap<>();
+        this.paused = new AtomicBoolean(false);
+    }
+
+    @Override
+    void doTerminate() {
+        ListenerEndpointImpl request;
+        while ((request = this.requestQueue.poll()) != null) {
+            request.cancel();
+        }
+    }
+
+    @Override
+    protected final void doExecute() throws IOException {
+        final long selectTimeout = this.reactorConfig.getSelectInterval();
+        while (!Thread.currentThread().isInterrupted()) {
+            if (getStatus().compareTo(IOReactorStatus.ACTIVE) != 0) {
+                break;
+            }
+
+            final int readyCount = this.selector.select(selectTimeout);
+
+            if (getStatus().compareTo(IOReactorStatus.ACTIVE) != 0) {
+                break;
+            }
+
+            processEvents(readyCount);
+        }
+    }
+
+    private void processEvents(final int readyCount) throws IOException {
+        if (!this.paused.get()) {
+            processSessionRequests();
+        }
+
+        if (readyCount > 0) {
+            final Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
+            for (final SelectionKey key : selectedKeys) {
+
+                processEvent(key);
+
+            }
+            selectedKeys.clear();
+        }
+    }
+
+    private void processEvent(final SelectionKey key) throws IOException {
+        try {
+
+            if (key.isAcceptable()) {
+
+                final ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
+                for (;;) {
+                    final SocketChannel socketChannel = serverChannel.accept();
+                    if (socketChannel == null) {
+                        break;
+                    }
+                    this.callback.execute(socketChannel);
+                }
+            }
+
+        } catch (final CancelledKeyException ex) {
+            final ListenerEndpoint endpoint = (ListenerEndpoint) key.attachment();
+            this.endpoints.remove(endpoint);
+            key.attach(null);
+        }
+    }
+
+    private ListenerEndpointImpl createEndpoint(final SocketAddress address) {
+        return new ListenerEndpointImpl(
+                address,
+                new ListenerEndpointClosedCallback() {
+
+                    @Override
+                    public void endpointClosed(final ListenerEndpoint endpoint) {
+                        endpoints.remove(endpoint);
+                    }
+
+                });
+    }
+
+    @Override
+    public ListenerEndpoint listen(final SocketAddress address) {
+        if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
+            throw new IOReactorShutdownException("I/O reactor has been shut down");
+        }
+        final ListenerEndpointImpl request = createEndpoint(address);
+        this.requestQueue.add(request);
+        this.selector.wakeup();
+        return request;
+    }
+
+    private static void closeChannel(final Channel channel) {
+        try {
+            channel.close();
+        } catch (final IOException ignore) {
+        }
+    }
+
+    private void processSessionRequests() throws IOException {
+        ListenerEndpointImpl request;
+        while ((request = this.requestQueue.poll()) != null) {
+            final SocketAddress address = request.getAddress();
+            final ServerSocketChannel serverChannel = ServerSocketChannel.open();
+            try {
+                final ServerSocket socket = serverChannel.socket();
+                socket.setReuseAddress(this.reactorConfig.isSoReuseAddress());
+                if (this.reactorConfig.getRcvBufSize() > 0) {
+                    socket.setReceiveBufferSize(this.reactorConfig.getRcvBufSize());
+                }
+                serverChannel.configureBlocking(false);
+                socket.bind(address, this.reactorConfig.getBacklogSize());
+            } catch (final IOException ex) {
+                closeChannel(serverChannel);
+                request.failed(ex);
+                return;
+            }
+            try {
+                final SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
+                key.attach(request);
+                request.setKey(key);
+            } catch (final IOException ex) {
+                closeChannel(serverChannel);
+                throw ex;
+            }
+
+            this.endpoints.put(request, Boolean.TRUE);
+            request.completed(serverChannel.socket().getLocalSocketAddress());
+        }
+    }
+
+    @Override
+    public Set<ListenerEndpoint> getEndpoints() {
+        final Set<ListenerEndpoint> set = new HashSet<>();
+        final Iterator<ListenerEndpoint> it = this.endpoints.keySet().iterator();
+        while (it.hasNext()) {
+            final ListenerEndpoint endpoint = it.next();
+            if (!endpoint.isClosed()) {
+                set.add(endpoint);
+            } else {
+                it.remove();
+            }
+        }
+        return set;
+    }
+
+    @Override
+    public void pause() throws IOException {
+        if (paused.compareAndSet(false, true)) {
+            final Iterator<ListenerEndpoint> it = this.endpoints.keySet().iterator();
+            while (it.hasNext()) {
+                final ListenerEndpoint endpoint = it.next();
+                if (!endpoint.isClosed()) {
+                    endpoint.close();
+                    final ListenerEndpointImpl request = createEndpoint(endpoint.getAddress());
+                    this.requestQueue.add(request);
+                }
+                it.remove();
+            }
+        }
+    }
+
+    @Override
+    public void resume() throws IOException {
+        if (paused.compareAndSet(true, false)) {
+            this.selector.wakeup();
+        }
+    }
+
+}


Mime
View raw message