hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject svn commit: r1127815 - in /hadoop/common/branches/branch-0.22: CHANGES.txt src/java/org/apache/hadoop/ipc/Server.java src/test/core/org/apache/hadoop/ipc/TestIPC.java
Date Thu, 26 May 2011 07:55:17 GMT
Author: todd
Date: Thu May 26 07:55:17 2011
New Revision: 1127815

URL: http://svn.apache.org/viewvc?rev=1127815&view=rev
Log:
HADOOP-7146. RPC server leaks file descriptors. Contributed by Todd Lipcon.

Modified:
    hadoop/common/branches/branch-0.22/CHANGES.txt
    hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestIPC.java

Modified: hadoop/common/branches/branch-0.22/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/CHANGES.txt?rev=1127815&r1=1127814&r2=1127815&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.22/CHANGES.txt Thu May 26 07:55:17 2011
@@ -489,6 +489,8 @@ Release 0.22.0 - Unreleased
     HADOOP-7287. Configuration deprecation mechanism doesn't work properly for
     GenericOptionsParser and Tools. (Aaron T. Myers via todd)
 
+    HADOOP-7146. RPC server leaks file descriptors (todd)
+
 Release 0.21.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java?rev=1127815&r1=1127814&r2=1127815&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.22/src/java/org/apache/hadoop/ipc/Server.java Thu May 26 07:55:17 2011
@@ -315,9 +315,8 @@ public abstract class Server {
       selector= Selector.open();
       readers = new Reader[readThreads];
       for (int i = 0; i < readThreads; i++) {
-        Selector readSelector = Selector.open();
-        Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port " + port,
-                                   readSelector);
+        Reader reader = new Reader(
+            "Socket Reader #" + (i + 1) + " for port " + port);
         readers[i] = reader;
         reader.start();
       }
