hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject svn commit: r613136 - in /httpcomponents/httpcore/trunk/module-nio/src: main/java/org/apache/http/impl/nio/reactor/ main/java/org/apache/http/nio/params/ main/java/org/apache/http/nio/reactor/ test/java/org/apache/http/impl/nio/reactor/
Date Fri, 18 Jan 2008 11:57:06 GMT
Author: olegk
Date: Fri Jan 18 03:57:05 2008
New Revision: 613136

URL: http://svn.apache.org/viewvc?rev=613136&view=rev
Log:
Fixed end point management code in DefaultListeningIOReactor

Modified:
    httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java
    httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java
    httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java
    httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java
    httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/params/NIOReactorPNames.java
    httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/params/NIOReactorParams.java
    httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java
    httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultListeningIOReactor.java

Modified: httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java?rev=613136&r1=613135&r2=613136&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java Fri Jan 18 03:57:05 2008
@@ -91,7 +91,7 @@
         }
         this.params = params;
         this.selectTimeout = NIOReactorParams.getSelectInterval(params);
-        this.gracePeriod = 500;
+        this.gracePeriod = NIOReactorParams.getGracePeriod(params);
         this.shutdownMutex = new Object();
         this.workerCount = workerCount;
         if (threadFactory != null) {
@@ -200,16 +200,14 @@
         // Close out all channels
         if (this.selector.isOpen()) {
             Set<SelectionKey> keys = this.selector.keys();
-            synchronized (keys) {
-                for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext(); ) {
-                    try {
-                        SelectionKey key = it.next();
-                        Channel channel = key.channel();
-                        if (channel != null) {
-                            channel.close();
-                        }
-                    } catch (IOException ignore) {
+            for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext(); ) {
+                try {
+                    SelectionKey key = it.next();
+                    Channel channel = key.channel();
+                    if (channel != null) {
+                        channel.close();
                     }
+                } catch (IOException ignore) {
                 }
             }
             // Stop dispatching I/O events

Modified: httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java?rev=613136&r1=613135&r2=613136&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java Fri Jan 18 03:57:05 2008
@@ -101,9 +101,7 @@
         if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) {
             this.lastTimeoutCheck = currentTime;
             Set<SelectionKey> keys = this.selector.keys();
-            synchronized (keys) {
-                processTimeouts(keys);
-            }
+            processTimeouts(keys);
         }
     }
 
@@ -129,8 +127,10 @@
                         try {
                             prepareSocket(channel.socket());
                         } catch (IOException ex) {
-                            if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
-                                throw new IOReactorException("Failure initalizing socket", ex);
+                            if (this.exceptionHandler == null 
+                                    || !this.exceptionHandler.handle(ex)) {
+                                throw new IOReactorException(
+                                        "Failure initalizing socket", ex);
                             }
                         }
                         ChannelEntry entry = new ChannelEntry(channel, sessionRequest); 

Modified: httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java?rev=613136&r1=613135&r2=613136&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java Fri Jan 18 03:57:05 2008
@@ -37,10 +37,9 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -56,14 +55,18 @@
         implements ListeningIOReactor {
 
     private final Queue<ListenerEndpointImpl> requestQueue;
+    private final Set<ListenerEndpointImpl> endpoints;
     private final Set<SocketAddress> pausedEndpoints;
     
+    private volatile boolean paused;
+    
     public DefaultListeningIOReactor(
             int workerCount, 
             final ThreadFactory threadFactory,
             final HttpParams params) throws IOReactorException {
         super(workerCount, threadFactory, params);
         this.requestQueue = new ConcurrentLinkedQueue<ListenerEndpointImpl>();
+        this.endpoints = Collections.synchronizedSet(new HashSet<ListenerEndpointImpl>());
         this.pausedEndpoints = new HashSet<SocketAddress>();
     }
 
@@ -84,7 +87,9 @@
 
     @Override
     protected void processEvents(int readyCount) throws IOReactorException {
-        processSessionRequests();
+        if (!this.paused) {
+            processSessionRequests();
+        }
 
         if (readyCount > 0) {
             Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
@@ -109,8 +114,10 @@
                 try {
                     socketChannel = serverChannel.accept();
                 } catch (IOException ex) {
-                    if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
-                        throw new IOReactorException("Failure accepting connection", ex);
+                    if (this.exceptionHandler == null 
+                            || !this.exceptionHandler.handle(ex)) {
+                        throw new IOReactorException(
+                                "Failure accepting connection", ex);
                     }
                 }
                 
@@ -118,8 +125,10 @@
                     try {
                         prepareSocket(socketChannel.socket());
                     } catch (IOException ex) {
-                        if (this.exceptionHandler == null || !this.exceptionHandler.handle(ex)) {
-                            throw new IOReactorException("Failure initalizing socket", ex);
+                        if (this.exceptionHandler == null || 
+                                !this.exceptionHandler.handle(ex)) {
+                            throw new IOReactorException(
+                                    "Failure initalizing socket", ex);
                         }
                     }
                     ChannelEntry entry = new ChannelEntry(socketChannel); 
@@ -128,6 +137,8 @@
             }
             
         } catch (CancelledKeyException ex) {
+            ListenerEndpointImpl endpoint = (ListenerEndpointImpl) key.attachment();
+            this.endpoints.remove(endpoint);
             key.attach(null);
         }
     }
@@ -172,50 +183,51 @@
                 throw new IOReactorException("Failure registering channel " +
                         "with the selector", ex);
             }
+            
+            this.endpoints.add(request);
             request.completed(serverChannel);
         }
     }
     
