hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [34/50] [abbrv] hadoop git commit: HDFS-7764. DirectoryScanner shouldn't abort the scan if one directory had an error (Rakesh R via cmccabe)
Date Mon, 01 Feb 2016 18:40:53 GMT
HDFS-7764. DirectoryScanner shouldn't abort the scan if one directory had an error (Rakesh R via cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f67149ab
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f67149ab
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f67149ab

Branch: refs/heads/HDFS-7240
Commit: f67149ab08bb49381def6c535ab4c4610e0a4221
Parents: ee005e0
Author: Colin Patrick Mccabe <cmccabe@cloudera.com>
Authored: Thu Jan 28 19:54:50 2016 -0800
Committer: Colin Patrick Mccabe <cmccabe@cloudera.com>
Committed: Thu Jan 28 19:54:50 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../hdfs/server/datanode/DirectoryScanner.java  | 72 +++++++++++++-------
 .../server/datanode/TestDirectoryScanner.java   | 50 ++++++++++++++
 3 files changed, 100 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f67149ab/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index a51dc15..9b80aa1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -962,6 +962,9 @@ Release 2.9.0 - UNRELEASED
     HDFS-9677. Rename generationStampV1/generationStampV2 to
     legacyGenerationStamp/generationStamp. (Mingliang Liu via jing9)
 
+    HDFS-7764. DirectoryScanner shouldn't abort the scan if one directory had
+    an error (Rakesh R via cmccabe)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f67149ab/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 392c121..083ca31 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -42,12 +44,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StopWatch;
 import org.apache.hadoop.util.Time;