@@ -330,42 +329,53 @@ public abstract class Server {
     
     private class Reader extends Thread {
       private volatile boolean adding = false;
-      private Selector readSelector = null;
+      private final Selector readSelector;
 
-      Reader(String name, Selector readSelector) {
+      Reader(String name) throws IOException {
         super(name);
-        this.readSelector = readSelector;
+
+        this.readSelector = Selector.open();
       }
+      
       public void run() {
         LOG.info("Starting " + getName());
-        synchronized (this) {
-          while (running) {
-            SelectionKey key = null;
-            try {
-              readSelector.select();
-              while (adding) {
-                this.wait(1000);
-              }              
-
-              Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
-              while (iter.hasNext()) {
-                key = iter.next();
-                iter.remove();
-                if (key.isValid()) {
-                  if (key.isReadable()) {
-                    doRead(key);
-                  }
+        try {
+          doRunLoop();
+        } finally {
+          try {
+            readSelector.close();
+          } catch (IOException ioe) {
+            LOG.error("Error closing read selector in " + this.getName(), ioe);
+          }
+        }
+      }
+
+      private synchronized void doRunLoop() {
+        while (running) {
+          SelectionKey key = null;
+          try {
+            readSelector.select();
+            while (adding) {
+              this.wait(1000);
+            }              
+
+            Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
+            while (iter.hasNext()) {
+              key = iter.next();
+              iter.remove();
+              if (key.isValid()) {
+                if (key.isReadable()) {
+                  doRead(key);
                 }
-                key = null;
-              }
-            } catch (InterruptedException e) {
-              if (running) {                      // unexpected -- log it
-                LOG.info(getName() + " caught: " +
-                         StringUtils.stringifyException(e));
               }
-            } catch (IOException ex) {
-              LOG.error("Error in Reader", ex);
+              key = null;
             }
+          } catch (InterruptedException e) {
+            if (running) {                      // unexpected -- log it
+              LOG.info(getName() + " unexpectedly interrupted", e);
+            }
+          } catch (IOException ex) {
+            LOG.error("Error in Reader", ex);
           }
         }
       }
@@ -605,7 +615,7 @@ public abstract class Server {
 
   // Sends responses of RPC back to clients.
   private class Responder extends Thread {
-    private Selector writeSelector;
+    private final Selector writeSelector;
     private int pending;         // connections waiting to register
     
     final static int PURGE_INTERVAL = 900000; // 15mins
@@ -621,6 +631,19 @@ public abstract class Server {
     public void run() {
       LOG.info(getName() + ": starting");
       SERVER.set(Server.this);
+      try {
+        doRunLoop();
+      } finally {
+        LOG.info("Stopping " + this.getName());
+        try {
+          writeSelector.close();
+        } catch (IOException ioe) {
+          LOG.error("Couldn't close write selector in " + this.getName(), ioe);
+        }
+      }
+    }
+    
+    private void doRunLoop() {
       long lastPurgeTime = 0;   // last check for old calls.
 
       while (running) {
@@ -682,11 +705,9 @@ public abstract class Server {
           LOG.warn("Out of Memory in server select", e);
           try { Thread.sleep(60000); } catch (Exception ie) {}
         } catch (Exception e) {
-          LOG.warn("Exception in Responder " + 
-                   StringUtils.stringifyException(e));
+          LOG.warn("Exception in Responder", e);
         }
       }
-      LOG.info("Stopping " + this.getName());
     }
 
     private void doAsyncWrite(SelectionKey key) throws IOException {
@@ -1446,12 +1467,10 @@ public abstract class Server {
           }
         } catch (InterruptedException e) {
           if (running) {                          // unexpected -- log it
-            LOG.info(getName() + " caught: " +
-                     StringUtils.stringifyException(e));
+            LOG.info(getName() + " unexpectedly interrupted", e);
           }
         } catch (Exception e) {
-          LOG.info(getName() + " caught: " +
-                   StringUtils.stringifyException(e));
+          LOG.info(getName() + " caught an exception", e);
         }
       }
       LOG.info(getName() + ": exiting");

Modified: hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestIPC.java?rev=1127815&r1=1127814&r2=1127815&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestIPC.java (original)
+++ hadoop/common/branches/branch-0.22/src/test/core/org/apache/hadoop/ipc/TestIPC.java Thu May 26 07:55:17 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.net.NetUtils;
 
 import java.util.Random;
 import java.io.DataInput;
+import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
@@ -36,6 +37,7 @@ import junit.framework.TestCase;
 import static org.mockito.Mockito.*;
 
 import org.apache.hadoop.conf.Configuration;
+import org.junit.Assume;
 
 /** Unit tests for IPC. */
 public class TestIPC extends TestCase {
@@ -55,6 +57,9 @@ public class TestIPC extends TestCase {
 
   private static final String ADDRESS = "0.0.0.0";
 
+  /** Directory where we can count open file descriptors on Linux */
+  private static final File FD_DIR = new File("/proc/self/fd");
+
   private static class TestServer extends Server {
     private boolean sleep;
 
@@ -354,6 +359,29 @@ public class TestIPC extends TestCase {
         addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
   }
   
+  /**
+   * Check that file descriptors aren't leaked by starting
+   * and stopping IPC servers.
+   */
+  public void testSocketLeak() throws Exception {
+    Assume.assumeTrue(FD_DIR.exists()); // only run on Linux
+
+    long startFds = countOpenFileDescriptors();
+    for (int i = 0; i < 50; i++) {
+      Server server = new TestServer(1, true);
+      server.start();
+      server.stop();
+    }
+    long endFds = countOpenFileDescriptors();
+    
+    assertTrue("Leaked " + (endFds - startFds) + " file descriptors",
+        endFds - startFds < 20);
+  }
+  
+  private long countOpenFileDescriptors() {
+    return FD_DIR.list().length;
+  }
+
   public static void main(String[] args) throws Exception {
 
     //new TestIPC("test").testSerial(5, false, 2, 10, 1000);



Mime
View raw message