hc-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ol...@apache.org
Subject svn commit: r574640 - in /jakarta/httpcomponents/httpcore/trunk/module-nio/src: main/java/org/apache/http/impl/nio/reactor/ main/java/org/apache/http/nio/reactor/ test/java/org/apache/http/impl/nio/reactor/
Date Tue, 11 Sep 2007 17:10:51 GMT
Author: olegk
Date: Tue Sep 11 10:10:50 2007
New Revision: 574640

URL: http://svn.apache.org/viewvc?rev=574640&view=rev
Log:
HTTPCORE-109: Refactored AbstractIOReactor shutdown process. The I/O reactor will now attempt
to terminate all running sessions gracefully by calling #close() and waiting for the termination
process to complete within a given grace period and only then force shutting down still active
sessions

Added:
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultIOReactors.java
      - copied, changed from r573981, jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultListeningIOReactor.java
Removed:
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultListeningIOReactor.java
Modified:
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java
    jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestAllImplReactor.java

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java?rev=574640&r1=574639&r2=574640&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java
(original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java
Tue Sep 11 10:10:50 2007
@@ -49,8 +49,9 @@
 
 public abstract class AbstractIOReactor implements IOReactor {
 
-    private volatile boolean closed = false;
+    private volatile int status;
     
+    private final Object shutdownMutex;
     private final long selectTimeout;
     private final Selector selector;
     private final SessionSet sessions;
@@ -73,6 +74,8 @@
         } catch (IOException ex) {
             throw new IOReactorException("Failure opening selector", ex);
         }
+        this.shutdownMutex = new Object();
+        this.status = ACTIVE;
     }
 
     protected abstract void acceptable(SelectionKey key);
@@ -91,6 +94,10 @@
     
     protected abstract IOSession keyCancelled(final SelectionKey key);
     
+    public int getStatus() {
+        return this.status;
+    }
+
     public void addChannel(final ChannelEntry channelEntry) {
         if (channelEntry == null) {
             throw new IllegalArgumentException("Channel entry may not be null");
@@ -118,7 +125,7 @@
                     throw new IOReactorException("Unexpected selector failure", ex);
                 }
                 
-                if (this.closed) {
+                if (this.status == SHUT_DOWN) {
                     break;
                 }
 
@@ -131,11 +138,18 @@
                 validate(this.selector.keys());
                 
                 processClosedSessions();
+
+                if (this.status != ACTIVE && this.sessions.isEmpty()) {
+                    break;
+                }
                 
             }
         } catch (ClosedSelectorException ex) {
         } finally {
-            closeSessions();
+            synchronized (this.shutdownMutex) {
+                this.status = SHUT_DOWN;
+                this.shutdownMutex.notifyAll();
+            }
         }
     }
     
@@ -232,23 +246,16 @@
         }
     }
 
