Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 80956 invoked from network); 15 Feb 2011 07:19:10 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 15 Feb 2011 07:19:10 -0000 Received: (qmail 90180 invoked by uid 500); 15 Feb 2011 07:19:09 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 89920 invoked by uid 500); 15 Feb 2011 07:19:07 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 89913 invoked by uid 99); 15 Feb 2011 07:19:06 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Feb 2011 07:19:06 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Feb 2011 07:19:05 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 6335D23888A2; Tue, 15 Feb 2011 07:18:45 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1070791 - in /hadoop/common/branches/branch-0.22: CHANGES.txt src/java/org/apache/hadoop/ipc/Server.java src/test/core/org/apache/hadoop/ipc/TestRPC.java Date: Tue, 15 Feb 2011 07:18:45 -0000 To: common-commits@hadoop.apache.org From: todd@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110215071845.6335D23888A2@eris.apache.org> Author: todd Date: Tue Feb 15 07:18:44 2011 New Revision: 1070791 URL: http://svn.apache.org/viewvc?rev=1070791&view=rev Log: HADOOP-7140. IPC Reader threads do not stop when server stops. Contributed by Todd Lipcon. Modified: hadoop/common/branches/branch-0.22/CHANGES.txt hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestRPC.java Modified: hadoop/common/branches/branch-0.22/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/CHANGES.txt?rev=1070791&r1=1070790&r2=1070791&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.22/CHANGES.txt Tue Feb 15 07:18:44 2011 @@ -416,6 +416,8 @@ Release 0.22.0 - Unreleased HADOOP-6642. Fix javac, javadoc, findbugs warnings related to security work. (Chris Douglas, Po Cheung via shv) + HADOOP-7140. IPC Reader threads do not stop when server stops (todd) + Release 0.21.1 - Unreleased IMPROVEMENTS Modified: hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java?rev=1070791&r1=1070790&r2=1070791&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java (original) +++ hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java Tue Feb 15 07:18:44 2011 @@ -299,7 +299,6 @@ public abstract class Server { private long cleanupInterval = 10000; //the minimum interval between //two cleanup runs private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128); - private ExecutorService readPool; public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); @@ -313,12 +312,12 @@ public abstract class Server { // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; - readPool = Executors.newFixedThreadPool(readThreads); for (int i = 0; i < readThreads; i++) { Selector readSelector = Selector.open(); - Reader reader = new Reader(readSelector); + Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port " + port, + readSelector); readers[i] = reader; - readPool.execute(reader); + reader.start(); } // Register accepts on the server socket with the selector. @@ -327,15 +326,16 @@ public abstract class Server { this.setDaemon(true); } - private class Reader implements Runnable { + private class Reader extends Thread { private volatile boolean adding = false; private Selector readSelector = null; - Reader(Selector readSelector) { + Reader(String name, Selector readSelector) { + super(name); this.readSelector = readSelector; } public void run() { - LOG.info("Starting SocketReader"); + LOG.info("Starting " + getName()); synchronized (this) { while (running) { SelectionKey key = null; @@ -389,6 +389,16 @@ public abstract class Server { adding = false; this.notify(); } + + void shutdown() { + assert !running; + readSelector.wakeup(); + try { + join(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } } /** cleanup connections from connectionList. Choose a random range * to scan and also have a limit on the number of the connections @@ -577,7 +587,9 @@ public abstract class Server { LOG.info(getName() + ":Exception in closing listener socket. " + e); } } - readPool.shutdown(); + for (Reader r : readers) { + r.shutdown(); + } } synchronized Selector getSelector() { return selector; } Modified: hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestRPC.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestRPC.java?rev=1070791&r1=1070790&r2=1070791&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestRPC.java (original) +++ hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestRPC.java Tue Feb 15 07:18:44 2011 @@ -21,6 +21,9 @@ package org.apache.hadoop.ipc; import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; import java.lang.reflect.Method; import junit.framework.TestCase; @@ -506,6 +509,50 @@ public class TestRPC extends TestCase { } assertTrue(succeeded); } + + /** + * Count the number of threads that have a stack frame containing + * the given string + */ + private static int countThreads(String search) { + ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); + + int count = 0; + ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20); + for (ThreadInfo info : infos) { + if (info == null) continue; + for (StackTraceElement elem : info.getStackTrace()) { + if (elem.getClassName().contains(search)) { + count++; + break; + } + } + } + return count; + } + + + /** + * Test that server.stop() properly stops all threads + */ + public void testStopsAllThreads() throws Exception { + int threadsBefore = countThreads("Server$Listener$Reader"); + assertEquals("Expect no Reader threads running before test", + 0, threadsBefore); + + final Server server = RPC.getServer(TestProtocol.class, + new TestImpl(), ADDRESS, 0, 5, true, conf, null); + server.start(); + try { + int threadsRunning = countThreads("Server$Listener$Reader"); + assertTrue(threadsRunning > 0); + } finally { + server.stop(); + } + int threadsAfter = countThreads("Server$Listener$Reader"); + assertEquals("Expect no Reader threads left running after test", + 0, threadsAfter); + } public static void main(String[] args) throws Exception {