hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject hadoop git commit: HADOOP-11802. DomainSocketWatcher thread terminates sometimes after there is an I/O error during requestShortCircuitShm (cmccabe)
Date Fri, 24 Apr 2015 02:10:04 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 0eae3bc0f -> 496afb5e1


HADOOP-11802. DomainSocketWatcher thread terminates sometimes after there is an I/O error
during requestShortCircuitShm (cmccabe)

(cherry picked from commit a0e0a63209b5eb17dca5cc503be36aa52defeabd)
(cherry picked from commit 788b76761d5dfadf688406d50169e95401fe5d33)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/496afb5e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/496afb5e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/496afb5e

Branch: refs/heads/branch-2.7
Commit: 496afb5e1a5890a579354a191696c42bcda7ab96
Parents: 0eae3bc
Author: Colin Patrick Mccabe <cmccabe@cloudera.com>
Authored: Thu Apr 23 18:59:52 2015 -0700
Committer: Colin Patrick Mccabe <cmccabe@cloudera.com>
Committed: Thu Apr 23 19:09:50 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 +
 .../hadoop/net/unix/DomainSocketWatcher.java    |  4 +-
 .../hadoop/net/unix/DomainSocketWatcher.c       | 10 ++-
 .../server/datanode/DataNodeFaultInjector.java  |  2 +
 .../hdfs/server/datanode/DataXceiver.java       | 18 ++++-
 .../hdfs/shortcircuit/DfsClientShmManager.java  |  3 +-
 .../hdfs/shortcircuit/DomainSocketFactory.java  |  6 ++
 .../shortcircuit/TestShortCircuitCache.java     | 83 ++++++++++++++++++--
 8 files changed, 111 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/496afb5e/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index e29cb0f..b144177 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -21,6 +21,9 @@ Release 2.7.1 - UNRELEASED
     HADOOP-11730. Regression: s3n read failure recovery broken.
     (Takenori Sato via stevel)
 
+    HADOOP-11802. DomainSocketWatcher thread terminates sometimes after there
+    is an I/O error during requestShortCircuitShm (cmccabe)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/496afb5e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
index 03b52e0..5648ae1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
@@ -512,8 +512,8 @@ public final class DomainSocketWatcher implements Closeable {
         }
       } catch (InterruptedException e) {
         LOG.info(toString() + " terminating on InterruptedException");
-      } catch (IOException e) {
-        LOG.error(toString() + " terminating on IOException", e);
+      } catch (Throwable e) {
+        LOG.error(toString() + " terminating on exception", e);
       } finally {
         lock.lock();
         try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/496afb5e/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
index dbaa4fe..596601b 100644
--- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
+++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/net/unix/DomainSocketWatcher.c
@@ -111,7 +111,7 @@ JNIEnv *env, jobject obj, jint fd)
   pollfd = &sd->pollfd[sd->used_size];
   sd->used_size++;
   pollfd->fd = fd;
-  pollfd->events = POLLIN;
+  pollfd->events = POLLIN | POLLHUP;
   pollfd->revents = 0;
 }
 
