Return-Path: Delivered-To: apmail-jakarta-httpcomponents-commits-archive@www.apache.org Received: (qmail 20997 invoked from network); 11 Sep 2007 17:11:19 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 11 Sep 2007 17:11:19 -0000 Received: (qmail 59312 invoked by uid 500); 11 Sep 2007 17:11:12 -0000 Delivered-To: apmail-jakarta-httpcomponents-commits-archive@jakarta.apache.org Received: (qmail 59282 invoked by uid 500); 11 Sep 2007 17:11:12 -0000 Mailing-List: contact httpcomponents-commits-help@jakarta.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: httpcomponents-dev@jakarta.apache.org Delivered-To: mailing list httpcomponents-commits@jakarta.apache.org Received: (qmail 59256 invoked by uid 99); 11 Sep 2007 17:11:11 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Sep 2007 10:11:11 -0700 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Sep 2007 17:12:50 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 373B31A9832; Tue, 11 Sep 2007 10:10:53 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r574640 - in /jakarta/httpcomponents/httpcore/trunk/module-nio/src: main/java/org/apache/http/impl/nio/reactor/ main/java/org/apache/http/nio/reactor/ test/java/org/apache/http/impl/nio/reactor/ Date: Tue, 11 Sep 2007 17:10:51 -0000 To: httpcomponents-commits@jakarta.apache.org From: olegk@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20070911171053.373B31A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: olegk Date: Tue Sep 11 10:10:50 2007 New Revision: 574640 URL: http://svn.apache.org/viewvc?rev=574640&view=rev Log: HTTPCORE-109: Refactored AbstractIOReactor shutdown process. The I/O reactor will now attempt to terminate all running sessions gracefully by calling #close() and waiting for the termination process to complete within a given grace period and only then force shutting down still active sessions Added: jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultIOReactors.java - copied, changed from r573981, jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultListeningIOReactor.java Removed: jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultListeningIOReactor.java Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestAllImplReactor.java Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java?rev=574640&r1=574639&r2=574640&view=diff ============================================================================== --- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java (original) +++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractIOReactor.java Tue Sep 11 10:10:50 2007 @@ -49,8 +49,9 @@ public abstract class AbstractIOReactor implements IOReactor { - private volatile boolean closed = false; + private volatile int status; + private final Object shutdownMutex; private final long selectTimeout; private final Selector selector; private final SessionSet sessions; @@ -73,6 +74,8 @@ } catch (IOException ex) { throw new IOReactorException("Failure opening selector", ex); } + this.shutdownMutex = new Object(); + this.status = ACTIVE; } protected abstract void acceptable(SelectionKey key); @@ -91,6 +94,10 @@ protected abstract IOSession keyCancelled(final SelectionKey key); + public int getStatus() { + return this.status; + } + public void addChannel(final ChannelEntry channelEntry) { if (channelEntry == null) { throw new IllegalArgumentException("Channel entry may not be null"); @@ -118,7 +125,7 @@ throw new IOReactorException("Unexpected selector failure", ex); } - if (this.closed) { + if (this.status == SHUT_DOWN) { break; } @@ -131,11 +138,18 @@ validate(this.selector.keys()); processClosedSessions(); + + if (this.status != ACTIVE && this.sessions.isEmpty()) { + break; + } } } catch (ClosedSelectorException ex) { } finally { - closeSessions(); + synchronized (this.shutdownMutex) { + this.status = SHUT_DOWN; + this.shutdownMutex.notifyAll(); + } } } @@ -232,23 +246,16 @@ } } - private void closeSessions() { - for (Iterator it = this.sessions.iterator(); it.hasNext(); ) { - IOSession session = (IOSession) it.next(); - if (!session.isClosed()) { - + protected void closeSessions() { + synchronized (this.sessions) { + for (Iterator it = this.sessions.iterator(); it.hasNext(); ) { + IOSession session = (IOSession) it.next(); session.close(); - this.eventDispatch.disconnected(session); } } - this.sessions.clear(); } - public void shutdown() throws IOReactorException { - if (this.closed) { - return; - } - this.closed = true; + protected void closeChannels() throws IOReactorException { // Close out all channels Set keys = this.selector.keys(); for (Iterator it = keys.iterator(); it.hasNext(); ) { @@ -268,5 +275,55 @@ throw new IOReactorException("Failure closing selector", ex); } } + + public void gracefulShutdown() { + if (this.status != ACTIVE) { + // Already shutting down + return; + } + this.status = SHUTTING_DOWN; + closeSessions(); + this.selector.wakeup(); + } + public void hardShutdown() throws IOReactorException { + if (this.status == SHUT_DOWN) { + // Already shut down + return; + } + this.status = SHUT_DOWN; + closeChannels(); + } + + public void awaitShutdown(long timeout) throws InterruptedException { + synchronized (this.shutdownMutex) { + long deadline = System.currentTimeMillis() + timeout; + long remaining = timeout; + while (this.status != SHUT_DOWN) { + this.shutdownMutex.wait(remaining); + if (timeout > 0) { + remaining = deadline - System.currentTimeMillis(); + if (remaining <= 0) { + break; + } + } + } + } + } + + public void shutdown(long gracePeriod) throws IOReactorException { + gracefulShutdown(); + try { + awaitShutdown(gracePeriod); + } catch (InterruptedException ignore) { + } + if (this.status != SHUT_DOWN) { + hardShutdown(); + } + } + + public void shutdown() throws IOReactorException { + shutdown(1000); + } + } Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java?rev=574640&r1=574639&r2=574640&view=diff ============================================================================== --- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java (original) +++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/AbstractMultiworkerIOReactor.java Tue Sep 11 10:10:50 2007 @@ -40,15 +40,15 @@ public abstract class AbstractMultiworkerIOReactor implements IOReactor { + private volatile int status; + private final long selectTimeout; private final int workerCount; private final ThreadFactory threadFactory; - private final BaseIOReactor[] ioReactors; + private final BaseIOReactor[] dispatchers; private final Worker[] workers; private final Thread[] threads; - private volatile boolean shutdown; - private int currentWorker = 0; public AbstractMultiworkerIOReactor( @@ -66,12 +66,17 @@ } else { this.threadFactory = new DefaultThreadFactory(); } - this.ioReactors = new BaseIOReactor[workerCount]; - for (int i = 0; i < this.ioReactors.length; i++) { - this.ioReactors[i] = new BaseIOReactor(selectTimeout); + this.dispatchers = new BaseIOReactor[workerCount]; + for (int i = 0; i < this.dispatchers.length; i++) { + this.dispatchers[i] = new BaseIOReactor(selectTimeout); } this.workers = new Worker[workerCount]; this.threads = new Thread[workerCount]; + this.status = ACTIVE; + } + + public int getStatus() { + return this.status; } protected long getSelectTimeout() { @@ -80,47 +85,46 @@ protected void startWorkers(final IOEventDispatch eventDispatch) { for (int i = 0; i < this.workerCount; i++) { - BaseIOReactor ioReactor = this.ioReactors[i]; - this.workers[i] = new Worker(ioReactor, eventDispatch); + BaseIOReactor dispatcher = this.dispatchers[i]; + this.workers[i] = new Worker(dispatcher, eventDispatch); this.threads[i] = this.threadFactory.newThread(this.workers[i]); } for (int i = 0; i < this.workerCount; i++) { - if (this.shutdown) { + if (this.status != ACTIVE) { return; } this.threads[i].start(); } } - protected void stopWorkers(int millis) - throws InterruptedIOException, IOReactorException { - if (this.shutdown) { - return; + protected void stopWorkers(int timeout) + throws InterruptedException, IOReactorException { + + // Attempt to shut down I/O dispatchers gracefully + for (int i = 0; i < this.workerCount; i++) { + BaseIOReactor dispatcher = this.dispatchers[i]; + dispatcher.gracefulShutdown(); } - this.shutdown = true; + // Force shut down I/O dispatchers if they fail to terminate + // in time for (int i = 0; i < this.workerCount; i++) { - BaseIOReactor reactor = this.ioReactors[i]; - if (reactor != null) { - reactor.shutdown(); + BaseIOReactor dispatcher = this.dispatchers[i]; + dispatcher.awaitShutdown(timeout); + if (dispatcher.getStatus() != SHUT_DOWN) { + dispatcher.hardShutdown(); } } + // Join worker threads for (int i = 0; i < this.workerCount; i++) { - try { - Thread t = this.threads[i]; - if (t != null) { - t.join(millis); - } - } catch (InterruptedException ex) { - throw new InterruptedIOException(ex.getMessage()); + Thread t = this.threads[i]; + if (t != null) { + t.join(timeout); } } } protected void verifyWorkers() throws InterruptedIOException, IOReactorException { - if (this.shutdown) { - return; - } for (int i = 0; i < this.workerCount; i++) { Worker worker = this.workers[i]; Thread thread = this.threads[i]; @@ -141,25 +145,25 @@ protected void addChannel(final ChannelEntry entry) { // Distribute new channels among the workers - this.ioReactors[this.currentWorker++ % this.workerCount].addChannel(entry); + this.dispatchers[this.currentWorker++ % this.workerCount].addChannel(entry); } static class Worker implements Runnable { - final BaseIOReactor ioReactor; + final BaseIOReactor dispatcher; final IOEventDispatch eventDispatch; private volatile Exception exception; - public Worker(final BaseIOReactor ioReactor, final IOEventDispatch eventDispatch) { + public Worker(final BaseIOReactor dispatcher, final IOEventDispatch eventDispatch) { super(); - this.ioReactor = ioReactor; + this.dispatcher = dispatcher; this.eventDispatch = eventDispatch; } public void run() { try { - this.ioReactor.execute(this.eventDispatch); + this.dispatcher.execute(this.eventDispatch); } catch (InterruptedIOException ex) { this.exception = ex; } catch (IOReactorException ex) { @@ -168,7 +172,9 @@ this.exception = ex; } finally { try { - this.ioReactor.shutdown(); + if (this.dispatcher.getStatus() != SHUT_DOWN) { + this.dispatcher.closeChannels(); + } } catch (IOReactorException ex2) { if (this.exception == null) { this.exception = ex2; @@ -188,7 +194,7 @@ private static int COUNT = 0; public Thread newThread(final Runnable r) { - return new Thread(r, "I/O reactor worker thread " + (++COUNT)); + return new Thread(r, "I/O dispatcher " + (++COUNT)); } } Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java?rev=574640&r1=574639&r2=574640&view=diff ============================================================================== --- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java (original) +++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultConnectingIOReactor.java Tue Sep 11 10:10:50 2007 @@ -299,7 +299,11 @@ // Stop dispatching I/O events this.selector.close(); // Stop the workers - stopWorkers(500); + try { + stopWorkers(500); + } catch (InterruptedException ex) { + throw new InterruptedIOException(ex.getMessage()); + } } } Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java?rev=574640&r1=574639&r2=574640&view=diff ============================================================================== --- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java (original) +++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/impl/nio/reactor/DefaultListeningIOReactor.java Tue Sep 11 10:10:50 2007 @@ -200,9 +200,12 @@ } // Stop dispatching I/O events this.selector.close(); - // Stop the workers - stopWorkers(500); + try { + stopWorkers(500); + } catch (InterruptedException ex) { + throw new InterruptedIOException(ex.getMessage()); + } } } Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java?rev=574640&r1=574639&r2=574640&view=diff ============================================================================== --- jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java (original) +++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/reactor/IOReactor.java Tue Sep 11 10:10:50 2007 @@ -35,6 +35,12 @@ public interface IOReactor { + public static final int ACTIVE = 0; + public static final int SHUTTING_DOWN = 1; + public static final int SHUT_DOWN = 2; + + int getStatus(); + void execute(IOEventDispatch eventDispatch) throws IOException; Modified: jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestAllImplReactor.java URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestAllImplReactor.java?rev=574640&r1=574639&r2=574640&view=diff ============================================================================== --- jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestAllImplReactor.java (original) +++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestAllImplReactor.java Tue Sep 11 10:10:50 2007 @@ -41,7 +41,7 @@ public static Test suite() { TestSuite suite = new TestSuite(); suite.addTest(TestSessionInOutBuffers.suite()); - suite.addTest(TestDefaultListeningIOReactor.suite()); + suite.addTest(TestDefaultIOReactors.suite()); return suite; } Copied: jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultIOReactors.java (from r573981, jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultListeningIOReactor.java) URL: http://svn.apache.org/viewvc/jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultIOReactors.java?p2=jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultIOReactors.java&p1=jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultListeningIOReactor.java&r1=573981&r2=574640&rev=574640&view=diff ============================================================================== --- jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultListeningIOReactor.java (original) +++ jakarta/httpcomponents/httpcore/trunk/module-nio/src/test/java/org/apache/http/impl/nio/reactor/TestDefaultIOReactors.java Tue Sep 11 10:10:50 2007 @@ -46,23 +46,23 @@ * * @version $Id$ */ -public class TestDefaultListeningIOReactor extends TestCase { +public class TestDefaultIOReactors extends TestCase { // ------------------------------------------------------------ Constructor - public TestDefaultListeningIOReactor(String testName) { + public TestDefaultIOReactors(String testName) { super(testName); } // ------------------------------------------------------------------- Main public static void main(String args[]) { - String[] testCaseName = { TestDefaultListeningIOReactor.class.getName() }; + String[] testCaseName = { TestDefaultIOReactors.class.getName() }; junit.textui.TestRunner.main(testCaseName); } // ------------------------------------------------------- TestCase Methods public static Test suite() { - return new TestSuite(TestDefaultListeningIOReactor.class); + return new TestSuite(TestDefaultIOReactors.class); } public void testRestart() throws Exception {