-    private void closeSessions() {
-        for (Iterator it = this.sessions.iterator(); it.hasNext(); ) {
-            IOSession session = (IOSession) it.next();
-            if (!session.isClosed()) {    
-
+    protected void closeSessions() {
+        synchronized (this.sessions) {
+            for (Iterator it = this.sessions.iterator(); it.hasNext(); ) {
+                IOSession session = (IOSession) it.next();
                 session.close();
-                this.eventDispatch.disconnected(session);
             }
         }
-        this.sessions.clear();
     }
     
-    public void shutdown() throws IOReactorException {
-        if (this.closed) {
-            return;
-        }
-        this.closed = true;
+    protected void closeChannels() throws IOReactorException {
         // Close out all channels
         Set keys = this.selector.keys();
         for (Iterator it = keys.iterator(); it.hasNext(); ) {
@@ -268,5 +275,55 @@
             throw new IOReactorException("Failure closing selector", ex);
         }
     }
+    
+    public void gracefulShutdown() {
+        if (this.status != ACTIVE) {
+            // Already shutting down
+            return;
+        }
+        this.status = SHUTTING_DOWN;
+        closeSessions();
+        this.selector.wakeup();
+    }
         
+    public void hardShutdown() throws IOReactorException {
+        if (this.status == SHUT_DOWN) {
+            // Already shut down
+            return;
+        }
+        this.status = SHUT_DOWN;
+        closeChannels();
+    }
+    
+    public void awaitShutdown(long timeout) throws InterruptedException {
+        synchronized (this.shutdownMutex) {
+            long deadline = System.currentTimeMillis() + timeout;
+            long remaining = timeout;
+            while (this.status != SHUT_DOWN) {
+                this.shutdownMutex.wait(remaining);
+                if (timeout > 0) {
+                    remaining = deadline - System.currentTimeMillis();
+                    if (remaining <= 0) {
+                        break;
+                    }
+                }
+            }
+        }
+    }
+        
+    public void shutdown(long gracePeriod) throws IOReactorException {
+        gracefulShutdown();
+        try {
+            awaitShutdown(gracePeriod);
+        } catch (InterruptedException ignore) {
+        }
+        if (this.status != SHUT_DOWN) {
+            hardShutdown();
+        }
+    }
+    
+    public void shutdown() throws IOReactorException {
+        shutdown(1000);
+    }
+    
 }

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java?rev=574640&r1=574639&r2=574640&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java
(original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java
Tue Sep 11 10:10:50 2007
@@ -40,15 +40,15 @@
 
 public abstract class AbstractMultiworkerIOReactor implements IOReactor {
 
+    private volatile int status;
+    
     private final long selectTimeout;
     private final int workerCount;
     private final ThreadFactory threadFactory;
-    private final BaseIOReactor[] ioReactors;
+    private final BaseIOReactor[] dispatchers;
     private final Worker[] workers;
     private final Thread[] threads;
     
-    private volatile boolean shutdown;
-    
     private int currentWorker = 0;
     
     public AbstractMultiworkerIOReactor(
@@ -66,12 +66,17 @@
         } else {
             this.threadFactory = new DefaultThreadFactory();
         }
-        this.ioReactors = new BaseIOReactor[workerCount];
-        for (int i = 0; i < this.ioReactors.length; i++) {
-            this.ioReactors[i] = new BaseIOReactor(selectTimeout);
+        this.dispatchers = new BaseIOReactor[workerCount];
+        for (int i = 0; i < this.dispatchers.length; i++) {
+            this.dispatchers[i] = new BaseIOReactor(selectTimeout);
         }
         this.workers = new Worker[workerCount];
         this.threads = new Thread[workerCount];
+        this.status = ACTIVE;
+    }
+
+    public int getStatus() {
+        return this.status;
     }
 
     protected long getSelectTimeout() {
@@ -80,47 +85,46 @@
     
     protected void startWorkers(final IOEventDispatch eventDispatch) {
         for (int i = 0; i < this.workerCount; i++) {
-            BaseIOReactor ioReactor = this.ioReactors[i];
-            this.workers[i] = new Worker(ioReactor, eventDispatch);
+            BaseIOReactor dispatcher = this.dispatchers[i];
+            this.workers[i] = new Worker(dispatcher, eventDispatch);
             this.threads[i] = this.threadFactory.newThread(this.workers[i]);
         }
         for (int i = 0; i < this.workerCount; i++) {
-            if (this.shutdown) {
+            if (this.status != ACTIVE) {
                 return;
             }
             this.threads[i].start();
         }
     }
 
-    protected void stopWorkers(int millis) 
-            throws InterruptedIOException, IOReactorException {
-        if (this.shutdown) {
-            return;
+    protected void stopWorkers(int timeout) 
+            throws InterruptedException, IOReactorException {
+
+        // Attempt to shut down I/O dispatchers gracefully
+        for (int i = 0; i < this.workerCount; i++) {
+            BaseIOReactor dispatcher = this.dispatchers[i];
+            dispatcher.gracefulShutdown();
         }
-        this.shutdown = true;
+        // Force shut down I/O dispatchers if they fail to terminate
+        // in time
         for (int i = 0; i < this.workerCount; i++) {
-            BaseIOReactor reactor = this.ioReactors[i];
-            if (reactor != null) {
-                reactor.shutdown();
+            BaseIOReactor dispatcher = this.dispatchers[i];
+            dispatcher.awaitShutdown(timeout);
+            if (dispatcher.getStatus() != SHUT_DOWN) {
+                dispatcher.hardShutdown();
             }
         }
+        // Join worker threads
         for (int i = 0; i < this.workerCount; i++) {
-            try {
-                Thread t = this.threads[i];
-                if (t != null) {
-                    t.join(millis);
-                }
-            } catch (InterruptedException ex) {
-                throw new InterruptedIOException(ex.getMessage());
+            Thread t = this.threads[i];
+            if (t != null) {
+                t.join(timeout);
             }
         }
     }
     
     protected void verifyWorkers() 
             throws InterruptedIOException, IOReactorException {
-        if (this.shutdown) {
-            return;
-        }
         for (int i = 0; i < this.workerCount; i++) {
             Worker worker = this.workers[i];
             Thread thread = this.threads[i];
@@ -141,25 +145,25 @@
     
     protected void addChannel(final ChannelEntry entry) {
         // Distribute new channels among the workers
-        this.ioReactors[this.currentWorker++ % this.workerCount].addChannel(entry);
+        this.dispatchers[this.currentWorker++ % this.workerCount].addChannel(entry);
     }
         
     static class Worker implements Runnable {
 
-        final BaseIOReactor ioReactor;
+        final BaseIOReactor dispatcher;
         final IOEventDispatch eventDispatch;
         
         private volatile Exception exception;
         
-        public Worker(final BaseIOReactor ioReactor, final IOEventDispatch eventDispatch)
{
+        public Worker(final BaseIOReactor dispatcher, final IOEventDispatch eventDispatch)
{
             super();
-            this.ioReactor = ioReactor;
+            this.dispatcher = dispatcher;
             this.eventDispatch = eventDispatch;
         }
         
         public void run() {
             try {
-                this.ioReactor.execute(this.eventDispatch);
+                this.dispatcher.execute(this.eventDispatch);
             } catch (InterruptedIOException ex) {
                 this.exception = ex;
             } catch (IOReactorException ex) {
@@ -168,7 +172,9 @@
                 this.exception = ex;
             } finally {
                 try {
-                    this.ioReactor.shutdown();
+                    if (this.dispatcher.getStatus() != SHUT_DOWN) {
+                        this.dispatcher.closeChannels();
+                    }
                 } catch (IOReactorException ex2) {
                     if (this.exception == null) {
                         this.exception = ex2;
@@ -188,7 +194,7 @@
         private static int COUNT = 0;
         
         public Thread newThread(final Runnable r) {
-            return new Thread(r, "I/O reactor worker thread " + (++COUNT));
+            return new Thread(r, "I/O dispatcher " + (++COUNT));
         }
         
     }

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java?rev=574640&r1=574639&r2=574640&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java
(original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java
Tue Sep 11 10:10:50 2007
@@ -299,7 +299,11 @@
         // Stop dispatching I/O events
         this.selector.close();
         // Stop the workers
-        stopWorkers(500);
+        try {
+            stopWorkers(500);
+        } catch (InterruptedException ex) {
+            throw new InterruptedIOException(ex.getMessage());
+        }
     }
         
 }

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java?rev=574640&r1=574639&r2=574640&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java
(original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java
Tue Sep 11 10:10:50 2007
@@ -200,9 +200,12 @@
         }
         // Stop dispatching I/O events
         this.selector.close();
-        
         // Stop the workers
-        stopWorkers(500);
+        try {
+            stopWorkers(500);
+        } catch (InterruptedException ex) {
+            throw new InterruptedIOException(ex.getMessage());
+        }
     }
 
 }

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java?rev=574640&r1=574639&r2=574640&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java
(original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java
Tue Sep 11 10:10:50 2007
@@ -35,6 +35,12 @@
 
 public interface IOReactor {
 
+    public static final int ACTIVE           = 0;
+    public static final int SHUTTING_DOWN    = 1;
+    public static final int SHUT_DOWN        = 2;
+     
+    int getStatus();
+    
     void execute(IOEventDispatch eventDispatch) 
         throws IOException;
 

Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestAllImplReactor.java
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestAllImplReactor.java?rev=574640&r1=574639&r2=574640&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestAllImplReactor.java
(original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestAllImplReactor.java
Tue Sep 11 10:10:50 2007
@@ -41,7 +41,7 @@
     public static Test suite() {
         TestSuite suite = new TestSuite();
         suite.addTest(TestSessionInOutBuffers.suite());
-        suite.addTest(TestDefaultListeningIOReactor.suite());
+        suite.addTest(TestDefaultIOReactors.suite());
         return suite;
     }
 

Copied: jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultIOReactors.java
(from r573981, jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultListeningIOReactor.java)
URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultIOReactors.java?p2=jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultIOReactors.java&p1=jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultListeningIOReactor.java&r1=573981&r2=574640&rev=574640&view=diff
==============================================================================
--- jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultListeningIOReactor.java
(original)
+++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultIOReactors.java
Tue Sep 11 10:10:50 2007
@@ -46,23 +46,23 @@
  * 
  * @version $Id$
  */
-public class TestDefaultListeningIOReactor extends TestCase {
+public class TestDefaultIOReactors extends TestCase {
 
     // ------------------------------------------------------------ Constructor
-    public TestDefaultListeningIOReactor(String testName) {
+    public TestDefaultIOReactors(String testName) {
         super(testName);
     }
 
     // ------------------------------------------------------------------- Main
     public static void main(String args[]) {
-        String[] testCaseName = { TestDefaultListeningIOReactor.class.getName() };
+        String[] testCaseName = { TestDefaultIOReactors.class.getName() };
         junit.textui.TestRunner.main(testCaseName);
     }
 
     // ------------------------------------------------------- TestCase Methods
 
     public static Test suite() {
-        return new TestSuite(TestDefaultListeningIOReactor.class);
+        return new TestSuite(TestDefaultIOReactors.class);
     }
 
     public void testRestart() throws Exception {



Mime
View raw message