-    public ListenerEndpoint[] getEndpoints() {
-        List<ListenerEndpoint> list = new ArrayList<ListenerEndpoint>();
-        if (this.selector.isOpen()) {
-            Set<SelectionKey> keys = this.selector.keys();
-            this.selector.wakeup();
-            synchronized (keys) {
-                for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext(); ) {
-                    SelectionKey key = it.next();
-                    if (key.isValid()) {
-                        ListenerEndpoint endpoint = (ListenerEndpoint) key.attachment();
-                        if (endpoint != null) {
-                            list.add(endpoint);
-                        }
-                    }
+    public Set<ListenerEndpoint> getEndpoints() {
+        Set<ListenerEndpoint> set = new HashSet<ListenerEndpoint>();
+        synchronized (this.endpoints) {
+            Iterator<ListenerEndpointImpl> it = this.endpoints.iterator();
+            while (it.hasNext()) {
+                ListenerEndpoint endpoint = it.next();
+                if (!endpoint.isClosed()) {
+                    set.add(endpoint);
+                } else {
+                    it.remove();
                 }
             }
         }
-        return list.toArray(new ListenerEndpoint[list.size()]);
+        return set;
     }
 
     public void pause() throws IOException {
-        if (this.selector.isOpen()) {
-            Set<SelectionKey> keys = this.selector.keys();
-            this.selector.wakeup();
-            synchronized (keys) {
-                for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext(); ) {
-                    SelectionKey key = it.next();
-                    if (key.isValid()) {
-                        ListenerEndpointImpl endpoint = (ListenerEndpointImpl) key.attachment();
-                        if (endpoint != null) {
-                            endpoint.close();
-                            this.pausedEndpoints.add(endpoint.getAddress());
-                        }
-                    }
+        if (this.paused) {
+            return;
+        }
+        this.paused = true;
+        synchronized (this.endpoints) {
+            Iterator<ListenerEndpointImpl> it = this.endpoints.iterator();
+            while (it.hasNext()) {
+                ListenerEndpoint endpoint = it.next();
+                if (!endpoint.isClosed()) {
+                    endpoint.close();
+                    this.pausedEndpoints.add(endpoint.getAddress());
                 }
             }
+            this.endpoints.clear();
         }
     }
 
     public void resume() throws IOException {
+        if (!this.paused) {
+            return;
+        }
+        this.paused = false;
         for (SocketAddress socketAddress: this.pausedEndpoints) {
             ListenerEndpointImpl request = new ListenerEndpointImpl(socketAddress);
             this.requestQueue.add(request);

Modified: httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java?rev=613136&r1=613135&r2=613136&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java (original)
+++ httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/ListenerEndpointImpl.java Fri Jan 18 03:57:05 2008
@@ -117,6 +117,7 @@
             return;
         }
         this.completed = true;
+        this.closed = true;
         synchronized (this) {
             notifyAll();
         }

Modified: httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/params/NIOReactorPNames.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/params/NIOReactorPNames.java?rev=613136&r1=613135&r2=613136&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/params/NIOReactorPNames.java (original)
+++ httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/params/NIOReactorPNames.java Fri Jan 18 03:57:05 2008
@@ -54,12 +54,20 @@
 
     /**
      * Determines the time interval in milliseconds at which the
-     * I/O reactor wakes up 
-     * to check for timed out sessions and session requests.
+     * I/O reactor wakes up to check for timed out sessions and session requests.
      * <p>
      * This parameter expects a value of type {@link Long}.
      * </p>
      */
     public static final String SELECT_INTERVAL = "http.nio.select-interval"; 
+
+    /**
+     * Determines the grace period the I/O reactors are expected to block
+     * waiting for individual worker threads to terminate cleanly.
+     * <p>
+     * This parameter expects a value of type {@link Long}.
+     * </p>
+     */
+    public static final String GRACE_PERIOD = "http.nio.grace-period"; 
 
 }

Modified: httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/params/NIOReactorParams.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/params/NIOReactorParams.java?rev=613136&r1=613135&r2=613136&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/params/NIOReactorParams.java (original)
+++ httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/params/NIOReactorParams.java Fri Jan 18 03:57:05 2008
@@ -83,4 +83,18 @@
         params.setLongParameter(SELECT_INTERVAL, ms);
     }
 
+    public static long getGracePeriod(final HttpParams params) {
+        if (params == null) {
+            throw new IllegalArgumentException("HTTP parameters may not be null");
+        }
+        return params.getLongParameter(GRACE_PERIOD, 500);
+    }
+    
+    public static void setGracePeriod(final HttpParams params, long ms) {
+        if (params == null) {
+            throw new IllegalArgumentException("HTTP parameters may not be null");
+        }
+        params.setLongParameter(GRACE_PERIOD, ms);
+    }
+
 }

Modified: httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java?rev=613136&r1=613135&r2=613136&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/ListeningIOReactor.java Fri Jan 18 03:57:05 2008
@@ -32,6 +32,7 @@
 package org.apache.http.nio.reactor;
 
 import java.net.SocketAddress;
+import java.util.Set;
 import java.io.IOException;
 
 public interface ListeningIOReactor extends IOReactor {
@@ -44,6 +45,6 @@
     void resume()
         throws IOException;
 
-    ListenerEndpoint[] getEndpoints();
+    Set<ListenerEndpoint> getEndpoints();
     
 }

Modified: httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultListeningIOReactor.java
URL: http://svn.apache.org/viewvc/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultListeningIOReactor.java?rev=613136&r1=613135&r2=613136&view=diff
==============================================================================
--- httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultListeningIOReactor.java (original)
+++ httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultListeningIOReactor.java Fri Jan 18 03:57:05 2008
@@ -33,6 +33,7 @@
 import java.io.IOException;
 import java.net.BindException;
 import java.net.InetSocketAddress;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -118,9 +119,9 @@
         
         t.start();
         
-        ListenerEndpoint[] endpoints = ioreactor.getEndpoints();
+        Set<ListenerEndpoint> endpoints = ioreactor.getEndpoints();
         assertNotNull(endpoints);
-        assertEquals(0, endpoints.length);
+        assertEquals(0, endpoints.size());
         
         ListenerEndpoint port9998 = ioreactor.listen(new InetSocketAddress(9998));
         port9998.waitFor();
@@ -130,15 +131,17 @@
 
         endpoints = ioreactor.getEndpoints();
         assertNotNull(endpoints);
-        assertEquals(2, endpoints.length);
+        assertEquals(2, endpoints.size());
         
         port9998.close();
 
         endpoints = ioreactor.getEndpoints();
         assertNotNull(endpoints);
-        assertEquals(1, endpoints.length);
+        assertEquals(1, endpoints.size());
         
-        assertEquals(9999, ((InetSocketAddress) endpoints[0].getAddress()).getPort());
+        ListenerEndpoint endpoint = endpoints.iterator().next();
+        
+        assertEquals(9999, ((InetSocketAddress) endpoint.getAddress()).getPort());
         
         ioreactor.shutdown(1000);
         t.join(1000);
@@ -196,9 +199,9 @@
         latch.await(2000, TimeUnit.MILLISECONDS);
         assertEquals(IOReactorStatus.SHUT_DOWN, ioreactor.getStatus());
         
-        ListenerEndpoint[] endpoints = ioreactor.getEndpoints();
+        Set<ListenerEndpoint> endpoints = ioreactor.getEndpoints();
         assertNotNull(endpoints);
-        assertEquals(0, endpoints.length);
+        assertEquals(0, endpoints.size());
         
         ioreactor.shutdown(1000);
         t.join(1000);



Mime
View raw message