@@ -162,7 +162,10 @@ JNIEnv *env, jobject obj)
       GetLongField(env, obj, fd_set_data_fid);
   used_size = sd->used_size;
   for (i = 0; i < used_size; i++) {
-    if (sd->pollfd[i].revents & POLLIN) {
+    // We check for both POLLIN and POLLHUP, because on some OSes, when a socket
+    // is shutdown(), it sends POLLHUP rather than POLLIN.
+    if ((sd->pollfd[i].revents & POLLIN) ||
+        (sd->pollfd[i].revents & POLLHUP)) {
       num_readable++;
     } else {
       sd->pollfd[i].revents = 0;
@@ -177,7 +180,8 @@ JNIEnv *env, jobject obj)
     }
     j = 0;
     for (i = 0; ((i < used_size) && (j < num_readable)); i++) {
-      if (sd->pollfd[i].revents & POLLIN) {
+      if ((sd->pollfd[i].revents & POLLIN) ||
+          (sd->pollfd[i].revents & POLLHUP)) {
         carr[j] = sd->pollfd[i].fd;
         j++;
         sd->pollfd[i].revents = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/496afb5e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index 478099d..65f0506 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -39,4 +39,6 @@ public class DataNodeFaultInjector {
   public void getHdfsBlocksMetadata() {}
 
   public void writeBlockAfterFlush() throws IOException {}
+
+  public void sendShortCircuitShmResponse() throws IOException {}
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/496afb5e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index cf1b6be..01ff32d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -421,6 +421,7 @@ class DataXceiver extends Receiver implements Runnable {
 
   private void sendShmSuccessResponse(DomainSocket sock, NewShmInfo shmInfo)
       throws IOException {
+    DataNodeFaultInjector.get().sendShortCircuitShmResponse();
     ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS).
         setId(PBHelper.convert(shmInfo.shmId)).build().
         writeDelimitedTo(socketOut);
@@ -479,10 +480,19 @@ class DataXceiver extends Receiver implements Runnable {
         }
       }
       if ((!success) && (peer == null)) {
-        // If we failed to pass the shared memory segment to the client,
-        // close the UNIX domain socket now.  This will trigger the 
-        // DomainSocketWatcher callback, cleaning up the segment.
-        IOUtils.cleanup(null, sock);
+        // The socket is now managed by the DomainSocketWatcher.  However,
+        // we failed to pass it to the client.  We call shutdown() on the
+        // UNIX domain socket now.  This will trigger the DomainSocketWatcher
+        // callback.  The callback will close the domain socket.
+        // We don't want to close the socket here, since that might lead to
+        // bad behavior inside the poll() call.  See HADOOP-11802 for details.
+        try {
+          LOG.warn("Failed to send success response back to the client.  " +
+              "Shutting down socket for " + shmInfo.shmId + ".");
+          sock.shutdown();
+        } catch (IOException e) {
+          LOG.warn("Failed to shut down socket in error handler", e);
+        }
       }
       IOUtils.cleanup(null, shmInfo);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/496afb5e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
index 9092bc5..062539a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
@@ -216,10 +216,11 @@ public class DfsClientShmManager implements Closeable {
      * Must be called with the EndpointShmManager lock held.
      *
      * @param peer          The peer to use to talk to the DataNode.
-     * @param clientName    The client name.
      * @param usedPeer      (out param) Will be set to true if we used the peer.
      *                        When a peer is used
      *
+     * @param clientName    The client name.
+     * @param blockId       The block ID to use.
      * @return              null if the DataNode does not support shared memory
      *                        segments, or experienced an error creating the
      *                        shm.  The shared memory segment itself on success.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/496afb5e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
index 5fd31a9..60adb02 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -185,4 +186,9 @@ public class DomainSocketFactory {
   public void disableDomainSocketPath(String path) {
     pathMap.put(path, PathState.UNUSABLE);
   }
+
+  @VisibleForTesting
+  public void clearPathMap() {
+    pathMap.invalidateAll();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/496afb5e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index 7daabd0..efd9754 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
 import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
 import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
@@ -73,6 +74,9 @@ import org.apache.hadoop.util.Time;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
@@ -622,6 +626,18 @@ public class TestShortCircuitCache {
     sockDir.close();
   }
 
+  static private void checkNumberOfSegmentsAndSlots(final int expectedSegments,
+        final int expectedSlots, ShortCircuitRegistry registry) {
+    registry.visit(new ShortCircuitRegistry.Visitor() {
+      @Override
+      public void accept(HashMap<ShmId, RegisteredShm> segments,
+                         HashMultimap<ExtendedBlockId, Slot> slots) {
+        Assert.assertEquals(expectedSegments, segments.size());
+        Assert.assertEquals(expectedSlots, slots.size());
+      }
+    });
+  }
+
   public static class TestCleanupFailureInjector
         extends BlockReaderFactory.FailureInjector {
     @Override
@@ -665,16 +681,67 @@ public class TestShortCircuitCache {
       GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
           "testing, but we failed to do a non-TCP read.", t);
     }
-    ShortCircuitRegistry registry =
-      cluster.getDataNodes().get(0).getShortCircuitRegistry();
-    registry.visit(new ShortCircuitRegistry.Visitor() {
+    checkNumberOfSegmentsAndSlots(1, 1,
+        cluster.getDataNodes().get(0).getShortCircuitRegistry());
+    cluster.shutdown();
+    sockDir.close();
+  }
+
+  // Regression test for HADOOP-11802
+  @Test(timeout=60000)
+  public void testDataXceiverHandlesRequestShortCircuitShmFailure()
+      throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf(
+        "testDataXceiverHandlesRequestShortCircuitShmFailure", sockDir);
+    conf.setLong(HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
+        1000000000L);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    final Path TEST_PATH1 = new Path("/test_file1");
+    DFSTestUtil.createFile(fs, TEST_PATH1, 4096,
+        (short)1, 0xFADE1);
+    LOG.info("Setting failure injector and performing a read which " +
+        "should fail...");
+    DataNodeFaultInjector failureInjector = Mockito.mock(DataNodeFaultInjector.class);
+    Mockito.doAnswer(new Answer<Void>() {
       @Override
-      public void accept(HashMap<ShmId, RegisteredShm> segments,
-                         HashMultimap<ExtendedBlockId, Slot> slots) {
-        Assert.assertEquals(1, segments.size());
-        Assert.assertEquals(1, slots.size());
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        throw new IOException("injected error into sendShmResponse");
       }
-    });
+    }).when(failureInjector).sendShortCircuitShmResponse();
+    DataNodeFaultInjector prevInjector = DataNodeFaultInjector.instance;
+    DataNodeFaultInjector.instance = failureInjector;
+
+    try {
+      // The first read will try to allocate a shared memory segment and slot.
+      // The shared memory segment allocation will fail because of the failure
+      // injector.
+      DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
+      Assert.fail("expected readFileBuffer to fail, but it succeeded.");
+    } catch (Throwable t) {
+      GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
+          "testing, but we failed to do a non-TCP read.", t);
+    }
+
+    checkNumberOfSegmentsAndSlots(0, 0,
+        cluster.getDataNodes().get(0).getShortCircuitRegistry());
+
+    LOG.info("Clearing failure injector and performing another read...");
+    DataNodeFaultInjector.instance = prevInjector;
+
+    fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap();
+
+    // The second read should succeed.
+    DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
+
+    // We should have added a new short-circuit shared memory segment and slot.
+    checkNumberOfSegmentsAndSlots(1, 1,
+        cluster.getDataNodes().get(0).getShortCircuitRegistry());
+
     cluster.shutdown();
     sockDir.close();
   }


Mime
View raw message