hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kih...@apache.org
Subject hadoop git commit: HDFS-11379. DFSInputStream may infinite loop requesting block locations. Contributed by Daryn Sharp.
Date Mon, 13 Feb 2017 15:38:53 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8.0 bfa04500d -> cf91cc2b0


HDFS-11379. DFSInputStream may infinite loop requesting block locations. Contributed by Daryn Sharp.

(cherry picked from commit 22f8b6613707e7d08905656f2e11c6c24a6a8533)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cf91cc2b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cf91cc2b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cf91cc2b

Branch: refs/heads/branch-2.8.0
Commit: cf91cc2b0f5c71aa300756373d5a533a431239bc
Parents: bfa0450
Author: Kihwal Lee <kihwal@apache.org>
Authored: Mon Feb 13 08:42:35 2017 -0600
Committer: Kihwal Lee <kihwal@apache.org>
Committed: Mon Feb 13 08:42:35 2017 -0600

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 48 +++++++-----------
 .../java/org/apache/hadoop/hdfs/TestPread.java  | 52 +++++++++++++++++++-
 2 files changed, 70 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf91cc2b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 463ce23..873fb03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -505,33 +505,36 @@ public class DFSInputStream extends FSInputStream
       }
       else {
         // search cached blocks first
-        int targetBlockIdx = locatedBlocks.findBlock(offset);
-        if (targetBlockIdx < 0) { // block is not cached
-          targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
-          // fetch more blocks
-          final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
-          assert (newBlocks != null) : "Could not find target position " + offset;
-          locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
-        }
-        blk = locatedBlocks.get(targetBlockIdx);
+        blk = fetchBlockAt(offset, 0, true);
       }
       return blk;
     }
   }
 
   /** Fetch a block from namenode and cache it */
-  protected void fetchBlockAt(long offset) throws IOException {
+  protected LocatedBlock fetchBlockAt(long offset) throws IOException {
+    return fetchBlockAt(offset, 0, false); // don't use cache
+  }
+
+  /** Fetch a block from namenode and cache it */
+  private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache)
+      throws IOException {
     synchronized(infoLock) {
       int targetBlockIdx = locatedBlocks.findBlock(offset);
       if (targetBlockIdx < 0) { // block is not cached
         targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+        useCache = false;
       }
-      // fetch blocks
-      final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
-      if (newBlocks == null) {
-        throw new IOException("Could not find target position " + offset);
+      if (!useCache) { // fetch blocks
+        final LocatedBlocks newBlocks = (length == 0)
+            ? dfsClient.getLocatedBlocks(src, offset)
+            : dfsClient.getLocatedBlocks(src, offset, length);
+        if (newBlocks == null || newBlocks.locatedBlockCount() == 0) {
+          throw new EOFException("Could not find target position " + offset);
+        }
+        locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
       }
-      locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+      return locatedBlocks.get(targetBlockIdx);
     }
   }
 
@@ -586,28 +589,15 @@ public class DFSInputStream extends FSInputStream
       assert (locatedBlocks != null) : "locatedBlocks is null";
       List<LocatedBlock> blockRange = new ArrayList<>();
       // search cached blocks first
-      int blockIdx = locatedBlocks.findBlock(offset);
-      if (blockIdx < 0) { // block is not cached
-        blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
-      }
       long remaining = length;
       long curOff = offset;
       while(remaining > 0) {
-        LocatedBlock blk = null;
-        if(blockIdx < locatedBlocks.locatedBlockCount())
-          blk = locatedBlocks.get(blockIdx);
-        if (blk == null || curOff < blk.getStartOffset()) {
-          LocatedBlocks newBlocks;
-          newBlocks = dfsClient.getLocatedBlocks(src, curOff, remaining);
-          locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
-          continue;
-        }
+        LocatedBlock blk = fetchBlockAt(curOff, remaining, true);
         assert curOff >= blk.getStartOffset() : "Block not found";
         blockRange.add(blk);
         long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
         remaining -= bytesRead;
         curOff += bytesRead;
-        blockIdx++;
       }
       return blockRange;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf91cc2b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index cc0fb92..b1858e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Random;
@@ -29,7 +30,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -494,6 +496,54 @@ public class TestPread {
     }
   }
 
+  @Test
+  public void testTruncateWhileReading() throws Exception {
+    Path path = new Path("/testfile");
+    final int blockSize = 512;
+
+    // prevent initial pre-fetch of multiple block locations
+    Configuration conf = new Configuration();
+    conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY, blockSize);
+
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // create multi-block file
+      FSDataOutputStream dos =
+          fs.create(path, true, blockSize, (short)1, blockSize);
+      dos.write(new byte[blockSize*3]);
+      dos.close();
+      // truncate a file while it's open
+      final FSDataInputStream dis = fs.open(path);
+      while (!fs.truncate(path, 10)) {
+        Thread.sleep(10);
+      }
+      // verify that reading bytes outside the initial pre-fetch do
+      // not send the client into an infinite loop querying locations.
+      ExecutorService executor = Executors.newFixedThreadPool(1);
+      Future<?> future = executor.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws IOException {
+          // read from 2nd block.
+          dis.readFully(blockSize, new byte[4]);
+          return null;
+        }
+      });
+      try {
+        future.get(4, TimeUnit.SECONDS);
+        Assert.fail();
+      } catch (ExecutionException ee) {
+        assertTrue(ee.toString(), ee.getCause() instanceof EOFException);
+      } finally {
+        future.cancel(true);
+        executor.shutdown();
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     new TestPread().testPreadDFS();
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message