hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject [6/7] httpcomponents-core git commit: Improved I/O reactor APIs with a smaller public API footprint
Date Thu, 15 Jun 2017 05:59:37 GMT
http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/8b5b5ba3/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestCallback.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestCallback.java
b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestCallback.java
deleted file mode 100644
index 589be42..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestCallback.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.reactor;
-
-/**
- * SessionRequestCallback interface can be used to get notifications of
- * completion of session requests asynchronously without having to wait
- * for it, blocking the current thread of execution.
- *
- * @since 4.0
- */
-public interface SessionRequestCallback {
-
-    /**
-     * Triggered on successful completion of a {@link SessionRequest}.
-     * The {@link SessionRequest#getSession()} method can now be used to obtain
-     * the new I/O session.
-     *
-     * @param request session request.
-     */
-    void completed(SessionRequest request);
-
-    /**
-     * Triggered on unsuccessful completion a {@link SessionRequest}.
-     * The {@link SessionRequest#getException()} method can now be used to
-     * obtain the cause of the error.
-     *
-     * @param request session request.
-     */
-    void failed(SessionRequest request);
-
-    /**
-     * Triggered if a {@link SessionRequest} times out.
-     *
-     * @param request session request.
-     */
-    void timeout(SessionRequest request);
-
-    /**
-     * Triggered on cancellation of a {@link SessionRequest}.
-     *
-     * @param request session request.
-     */
-    void cancelled(SessionRequest request);
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/8b5b5ba3/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestImpl.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestImpl.java
deleted file mode 100644
index 66cccf6..0000000
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SessionRequestImpl.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * ====================================================================
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
-
-package org.apache.hc.core5.reactor;
-
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.nio.channels.Channel;
-import java.nio.channels.SelectionKey;
-
-import org.apache.hc.core5.annotation.Contract;
-import org.apache.hc.core5.annotation.ThreadingBehavior;
-import org.apache.hc.core5.net.NamedEndpoint;
-import org.apache.hc.core5.util.Args;
-
-/**
- * Default implementation of {@link SessionRequest}.
- *
- * @since 4.0
- */
-@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
-public class SessionRequestImpl implements SessionRequest {
-
-    private volatile boolean completed;
-    private volatile SelectionKey key;
-
-    private final NamedEndpoint remoteEndpoint;
-    private final SocketAddress remoteAddress;
-    private final SocketAddress localAddress;
-    private final Object attachment;
-    private final SessionRequestCallback callback;
-
-    private volatile int connectTimeout;
-    private volatile TlsCapableIOSession session = null;
-    private volatile Exception exception = null;
-
-    public SessionRequestImpl(
-            final NamedEndpoint remoteEndpoint,
-            final SocketAddress remoteAddress,
-            final SocketAddress localAddress,
-            final Object attachment,
-            final SessionRequestCallback callback) {
-        super();
-        this.remoteEndpoint = Args.notNull(remoteEndpoint, "Remote endpoint");
-        this.remoteAddress = Args.notNull(remoteAddress, "Remote address");
-        this.localAddress = localAddress;
-        this.attachment = attachment;
-        this.callback = callback;
-        this.connectTimeout = 0;
-    }
-
-    @Override
-    public NamedEndpoint getRemoteEndpoint() {
-        return this.remoteEndpoint;
-    }
-
-    @Override
-    public SocketAddress getRemoteAddress() {
-        return this.remoteAddress;
-    }
-
-    @Override
-    public SocketAddress getLocalAddress() {
-        return this.localAddress;
-    }
-
-    @Override
-    public Object getAttachment() {
-        return this.attachment;
-    }
-
-    @Override
-    public boolean isCompleted() {
-        return this.completed;
-    }
-
-    protected void setKey(final SelectionKey key) {
-        this.key = key;
-    }
-
-    @Override
-    public void waitFor() throws InterruptedException {
-        if (this.completed) {
-            return;
-        }
-        synchronized (this) {
-            while (!this.completed) {
-                wait();
-            }
-        }
-    }
-
-    @Override
-    public TlsCapableIOSession getSession() {
-        synchronized (this) {
-            return this.session;
-        }
-    }
-
-    @Override
-    public Exception getException() {
-        synchronized (this) {
-            return this.exception;
-        }
-    }
-
-    public void completed(final TlsCapableIOSession session) {
-        Args.notNull(session, "Session");
-        if (this.completed) {
-            return;
-        }
-        this.completed = true;
-        synchronized (this) {
-            this.session = session;
-            if (this.callback != null) {
-                this.callback.completed(this);
-            }
-            notifyAll();
-        }
-    }
-
-    public void failed(final Exception exception) {
-        if (exception == null) {
-            return;
-        }
-        if (this.completed) {
-            return;
-        }
-        this.completed = true;
-        final SelectionKey key = this.key;
-        if (key != null) {
-            key.cancel();
-            final Channel channel = key.channel();
-            try {
-                channel.close();
-            } catch (final IOException ignore) {}
-        }
-        synchronized (this) {
-            this.exception = exception;
-            if (this.callback != null) {
-                this.callback.failed(this);
-            }
-            notifyAll();
-        }
-    }
-
-    public void timeout() {
-        if (this.completed) {
-            return;
-        }
-        this.completed = true;
-        final SelectionKey key = this.key;
-        if (key != null) {
-            key.cancel();
-            final Channel channel = key.channel();
-            if (channel.isOpen()) {
-                try {
-                    channel.close();
-                } catch (final IOException ignore) {}
-            }
-        }
-        synchronized (this) {
-            if (this.callback != null) {
-                this.callback.timeout(this);
-            }
-        }
-    }
-
-    @Override
-    public int getConnectTimeout() {
-        return this.connectTimeout;
-    }
-
-    @Override
-    public void setConnectTimeout(final int timeout) {
-        if (this.connectTimeout != timeout) {
-            this.connectTimeout = timeout;
-            final SelectionKey key = this.key;
-            if (key != null) {
-                key.selector().wakeup();
-            }
-        }
-    }
-
-    @Override
-    public boolean cancel() {
-        if (this.completed) {
-            return false;
-        }
-        this.completed = true;
-        final SelectionKey key = this.key;
-        if (key != null) {
-            key.cancel();
-            final Channel channel = key.channel();
-            if (channel.isOpen()) {
-                try {
-                    channel.close();
-                } catch (final IOException ignore) {}
-            }
-        }
-        synchronized (this) {
-            if (this.callback != null) {
-                this.callback.cancelled(this);
-            }
-            notifyAll();
-        }
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/8b5b5ba3/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
index fb86462..05f8c32 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
@@ -39,19 +39,23 @@ import java.nio.channels.SocketChannel;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.hc.core5.concurrent.ComplexFuture;
+import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.net.NamedEndpoint;
 import org.apache.hc.core5.util.Args;
+import org.apache.hc.core5.util.TimeValue;
 
-class SingleCoreIOReactor extends AbstractSingleCoreIOReactor {
+class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator
{
 
     private final IOEventHandlerFactory eventHandlerFactory;
     private final IOReactorConfig reactorConfig;
     private final Queue<InternalDataChannel> closedSessions;
     private final Queue<SocketChannel> channelQueue;
-    private final Queue<SessionRequestImpl> requestQueue;
+    private final Queue<IOSessionRequest> requestQueue;
     private final AtomicBoolean shutdownInitiated;
     private final Callback<IOSession> sessionShutdownCallback;
 
@@ -211,24 +215,28 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor {
         }
     }
 
-    public SessionRequest connect(
+    @Override
+    public Future<IOSession> connect(
             final NamedEndpoint remoteEndpoint,
             final SocketAddress remoteAddress,
             final SocketAddress localAddress,
+            final TimeValue timeout,
             final Object attachment,
-            final SessionRequestCallback callback) throws IOReactorShutdownException {
+            final FutureCallback<IOSession> callback) throws IOReactorShutdownException
{
         Args.notNull(remoteEndpoint, "Remote endpoint");
-        final SessionRequestImpl sessionRequest = new SessionRequestImpl(
+        final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
+        final IOSessionRequest sessionRequest = new IOSessionRequest(
                 remoteEndpoint,
                 remoteAddress != null ? remoteAddress : new InetSocketAddress(remoteEndpoint.getHostName(),
remoteEndpoint.getPort()),
                 localAddress,
+                timeout,
                 attachment,
-                callback);
+                future);
 
         this.requestQueue.add(sessionRequest);
         this.selector.wakeup();
 
-        return sessionRequest;
+        return future;
     }
 
     private void prepareSocket(final Socket socket) throws IOException {
@@ -256,9 +264,9 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor {
     }
 
     private void processPendingConnectionRequests() {
-        SessionRequestImpl sessionRequest;
+        IOSessionRequest sessionRequest;
         while ((sessionRequest = this.requestQueue.poll()) != null) {
-            if (!sessionRequest.isCompleted()) {
+            if (!sessionRequest.isCancelled()) {
                 final SocketChannel socketChannel;
                 try {
                     socketChannel = SocketChannel.open();
@@ -279,21 +287,20 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor {
         }
     }
 
-    private void processConnectionRequest(final SocketChannel socketChannel, final SessionRequestImpl
sessionRequest) throws IOException {
-        validateAddress(sessionRequest.getLocalAddress());
-        validateAddress(sessionRequest.getRemoteAddress());
+    private void processConnectionRequest(final SocketChannel socketChannel, final IOSessionRequest
sessionRequest) throws IOException {
+        validateAddress(sessionRequest.localAddress);
+        validateAddress(sessionRequest.remoteAddress);
 
         socketChannel.configureBlocking(false);
         prepareSocket(socketChannel.socket());
 
-        if (sessionRequest.getLocalAddress() != null) {
+        if (sessionRequest.localAddress != null) {
             final Socket sock = socketChannel.socket();
             sock.setReuseAddress(this.reactorConfig.isSoReuseAddress());
-            sock.bind(sessionRequest.getLocalAddress());
+            sock.bind(sessionRequest.localAddress);
         }
-        final boolean connected = socketChannel.connect(sessionRequest.getRemoteAddress());
+        final boolean connected = socketChannel.connect(sessionRequest.remoteAddress);
         final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT
| SelectionKey.OP_READ);
-        sessionRequest.setKey(key);
         final InternalChannel channel = new InternalConnectChannel(key, socketChannel, sessionRequest,
new InternalDataChannelFactory() {
 
             @Override
@@ -313,6 +320,7 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor {
             channel.handleIOEvent(SelectionKey.OP_CONNECT);
         } else {
             key.attach(channel);
+            sessionRequest.assign(channel);
         }
     }
 
@@ -328,7 +336,7 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor {
     }
 
     private void closePendingConnectionRequests() {
-        SessionRequestImpl sessionRequest;
+        IOSessionRequest sessionRequest;
         while ((sessionRequest = this.requestQueue.poll()) != null) {
             sessionRequest.cancel();
         }

http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/8b5b5ba3/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java
----------------------------------------------------------------------
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java
b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java
index 533dde7..b83cc5c 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreListeningIOReactor.java
@@ -31,7 +31,6 @@ import java.io.IOException;
 import java.net.ServerSocket;
 import java.net.SocketAddress;
 import java.nio.channels.CancelledKeyException;
-import java.nio.channels.Channel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
@@ -42,15 +41,18 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.hc.core5.concurrent.BasicFuture;
+import org.apache.hc.core5.concurrent.FutureCallback;
 import org.apache.hc.core5.function.Callback;
 
 class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implements ConnectionAcceptor
{
 
     private final IOReactorConfig reactorConfig;
     private final Callback<SocketChannel> callback;
-    private final Queue<ListenerEndpointImpl> requestQueue;
+    private final Queue<ListenerEndpointRequest> requestQueue;
     private final ConcurrentMap<ListenerEndpoint, Boolean> endpoints;
 
     private final AtomicBoolean paused;
@@ -69,7 +71,7 @@ class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor implement
 
     @Override
     void doTerminate() {
-        ListenerEndpointImpl request;
+        ListenerEndpointRequest request;
         while ((request = this.requestQueue.poll()) != null) {
             request.cancel();
         }
@@ -131,41 +133,24 @@ class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor
implement
         }
     }
 
-    private ListenerEndpointImpl createEndpoint(final SocketAddress address) {
-        return new ListenerEndpointImpl(
-                address,
-                new ListenerEndpointClosedCallback() {
-
-                    @Override
-                    public void endpointClosed(final ListenerEndpoint endpoint) {
-                        endpoints.remove(endpoint);
-                    }
-
-                });
-    }
-
     @Override
-    public ListenerEndpoint listen(final SocketAddress address) {
+    public Future<ListenerEndpoint> listen(final SocketAddress address, final FutureCallback<ListenerEndpoint>
callback) {
         if (getStatus().compareTo(IOReactorStatus.SHUTTING_DOWN) >= 0) {
             throw new IOReactorShutdownException("I/O reactor has been shut down");
         }
-        final ListenerEndpointImpl request = createEndpoint(address);
-        this.requestQueue.add(request);
+        final BasicFuture<ListenerEndpoint> future = new BasicFuture<>(callback);
+        this.requestQueue.add(new ListenerEndpointRequest(address, future));
         this.selector.wakeup();
-        return request;
-    }
-
-    private static void closeChannel(final Channel channel) {
-        try {
-            channel.close();
-        } catch (final IOException ignore) {
-        }
+        return future;
     }
 
     private void processSessionRequests() throws IOException {
-        ListenerEndpointImpl request;
+        ListenerEndpointRequest request;
         while ((request = this.requestQueue.poll()) != null) {
-            final SocketAddress address = request.getAddress();
+            if (request.isCancelled()) {
+                continue;
+            }
+            final SocketAddress address = request.address;
             final ServerSocketChannel serverChannel = ServerSocketChannel.open();
             try {
                 final ServerSocket socket = serverChannel.socket();
@@ -175,22 +160,19 @@ class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor
implement
                 }
                 serverChannel.configureBlocking(false);
                 socket.bind(address, this.reactorConfig.getBacklogSize());
-            } catch (final IOException ex) {
-                closeChannel(serverChannel);
-                request.failed(ex);
-                return;
-            }
-            try {
+
                 final SelectionKey key = serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
                 key.attach(request);
-                request.setKey(key);
+                final ListenerEndpoint endpoint = new ListenerEndpointImpl(key, socket.getLocalSocketAddress());
+                this.endpoints.put(endpoint, Boolean.TRUE);
+                request.completed(endpoint);
             } catch (final IOException ex) {
-                closeChannel(serverChannel);
-                throw ex;
+                try {
+                    serverChannel.close();
+                } catch (final IOException ignore) {
+                }
+                request.failed(ex);
             }
-
-            this.endpoints.put(request, Boolean.TRUE);
-            request.completed(serverChannel.socket().getLocalSocketAddress());
         }
     }
 
@@ -217,8 +199,7 @@ class SingleCoreListeningIOReactor extends AbstractSingleCoreIOReactor
implement
                 final ListenerEndpoint endpoint = it.next();
                 if (!endpoint.isClosed()) {
                     endpoint.close();
-                    final ListenerEndpointImpl request = createEndpoint(endpoint.getAddress());
-                    this.requestQueue.add(request);
+                    this.requestQueue.add(new ListenerEndpointRequest(endpoint.getAddress(),
null));
                 }
                 it.remove();
             }


Mime
View raw message