Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B0982CDA4 for ; Fri, 21 Jun 2013 05:03:35 +0000 (UTC) Received: (qmail 85729 invoked by uid 500); 21 Jun 2013 05:03:35 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 85647 invoked by uid 500); 21 Jun 2013 05:03:32 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 85637 invoked by uid 99); 21 Jun 2013 05:03:31 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Jun 2013 05:03:31 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Jun 2013 05:03:30 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 3E32223888D2 for ; Fri, 21 Jun 2013 05:03:11 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1495293 - in /hadoop/common/branches/branch-1: CHANGES.txt src/core/org/apache/hadoop/ipc/Server.java src/test/org/apache/hadoop/ipc/TestRPC.java Date: Fri, 21 Jun 2013 05:03:11 -0000 To: common-commits@hadoop.apache.org From: ivanmi@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130621050311.3E32223888D2@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ivanmi Date: Fri Jun 21 05:03:10 2013 New Revision: 1495293 URL: http://svn.apache.org/r1495293 Log: HADOOP-7140. IPC Reader threads do not stop when server stops (Backported by Ivan Mitic). Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Server.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/ipc/TestRPC.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1495293&r1=1495292&r2=1495293&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Fri Jun 21 05:03:10 2013 @@ -62,6 +62,9 @@ Release 1.3.0 - unreleased HADOOP-9624. TestFSMainOperationsLocalFileSystem failed when the Hadoop test root path has "X" in its name. (Xi Fang via cnauroth) + HADOOP-7140. IPC Reader threads do not stop when server stops + (Todd Lipcon, backported by ivanmi) + Release 1.2.1 - Unreleased INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Server.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Server.java?rev=1495293&r1=1495292&r2=1495293&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Server.java (original) +++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Server.java Fri Jun 21 05:03:10 2013 @@ -329,7 +329,6 @@ public abstract class Server { private long cleanupInterval = 10000; //the minimum interval between //two cleanup runs private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128); - private ExecutorService readPool; public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); @@ -343,12 +342,12 @@ public abstract class Server { // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; - readPool = Executors.newFixedThreadPool(readThreads); for (int i = 0; i < readThreads; i++) { Selector readSelector = Selector.open(); - Reader reader = new Reader(readSelector); + Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port " + port, + readSelector); readers[i] = reader; - readPool.execute(reader); + reader.start(); } // Register accepts on the server socket with the selector. @@ -357,15 +356,16 @@ public abstract class Server { this.setDaemon(true); } - private class Reader implements Runnable { + private class Reader extends Thread { private volatile boolean adding = false; private Selector readSelector = null; - Reader(Selector readSelector) { + Reader(String name, Selector readSelector) { + super(name); this.readSelector = readSelector; } public void run() { - LOG.info("Starting SocketReader"); + LOG.info("Starting " + getName()); synchronized (this) { while (running) { SelectionKey key = null; @@ -419,6 +419,16 @@ public abstract class Server { adding = false; this.notify(); } + + void shutdown() { + assert !running; + readSelector.wakeup(); + try { + join(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } } /** cleanup connections from connectionList. Choose a random range @@ -607,7 +617,9 @@ public abstract class Server { LOG.info(getName() + ":Exception in closing listener socket. " + e); } } - readPool.shutdown(); + for (Reader r : readers) { + r.shutdown(); + } } // The method that will return the next reader to work with Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/ipc/TestRPC.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=1495293&r1=1495292&r2=1495293&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/ipc/TestRPC.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/ipc/TestRPC.java Fri Jun 21 05:03:10 2013 @@ -23,6 +23,9 @@ import static org.apache.hadoop.test.Met import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; import java.lang.reflect.Method; import java.net.ConnectException; import java.net.InetSocketAddress; @@ -410,6 +413,50 @@ public class TestRPC extends TestCase { assertCounter("rpcAuthenticationSuccesses", 0, rb); } } + + /** + * Count the number of threads that have a stack frame containing + * the given string + */ + private static int countThreads(String search) { + ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); + + int count = 0; + ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20); + for (ThreadInfo info : infos) { + if (info == null) continue; + for (StackTraceElement elem : info.getStackTrace()) { + if (elem.getClassName().contains(search)) { + count++; + break; + } + } + } + return count; + } + + + /** + * Test that server.stop() properly stops all threads + */ + public void testStopsAllThreads() throws Exception { + int threadsBefore = countThreads("Server$Listener$Reader"); + assertEquals("Expect no Reader threads running before test", + 0, threadsBefore); + + final Server server = RPC.getServer(new TestImpl(), ADDRESS, + 0, 5, true, conf); + server.start(); + try { + int threadsRunning = countThreads("Server$Listener$Reader"); + assertTrue(threadsRunning > 0); + } finally { + server.stop(); + } + int threadsAfter = countThreads("Server$Listener$Reader"); + assertEquals("Expect no Reader threads left running after test", + 0, threadsAfter); + } public void testAuthorization() throws Exception { Configuration conf = new Configuration();