hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject svn commit: r1502704 - in /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common: CHANGES.txt src/main/java/org/apache/hadoop/ipc/Client.java src/test/java/org/apache/hadoop/ipc/TestIPC.java
Date Fri, 12 Jul 2013 21:55:15 GMT
Author: cmccabe
Date: Fri Jul 12 21:55:15 2013
New Revision: 1502704

URL: http://svn.apache.org/r1502704
Log:
HADOOP-9703.  org.apache.hadoop.ipc.Client leaks threads on stop (Tsuyoshi OZAWA via Colin
Patrick McCabe)

Modified:
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1502704&r1=1502703&r2=1502704&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Fri Jul
12 21:55:15 2013
@@ -31,6 +31,9 @@ Release 2.2.0 - UNRELEASED
     HADOOP-9417.  Support for symlink resolution in LocalFileSystem /
     RawLocalFileSystem.  (Andrew Wang via Colin Patrick McCabe)
 
+    HADOOP-9703.  org.apache.hadoop.ipc.Client leaks threads on stop.
+    (Tsuyoshi OZAWA vi Colin Patrick McCabe)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1502704&r1=1502703&r2=1502704&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
Fri Jul 12 21:55:15 2013
@@ -115,17 +115,70 @@ public class Client {
   final static int PING_CALL_ID = -1;
   
   /**
-   * Executor on which IPC calls' parameters are sent. Deferring
-   * the sending of parameters to a separate thread isolates them
-   * from thread interruptions in the calling code.
-   */
-  private static final ExecutorService SEND_PARAMS_EXECUTOR = 
-    Executors.newCachedThreadPool(
-        new ThreadFactoryBuilder()
-        .setDaemon(true)
-        .setNameFormat("IPC Parameter Sending Thread #%d")
-        .build());
-
+   * Executor on which IPC calls' parameters are sent.
+   * Deferring the sending of parameters to a separate
+   * thread isolates them from thread interruptions in the
+   * calling code.
+   */
+  private final ExecutorService sendParamsExecutor;
+  private final static ClientExecutorServiceFactory clientExcecutorFactory =
+      new ClientExecutorServiceFactory();
+
+  private static class ClientExecutorServiceFactory {
+    private int executorRefCount = 0;
+    private ExecutorService clientExecutor = null;
+    
+    /**
+     * Get Executor on which IPC calls' parameters are sent.
+     * If the internal reference counter is zero, this method
+     * creates the instance of Executor. If not, this method
+     * just returns the reference of clientExecutor.
+     * 
+     * @return An ExecutorService instance
+     */
+    synchronized ExecutorService refAndGetInstance() {
+      if (executorRefCount == 0) {
+        clientExecutor = Executors.newCachedThreadPool(
+            new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("IPC Parameter Sending Thread #%d")
+            .build());
+      }
+      executorRefCount++;
+      
+      return clientExecutor;
+    }
+    
+    /**
+     * Cleanup Executor on which IPC calls' parameters are sent.
+     * If reference counter is zero, this method discards the
+     * instance of the Executor. If not, this method
+     * just decrements the internal reference counter.
+     * 
+     * @return An ExecutorService instance if it exists.
+     *   Null is returned if not.
+     */
+    synchronized ExecutorService unrefAndCleanup() {
+      executorRefCount--;
+      assert(executorRefCount >= 0);
+      
+      if (executorRefCount == 0) {
+        clientExecutor.shutdown();
+        try {
+          if (!clientExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
+            clientExecutor.shutdownNow();
+          }
+        } catch (InterruptedException e) {
+          LOG.error("Interrupted while waiting for clientExecutor" +
+              "to stop", e);
+          clientExecutor.shutdownNow();
+        }
+        clientExecutor = null;
+      }
+      
+      return clientExecutor;
+    }
+  };
   
   /**
    * set the ping interval value in configuration
@@ -198,7 +251,7 @@ public class Client {
   synchronized boolean isZeroReference() {
     return refCount==0;
   }
-
+  
   /** 
    * Class that represents an RPC call
    */
@@ -878,7 +931,8 @@ public class Client {
       }
 
       // Serialize the call to be sent. This is done from the actual
-      // caller thread, rather than the SEND_PARAMS_EXECUTOR thread,
+      // caller thread, rather than the sendParamsExecutor thread,
+      
       // so that if the serialization throws an error, it is reported
       // properly. This also parallelizes the serialization.
       //
@@ -895,7 +949,7 @@ public class Client {
       call.rpcRequest.write(d);
 
       synchronized (sendRpcRequestLock) {
-        Future<?> senderFuture = SEND_PARAMS_EXECUTOR.submit(new Runnable() {
+        Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
           @Override
           public void run() {
             try {
@@ -1090,6 +1144,7 @@ public class Client {
         CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
     this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
         CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+    this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
   }
 
   /**
@@ -1134,6 +1189,8 @@ public class Client {
       } catch (InterruptedException e) {
       }
     }
+    
+    clientExcecutorFactory.unrefAndCleanup();
   }
 
   /**

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java?rev=1502704&r1=1502703&r2=1502704&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
Fri Jul 12 21:55:15 2013
@@ -562,6 +562,24 @@ public class TestIPC {
     assertFalse(noChanged ^ serviceClass == serviceClass2);
     client.stop();
   }
+  
+  @Test(timeout=30000, expected=IOException.class)
+  public void testIpcAfterStopping() throws IOException, InterruptedException {
+    // start server
+    Server server = new TestServer(5, false);
+    InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    server.start();
+
+    // start client
+    Client client = new Client(LongWritable.class, conf);
+    client.call(new LongWritable(RANDOM.nextLong()),
+        addr, null, null, MIN_SLEEP_TIME, 0, conf);
+    client.stop();
+ 
+    // This call should throw IOException.
+    client.call(new LongWritable(RANDOM.nextLong()),
+        addr, null, null, MIN_SLEEP_TIME, 0, conf);
+  }
 
   /**
    * Check that file descriptors aren't leaked by starting



Mime
View raw message