@@ -727,18 +729,20 @@ public class DirectoryScanner implements Runnable {
 
       for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
           compilersInProgress.entrySet()) {
+        Integer index = report.getKey();
         try {
-          dirReports[report.getKey()] = report.getValue().get();
+          dirReports[index] = report.getValue().get();
 
           // If our compiler threads were interrupted, give up on this run
-          if (dirReports[report.getKey()] == null) {
+          if (dirReports[index] == null) {
             dirReports = null;
             break;
           }
         } catch (Exception ex) {
-          LOG.error("Error compiling report", ex);
-          // Propagate ex to DataBlockScanner to deal with
-          throw new RuntimeException(ex);
+          FsVolumeSpi fsVolumeSpi = volumes.get(index);
+          LOG.error("Error compiling report for the volume, StorageId: "
+              + fsVolumeSpi.getStorageID(), ex);
+          // Continue scanning the other volumes
         }
       }
     } catch (IOException e) {
@@ -747,7 +751,9 @@ public class DirectoryScanner implements Runnable {
     if (dirReports != null) {
       // Compile consolidated report for all the volumes
       for (ScanInfoPerBlockPool report : dirReports) {
-        list.addAll(report);
+        if(report != null){
+          list.addAll(report);
+        }
       }
     }
     return list.toSortedArrays();
@@ -837,12 +843,11 @@ public class DirectoryScanner implements Runnable {
         File bpFinalizedDir, File dir, LinkedList<ScanInfo> report)
         throws InterruptedException {
 
-      File[] files;
-
       throttle();
 
+      List <String> fileNames;
       try {
-        files = FileUtil.listFiles(dir);
+        fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE);
       } catch (IOException ioe) {
         LOG.warn("Exception occured while compiling report: ", ioe);
         // Initiate a check on disk failure.
@@ -850,44 +855,50 @@ public class DirectoryScanner implements Runnable {
         // Ignore this directory and proceed.
         return report;
       }
-      Arrays.sort(files);
+      Collections.sort(fileNames);
+
       /*
        * Assumption: In the sorted list of files block file appears immediately
        * before block metadata file. This is true for the current naming
        * convention for block file blk_<blockid> and meta file
        * blk_<blockid>_<genstamp>.meta
        */
-      for (int i = 0; i < files.length; i++) {
+      for (int i = 0; i < fileNames.size(); i++) {
         // Make sure this thread can make a timely exit. With a low throttle
         // rate, completing a run can take a looooong time.
         if (Thread.interrupted()) {
           throw new InterruptedException();
         }
 
-        if (files[i].isDirectory()) {
-          compileReport(vol, bpFinalizedDir, files[i], report);
+        File file = new File(dir, fileNames.get(i));
+        if (file.isDirectory()) {
+          compileReport(vol, bpFinalizedDir, file, report);
           continue;
         }
-        if (!Block.isBlockFilename(files[i])) {
-          if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, files[i].getName())) {
-            long blockId = Block.getBlockId(files[i].getName());
-            verifyFileLocation(files[i].getParentFile(), bpFinalizedDir,
+        if (!Block.isBlockFilename(file)) {
+          if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, file.getName())) {
+            long blockId = Block.getBlockId(file.getName());
+            verifyFileLocation(file.getParentFile(), bpFinalizedDir,
                 blockId);
-            report.add(new ScanInfo(blockId, null, files[i], vol));
+            report.add(new ScanInfo(blockId, null, file, vol));
           }
           continue;
         }
-        File blockFile = files[i];
-        long blockId = Block.filename2id(blockFile.getName());
+        File blockFile = file;
+        long blockId = Block.filename2id(file.getName());
         File metaFile = null;
 
         // Skip all the files that start with block name until
         // getting to the metafile for the block
-        while (i + 1 < files.length && files[i + 1].isFile()
-            && files[i + 1].getName().startsWith(blockFile.getName())) {
+        while (i + 1 < fileNames.size()) {
+          File blkMetaFile = new File(dir, fileNames.get(i + 1));
+          if (!(blkMetaFile.isFile()
+              && blkMetaFile.getName().startsWith(blockFile.getName()))) {
+            break;
+          }
           i++;
-          if (isBlockMetaFile(blockFile.getName(), files[i].getName())) {
-            metaFile = files[i];
+          if (isBlockMetaFile(blockFile.getName(), blkMetaFile.getName())) {
+            metaFile = blkMetaFile;
             break;
           }
         }
@@ -946,4 +957,15 @@ public class DirectoryScanner implements Runnable {
       perfTimer.reset().start();
     }
   }
+
+  private enum BlockDirFilter implements FilenameFilter {
+    INSTANCE;
+
+    @Override
+    public boolean accept(File dir, String name) {
+      return name.startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)
+          || name.startsWith(DataStorage.STORAGE_DIR_FINALIZED)
+          || name.startsWith(Block.BLOCK_FILE_PREFIX);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f67149ab/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index d030144..d860107 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -30,6 +30,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
@@ -54,6 +56,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
@@ -63,6 +66,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Tests {@link DirectoryScanner} handling of differences
@@ -934,4 +938,50 @@ public class TestDirectoryScanner {
         new File(TEST_VOLUME.getFinalizedDir(BPID_2).getAbsolutePath(),
             "blk_567__1004.meta"));
   }
+
+  /**
+   * Test the behavior of exception handling during directory scan operation.
+   * Directory scanner shouldn't abort the scan on every directory just because
+   * one had an error.
+   */
+  @Test(timeout = 60000)
+  public void testExceptionHandlingWhileDirectoryScan() throws Exception {
+    cluster = new MiniDFSCluster.Builder(CONF).build();
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
+      DataNode dataNode = cluster.getDataNodes().get(0);
+
+      // Add files with 2 blocks
+      createFile(GenericTestUtils.getMethodName(), BLOCK_LENGTH * 2, false);
+
+      // Inject error on #getFinalizedDir() so that ReportCompiler#call() will
+      // hit exception while preparing the block info report list.
+      List<FsVolumeSpi> volumes = new ArrayList<>();
+      Iterator<FsVolumeSpi> iterator = fds.getFsVolumeReferences().iterator();
+      while (iterator.hasNext()) {
+        FsVolumeSpi volume = iterator.next();
+        FsVolumeSpi spy = Mockito.spy(volume);
+        Mockito.doThrow(new IOException("Error while getFinalizedDir"))
+            .when(spy).getFinalizedDir(volume.getBlockPoolList()[0]);
+        volumes.add(spy);
+      }
+      FsVolumeReferences volReferences = new FsVolumeReferences(volumes);
+      FsDatasetSpi<? extends FsVolumeSpi> spyFds = Mockito.spy(fds);
+      Mockito.doReturn(volReferences).when(spyFds).getFsVolumeReferences();
+
+      scanner = new DirectoryScanner(dataNode, spyFds, CONF);
+      scanner.setRetainDiffs(true);
+      scanner.reconcile();
+    } finally {
+      if (scanner != null) {
+        scanner.shutdown();
+        scanner = null;
+      }
+      cluster.shutdown();
+    }
+  }
 }


Mime
View raw message