hadoop-hdfs-commits mailing list archives

From: jgho...@apache.org
Subject: svn commit: r928966 - in /hadoop/hdfs/trunk: ./ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/
Date: Tue, 30 Mar 2010 03:14:27 GMT
Author: jghoman
Date: Tue Mar 30 03:14:27 2010
New Revision: 928966

URL: http://svn.apache.org/viewvc?rev=928966&view=rev
Log:
HDFS-854. Datanode should scan devices in parallel to generate block report. 
Contributed by Dmytro Molkov.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/hdfs-default.xml
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=928966&r1=928965&r2=928966&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Mar 30 03:14:27 2010
@@ -109,6 +109,9 @@ Trunk (unreleased changes)
     HDFS-892. Optionally use Avro reflection for Namenode RPC.  This
     is not a complete implementation yet, but rather a starting point.
     (cutting)
+    
+    HDFS-854. Datanode should scan devices in parallel to generate
+    block report. (Dmytro Molkov via jghoman)
 
   OPTIMIZATIONS
 

Modified: hadoop/hdfs/trunk/src/java/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/hdfs-default.xml?rev=928966&r1=928965&r2=928966&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/hdfs-default.xml (original)
+++ hadoop/hdfs/trunk/src/java/hdfs-default.xml Tue Mar 30 03:14:27 2010
@@ -302,6 +302,14 @@ creations/deletions), or "all".</descrip
 </property>
 
 <property>
+  <name>dfs.datanode.directoryscan.threads</name>
+  <value>1</value>
+  <description>The number of threads in the thread pool used to compile
+  block reports for the volumes in parallel.
+  </description>
+</property>
+
+<property>
   <name>dfs.heartbeat.interval</name>
   <value>3</value>
   <description>Determines datanode heartbeat interval in seconds.</description>

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=928966&r1=928965&r2=928966&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Mar 30 03:14:27 2010
@@ -128,6 +128,8 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_DATANODE_ADDRESS_DEFAULT = "0.0.0.0:50010";
   public static final String  DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY = "dfs.datanode.directoryscan.interval";
   public static final int     DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600;
+  public static final String  DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads";
+  public static final int     DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1;
   public static final String  DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
   public static final String  DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
   public static final String  DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";
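
A minimal sketch of how client code might raise the scanner parallelism using the new key (the value 4 is purely illustrative; only the key and its default above come from the patch):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;

  public class ScannerConfigExample {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Override the default of 1 report-compilation thread.
      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 4);
      int threads = conf.getInt(
          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
      System.out.println("directory scan threads = " + threads);
    }
  }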

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=928966&r1=928965&r2=928966&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Tue Mar 30 03:14:27 2010
@@ -621,6 +621,7 @@ class DataBlockScanner implements Runnab
   synchronized void shutdown() {
     LogFileHandler log = verificationLog;
     verificationLog = null;
+    dirScanner.shutdown();
     if (log != null) {
       log.close();
     }
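
The dirScanner.shutdown() call added above delegates to ExecutorService.shutdown(), which stops the pool from accepting new tasks but lets already-submitted report compilations finish. A standalone sketch of those semantics (the task body is a placeholder, not patch code):

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.TimeUnit;

  public class ShutdownSketch {
    public static void main(String[] args) throws InterruptedException {
      ExecutorService pool = Executors.newFixedThreadPool(1);
      pool.submit(new Runnable() {
        public void run() {
          System.out.println("compiling volume report...");
        }
      });
      pool.shutdown();                              // reject new tasks
      pool.awaitTermination(10, TimeUnit.SECONDS);  // wait for in-flight work
    }
  }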

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=928966&r1=928965&r2=928966&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Tue Mar 30 03:14:27 2010
@@ -20,12 +20,20 @@ package org.apache.hadoop.hdfs.server.da
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
@@ -42,6 +50,7 @@ public class DirectoryScanner {
   private final FSDataset dataset;
   private long scanPeriod;
   private long lastScanTime;
+  private ExecutorService reportCompileThreadPool;
 
   LinkedList<ScanInfo> diff = new LinkedList<ScanInfo>();
 
@@ -128,6 +137,11 @@ public class DirectoryScanner {
     int interval = conf.getInt("dfs.datanode.directoryscan.interval",
         DEFAULT_SCAN_INTERVAL);
     scanPeriod = interval * 1000L;
+    int threads = 
+        conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
+                    DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
+
+    reportCompileThreadPool = Executors.newFixedThreadPool(threads);
 
     Random rand = new Random();
     lastScanTime = System.currentTimeMillis() - (rand.nextInt(interval) * 1000L);
@@ -147,6 +161,10 @@ public class DirectoryScanner {
     missingMemoryBlocks = 0;
     mismatchBlocks = 0;
   }
+  
+  void shutdown() {
+    reportCompileThreadPool.shutdown();
+  }
 
   /**
    * Reconcile differences between disk and in-memory blocks
@@ -241,13 +259,29 @@ public class DirectoryScanner {
     FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
     ArrayList<LinkedList<ScanInfo>> dirReports =
       new ArrayList<LinkedList<ScanInfo>>(volumes.length);
+    
+    Map<Integer, Future<LinkedList<ScanInfo>>> compilersInProgress =
+      new HashMap<Integer, Future<LinkedList<ScanInfo>>>();
     for (int i = 0; i < volumes.length; i++) {
       if (!dataset.volumes.isValid(volumes[i])) { // volume is no longer valid
         dirReports.add(i, null);
       } else {
-        LinkedList<ScanInfo> dirReport = new LinkedList<ScanInfo>();
-        dirReports.add(i, compileReport(volumes[i], volumes[i].getDir(),
-            dirReport));
+        ReportCompiler reportCompiler =
+          new ReportCompiler(volumes[i], volumes[i].getDir());
+        Future<LinkedList<ScanInfo>> result = 
+          reportCompileThreadPool.submit(reportCompiler);
+        compilersInProgress.put(i, result);
+      }
+    }
+    
+    for (Map.Entry<Integer, Future<LinkedList<ScanInfo>>> report :
+      compilersInProgress.entrySet()) {
+      try {
+        dirReports.add(report.getKey(), report.getValue().get());
+      } catch (Exception ex) {
+        LOG.error("Error compiling report", ex);
+        // Propagate ex to DataBlockScanner to deal with
+        throw new RuntimeException(ex);
       }
     }
 
@@ -270,46 +304,63 @@ public class DirectoryScanner {
         && metaFile.endsWith(Block.METADATA_EXTENSION);
   }
 
-  /** Compile list {@link ScanInfo} for the blocks in the directory <dir>*/
-  private LinkedList<ScanInfo> compileReport(FSVolume vol, File dir,
-      LinkedList<ScanInfo> report) {
-    File[] files = dir.listFiles();
-    Arrays.sort(files);
-
-    /* Assumption: In the sorted list of files block file appears immediately
-     * before block metadata file. This is true for the current naming
-     * convention for block file blk_<blockid> and meta file
-     * blk_<blockid>_<genstamp>.meta
-     */
-    for (int i = 0; i < files.length; i++) {
-      if (files[i].isDirectory()) {
-        compileReport(vol, files[i], report);
-        continue;
-      }
-      if (!Block.isBlockFilename(files[i])) {
-        if (isBlockMetaFile("blk_", files[i].getName())) {
-          long blockId = Block.getBlockId(files[i].getName());
-          report.add(new ScanInfo(blockId, null, files[i], vol));
+  private static class ReportCompiler implements Callable<LinkedList<ScanInfo>> {
+    private FSVolume volume;
+    private File dir;
+
+    public ReportCompiler(FSVolume volume, File dir) {
+      this.dir = dir;
+      this.volume = volume;
+    }
+
+    @Override
+    public LinkedList<ScanInfo> call() throws Exception {
+      LinkedList<ScanInfo> result = new LinkedList<ScanInfo>();
+      compileReport(volume, dir, result);
+      return result;
+    }
+
+    /** Compile list {@link ScanInfo} for the blocks in the directory <dir> */
+    private LinkedList<ScanInfo> compileReport(FSVolume vol, File dir,
+        LinkedList<ScanInfo> report) {
+      File[] files = dir.listFiles();
+      Arrays.sort(files);
+
+      /*
+       * Assumption: In the sorted list of files block file appears immediately
+       * before block metadata file. This is true for the current naming
+       * convention for block file blk_<blockid> and meta file
+       * blk_<blockid>_<genstamp>.meta
+       */
+      for (int i = 0; i < files.length; i++) {
+        if (files[i].isDirectory()) {
+          compileReport(vol, files[i], report);
+          continue;
         }
-        continue;
-      }
-      File blockFile = files[i];
-      long blockId = Block.filename2id(blockFile.getName());
-      File metaFile = null;
-
-      // Skip all the files that start with block name until
-      // getting to the metafile for the block
-      while (i + 1 < files.length
-          && files[i+1].isFile()
-          && files[i + 1].getName().startsWith(blockFile.getName())) {
-        i++;
-        if (isBlockMetaFile(blockFile.getName(), files[i].getName())) {
-          metaFile = files[i];
-          break;
+        if (!Block.isBlockFilename(files[i])) {
+          if (isBlockMetaFile("blk_", files[i].getName())) {
+            long blockId = Block.getBlockId(files[i].getName());
+            report.add(new ScanInfo(blockId, null, files[i], vol));
+          }
+          continue;
         }
+        File blockFile = files[i];
+        long blockId = Block.filename2id(blockFile.getName());
+        File metaFile = null;
+
+        // Skip all the files that start with block name until
+        // getting to the metafile for the block
+        while (i + 1 < files.length && files[i + 1].isFile()
+            && files[i + 1].getName().startsWith(blockFile.getName())) {
+          i++;
+          if (isBlockMetaFile(blockFile.getName(), files[i].getName())) {
+            metaFile = files[i];
+            break;
+          }
+        }
+        report.add(new ScanInfo(blockId, blockFile, metaFile, vol));
       }
-      report.add(new ScanInfo(blockId, blockFile, metaFile, vol));
+      return report;
     }
-    return report;
   }
 }
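
Stripped of HDFS specifics, the concurrency pattern the new ReportCompiler introduces is: submit one Callable per volume to a fixed-size pool, then block on each Future to collect the per-volume lists. A self-contained sketch under those assumptions (the directory paths and String payload are placeholders for FSVolume and ScanInfo, not HDFS code):

  import java.io.File;
  import java.util.ArrayList;
  import java.util.LinkedList;
  import java.util.List;
  import java.util.concurrent.Callable;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;

  public class ParallelScanSketch {
    public static void main(String[] args) throws Exception {
      File[] volumes = { new File("/data/1"), new File("/data/2") };
      ExecutorService pool = Executors.newFixedThreadPool(2);

      // Fan out: one recursive directory walk per volume.
      List<Future<LinkedList<String>>> results =
          new ArrayList<Future<LinkedList<String>>>();
      for (File vol : volumes) {
        results.add(pool.submit(new Walker(vol)));
      }

      // Fan in: Future.get() blocks until that volume's walk completes,
      // so results come back in submission order.
      for (Future<LinkedList<String>> f : results) {
        System.out.println(f.get().size() + " files found");
      }
      pool.shutdown();
    }

    // Analogue of ReportCompiler: a Callable that walks a single volume.
    private static class Walker implements Callable<LinkedList<String>> {
      private final File dir;

      Walker(File dir) {
        this.dir = dir;
      }

      public LinkedList<String> call() {
        LinkedList<String> report = new LinkedList<String>();
        walk(dir, report);
        return report;
      }

      private void walk(File d, LinkedList<String> report) {
        File[] files = d.listFiles();
        if (files == null) {
          return; // unreadable or missing directory
        }
        for (File f : files) {
          if (f.isDirectory()) {
            walk(f, report);
          } else {
            report.add(f.getPath());
          }
        }
      }
    }
  }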

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java?rev=928966&r1=928965&r2=928966&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java Tue Mar 30 03:14:27 2010
@@ -204,11 +204,20 @@ public class TestDirectoryScanner extend
     assertEquals(mismatchBlocks, scanner.mismatchBlocks);
   }
 
-  public void test() throws Exception {
+  public void testDirectoryScanner() throws Exception {
+    // Run the test with and without parallel scanning
+    for (int parallelism = 1; parallelism < 3; parallelism++) {
+      runTest(parallelism);
+    }
+  }
+  
+  public void runTest(int parallelism) throws Exception {
     cluster = new MiniDFSCluster(CONF, 1, true, null);
     try {
       cluster.waitActive();
       fds = (FSDataset) cluster.getDataNodes().get(0).getFSDataset();
+      CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
+                  parallelism);
       scanner = new DirectoryScanner(fds, CONF);
 
       // Add files with 100 blocks


