hadoop-common-commits mailing list archives

From rang...@apache.org
Subject svn commit: r764031 - in /hadoop/core/trunk: ./ src/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/test/org/apache/hadoop/hdfs/server/datanode/
Date Fri, 10 Apr 2009 20:14:15 GMT
Author: rangadi
Date: Fri Apr 10 20:14:14 2009
New Revision: 764031

URL: http://svn.apache.org/viewvc?rev=764031&view=rev
Log:
HADOOP-4584. Improve datanode block reports and associated file system
scan to avoid interfering with normal datanode operations.

Added:
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/hdfs-default.xml
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/Block.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=764031&r1=764030&r2=764031&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Apr 10 20:14:14 2009
@@ -212,6 +212,9 @@
     HADOOP-2413. Remove the static variable FSNamesystem.fsNamesystemObject.
     (Konstantin Shvachko via szetszwo)
 
+    HADOOP-4584. Improve datanode block reports and associated file system
+    scan to avoid interfering with normal datanode operations.
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

Modified: hadoop/core/trunk/src/hdfs/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/hdfs-default.xml?rev=764031&r1=764030&r2=764031&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/hdfs-default.xml (original)
+++ hadoop/core/trunk/src/hdfs/hdfs-default.xml Fri Apr 10 20:14:14 2009
@@ -271,6 +271,14 @@
 </property>
 
 <property>
+  <name>dfs.datanode.directoryscan.interval</name>
+  <value>21600</value>
+  <description>Interval in seconds for Datanode to scan data directories and
+  reconcile the difference between blocks in memory and on the disk.
+  </description>
+</property>
+
+<property>
   <name>dfs.heartbeat.interval</name>
   <value>3</value>
   <description>Determines datanode heartbeat interval in seconds.</description>
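The new dfs.datanode.directoryscan.interval key above defaults to 21600 seconds (six hours). A minimal sketch of overriding it programmatically, assuming only the standard Configuration API; a deployment would normally set the same key in hdfs-site.xml instead:

    import org.apache.hadoop.conf.Configuration;

    // Sketch: shorten the datanode directory scan interval to one hour.
    // The property name is the one added to hdfs-default.xml in this commit;
    // the value is in seconds, as described above.
    Configuration conf = new Configuration();
    conf.setInt("dfs.datanode.directoryscan.interval", 3600);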

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/Block.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/Block.java?rev=764031&r1=764030&r2=764031&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/Block.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/Block.java Fri Apr 10 20:14:14 2009
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.protocol;
 
 import java.io.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.io.*;
@@ -28,7 +30,8 @@
  *
  **************************************************/
 public class Block implements Writable, Comparable<Block> {
-
+  public static final String BLOCK_FILE_PREFIX = "blk_";
+  public static final String METADATA_EXTENSION = ".meta";
   static {                                      // register a ctor
     WritableFactories.setFactory
       (Block.class,
@@ -41,20 +44,41 @@
   // a generation stamp.
   public static final long GRANDFATHER_GENERATION_STAMP = 0;
 
-  /**
-   */
+  public static final Pattern blockFilePattern = Pattern
+      .compile(BLOCK_FILE_PREFIX + "(-??\\d++)$");
+  public static final Pattern metaFilePattern = Pattern
+      .compile(BLOCK_FILE_PREFIX + "(-??\\d++)_(\\d++)\\" + METADATA_EXTENSION
+          + "$");
+
   public static boolean isBlockFilename(File f) {
     String name = f.getName();
-    if ( name.startsWith( "blk_" ) && 
-        name.indexOf( '.' ) < 0 ) {
-      return true;
-    } else {
-      return false;
-    }
+    return blockFilePattern.matcher(name).matches();
+  }
+
+  public static long filename2id(String name) {
+    Matcher m = blockFilePattern.matcher(name);
+    return m.matches() ? Long.parseLong(m.group(1)) : 0;
   }
 
-  static long filename2id(String name) {
-    return Long.parseLong(name.substring("blk_".length()));
+  public static boolean isMetaFilename(String name) {
+    return metaFilePattern.matcher(name).matches();
+  }
+
+  /**
+   * Get the generation stamp from the name of the metafile
+   */
+  public static long getGenerationStamp(String metaFile) {
+    Matcher m = metaFilePattern.matcher(metaFile);
+    return m.matches() ? Long.parseLong(m.group(2))
+        : GRANDFATHER_GENERATION_STAMP;
+  }
+
+  /**
+   * Get the blockId from the name of the metafile
+   */
+  public static long getBlockId(String metaFile) {
+    Matcher m = metaFilePattern.matcher(metaFile);
+    return m.matches() ? Long.parseLong(m.group(1)) : 0;
   }
 
   private long blockId;
@@ -96,7 +120,7 @@
   /**
    */
   public String getBlockName() {
-    return "blk_" + String.valueOf(blockId);
+    return BLOCK_FILE_PREFIX + String.valueOf(blockId);
   }
 
   /**
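A short usage sketch of the helpers added above (the names are illustrative only): the block-file pattern accepts names of the form blk_<id> and the meta-file pattern accepts blk_<id>_<genstamp>.meta, so:

    import java.io.File;
    import org.apache.hadoop.hdfs.protocol.Block;

    // Illustrative file names; any block id and generation stamp would do.
    Block.isBlockFilename(new File("blk_1073741825"));                 // true
    Block.isMetaFilename("blk_1073741825_1001.meta");                  // true
    long id = Block.filename2id("blk_1073741825");                     // 1073741825
    long stamp = Block.getGenerationStamp("blk_1073741825_1001.meta"); // 1001
    long id2 = Block.getBlockId("blk_1073741825_1001.meta");           // 1073741825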

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=764031&r1=764030&r2=764031&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Fri Apr 10 20:14:14 2009
@@ -52,7 +52,13 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 
-/*
+/**
+ * Performs two types of scanning:
+ * <li> Gets block files from the data directories and reconciles the
+ * difference between the blocks on the disk and in memory in
+ * {@link FSDataset}</li>
+ * <li> Scans the data directories for block files and verifies that
+ * the files are not corrupt</li>
  * This keeps track of blocks and their last verification times.
  * Currently it does not modify the metadata for block.
  */
@@ -96,6 +102,9 @@
   
   BlockTransferThrottler throttler = null;
   
+  // Reconciles blocks on disk to blocks in memory
+  DirectoryScanner dirScanner;
+
   private static enum ScanType {
     REMOTE_READ,           // Verified when a block read by a client etc
    VERIFICATION_SCAN,     // scanned as part of periodic verification
@@ -143,6 +152,8 @@
     }
     scanPeriod *= 3600 * 1000;
     // initialized when the scanner thread is started.
+
+    dirScanner = new DirectoryScanner(dataset, conf);
   }
   
   private synchronized boolean isInitiliazed() {
@@ -586,6 +597,10 @@
             startNewPeriod();
           }
         }
+        if (dirScanner.newScanPeriod(now)) {
+          dirScanner.reconcile();
+          now = System.currentTimeMillis();
+        }
         if ( (now - getEarliestScanTime()) >= scanPeriod ) {
           verifyFirstBlock();
         } else {
@@ -940,7 +955,6 @@
   }
   
   public static class Servlet extends HttpServlet {
-    
     public void doGet(HttpServletRequest request, 
                       HttpServletResponse response) throws IOException {
       

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=764031&r1=764030&r2=764031&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Apr 10 20:14:14 2009
@@ -699,7 +699,6 @@
     //
     // Now loop for a long time....
     //
-
     while (shouldRun) {
       try {
         long startTime = now();
@@ -729,73 +728,10 @@
             continue;
         }
             
-        // check if there are newly received blocks
-        Block [] blockArray=null;
-        String [] delHintArray=null;
-        synchronized(receivedBlockList) {
-          synchronized(delHints) {
-            int numBlocks = receivedBlockList.size();
-            if (numBlocks > 0) {
-              if(numBlocks!=delHints.size()) {
-                LOG.warn("Panic: receiveBlockList and delHints are not of the same length" );
-              }
-              //
-              // Send newly-received blockids to namenode
-              //
-              blockArray = receivedBlockList.toArray(new Block[numBlocks]);
-              delHintArray = delHints.toArray(new String[numBlocks]);
-            }
-          }
-        }
-        if (blockArray != null) {
-          if(delHintArray == null || delHintArray.length != blockArray.length ) {
-            LOG.warn("Panic: block array & delHintArray are not the same" );
-          }
-          namenode.blockReceived(dnRegistration, blockArray, delHintArray);
-          synchronized (receivedBlockList) {
-            synchronized (delHints) {
-              for(int i=0; i<blockArray.length; i++) {
-                receivedBlockList.remove(blockArray[i]);
-                delHints.remove(delHintArray[i]);
-              }
-            }
-          }
-        }
+        reportReceivedBlocks();
 
-        // send block report
-        if (startTime - lastBlockReport > blockReportInterval) {
-          //
-          // Send latest blockinfo report if timer has expired.
-          // Get back a list of local block(s) that are obsolete
-          // and can be safely GC'ed.
-          //
-          long brStartTime = now();
-          Block[] bReport = data.getBlockReport();
-          DatanodeCommand cmd = namenode.blockReport(dnRegistration,
-                  BlockListAsLongs.convertToArrayLongs(bReport));
-          long brTime = now() - brStartTime;
-          myMetrics.blockReports.inc(brTime);
-          LOG.info("BlockReport of " + bReport.length +
-              " blocks got processed in " + brTime + " msecs");
-          //
-          // If we have sent the first block report, then wait a random
-          // time before we start the periodic block reports.
-          //
-          if (resetBlockReportTime) {
-            lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
-            resetBlockReportTime = false;
-          } else {
-            /* say the last block report was at 8:20:14. The current report 
-             * should have started around 9:20:14 (default 1 hour interval). 
-             * If current time is :
-             *   1) normal like 9:20:18, next report should be at 10:20:14
-             *   2) unexpected like 11:35:43, next report should be at 12:20:14
-             */
-            lastBlockReport += (now() - lastBlockReport) / 
-                               blockReportInterval * blockReportInterval;
-          }
-          processCommand(cmd);
-        }
+        DatanodeCommand cmd = blockReport();
+        processCommand(cmd);
 
         // start block scanner
         if (blockScanner != null && blockScannerThread == null &&
@@ -926,6 +862,88 @@
     upgradeManager.processUpgradeCommand(comm);
   }
 
+  /**
+   * Report received blocks and delete hints to the Namenode
+   * @throws IOException
+   */
+  private void reportReceivedBlocks() throws IOException {
+    //check if there are newly received blocks
+    Block [] blockArray=null;
+    String [] delHintArray=null;
+    synchronized(receivedBlockList) {
+      synchronized(delHints){
+        int numBlocks = receivedBlockList.size();
+        if (numBlocks > 0) {
+          if(numBlocks!=delHints.size()) {
+            LOG.warn("Panic: receivedBlockList and delHints are not of the same length" );
+          }
+          //
+          // Send newly-received blockids to namenode
+          //
+          blockArray = receivedBlockList.toArray(new Block[numBlocks]);
+          delHintArray = delHints.toArray(new String[numBlocks]);
+        }
+      }
+    }
+    if (blockArray != null) {
+      if(delHintArray == null || delHintArray.length != blockArray.length ) {
+        LOG.warn("Panic: block array & delHintArray are not the same" );
+      }
+      namenode.blockReceived(dnRegistration, blockArray, delHintArray);
+      synchronized(receivedBlockList) {
+        synchronized(delHints){
+          for(int i=0; i<blockArray.length; i++) {
+            receivedBlockList.remove(blockArray[i]);
+            delHints.remove(delHintArray[i]);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Report the list of blocks to the Namenode
+   * @throws IOException
+   */
+  private DatanodeCommand blockReport() throws IOException {
+    // send block report
+    DatanodeCommand cmd = null;
+    long startTime = now();
+    if (startTime - lastBlockReport > blockReportInterval) {
+      //
+      // Send latest block report if timer has expired.
+      // Get back a list of local block(s) that are obsolete
+      // and can be safely GC'ed.
+      //
+      long brStartTime = now();
+      Block[] bReport = data.getBlockReport();
+
+      cmd = namenode.blockReport(dnRegistration,
+              BlockListAsLongs.convertToArrayLongs(bReport));
+      long brTime = now() - brStartTime;
+      myMetrics.blockReports.inc(brTime);
+      LOG.info("BlockReport of " + bReport.length +
+          " blocks got processed in " + brTime + " msecs");
+      //
+      // If we have sent the first block report, then wait a random
+      // time before we start the periodic block reports.
+      //
+      if (resetBlockReportTime) {
+        lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
+        resetBlockReportTime = false;
+      } else {
+        /* say the last block report was at 8:20:14. The current report
+         * should have started around 9:20:14 (default 1 hour interval).
+         * If current time is :
+         *   1) normal like 9:20:18, next report should be at 10:20:14
+         *   2) unexpected like 11:35:43, next report should be at 12:20:14
+         */
+        lastBlockReport += (now() - lastBlockReport) /
+                           blockReportInterval * blockReportInterval;
+      }
+    }
+    return cmd;
+  }
 
   /**
    * Start distributed upgrade if it should be initiated by the data-node.
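The rescheduling line in blockReport() above keeps the original report schedule even when a report runs late. A worked example of that arithmetic, using the times from the comment (values here are seconds since midnight purely for illustration; the real code uses milliseconds):

    long blockReportInterval = 3600;                   // 1 hour
    long lastBlockReport = 8 * 3600 + 20 * 60 + 14;    // 8:20:14 -> 30014
    long now             = 11 * 3600 + 35 * 60 + 43;   // 11:35:43 -> 41743
    lastBlockReport += (now - lastBlockReport) / blockReportInterval * blockReportInterval;
    // (41743 - 30014) / 3600 == 3, so lastBlockReport becomes 30014 + 10800 = 40814,
    // i.e. 11:20:14, and the next report is due one interval later at 12:20:14.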

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java?rev=764031&r1=764030&r2=764031&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DatanodeBlockInfo.java Fri Apr 10 20:14:14 2009
@@ -58,6 +58,10 @@
     return file;
   }
 
+  void setFile(File f) {
+    file = f;
+  }
+
   /**
    * Is this block already detached?
    */

Added: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=764031&view=auto
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (added)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Fri Apr 10 20:14:14 2009
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+/**
+ * Periodically scans the data directories for block and block metadata files.
+ * Reconciles the differences with block information maintained in
+ * {@link FSDataset}
+ */
+public class DirectoryScanner {
+  private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
+  private static final int DEFAULT_SCAN_INTERVAL = 21600;
+
+  private final FSDataset dataset;
+  private long scanPeriod;
+  private long lastScanTime;
+
+  LinkedList<ScanInfo> diff = new LinkedList<ScanInfo>();
+
+  /** Stats tracked for reporting and testing */
+  long totalBlocks;
+  long missingMetaFile;
+  long missingBlockFile;
+  long missingMemoryBlocks;
+  long mismatchBlocks;
+
+  /**
+   * Tracks the files and other information related to a block on the disk.
+   * A missing file is indicated by setting the corresponding member
+   * to null.
+   */
+  static class ScanInfo implements Comparable<ScanInfo> {
+    private final long blockId;
+    private final File metaFile;
+    private final File blockFile;
+    private final FSVolume volume;
+
+    ScanInfo(long blockId) {
+      this(blockId, null, null, null);
+    }
+
+    ScanInfo(long blockId, File blockFile, File metaFile, FSVolume vol) {
+      this.blockId = blockId;
+      this.metaFile = metaFile;
+      this.blockFile = blockFile;
+      this.volume = vol;
+    }
+
+    File getMetaFile() {
+      return metaFile;
+    }
+
+    File getBlockFile() {
+      return blockFile;
+    }
+
+    long getBlockId() {
+      return blockId;
+    }
+
+    FSVolume getVolume() {
+      return volume;
+    }
+
+    @Override // Comparable
+    public int compareTo(ScanInfo b) {
+      if (blockId < b.blockId) {
+        return -1;
+      } else if (blockId == b.blockId) {
+        return 0;
+      } else {
+        return 1;
+      }
+    }
+
+    @Override // Object
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof ScanInfo)) {
+        return false;
+      }
+      return blockId == ((ScanInfo) o).blockId;
+    }
+
+    @Override // Object
+    public int hashCode() {
+      return 37 * 17 + (int) (blockId^(blockId>>>32));
+    }
+
+    public long getGenStamp() {
+      return metaFile != null ? Block.getGenerationStamp(metaFile.getName()) :
+        Block.GRANDFATHER_GENERATION_STAMP;
+    }
+  }
+
+  DirectoryScanner(FSDataset dataset, Configuration conf) {
+    this.dataset = dataset;
+    int interval = conf.getInt("dfs.datanode.directoryscan.interval",
+        DEFAULT_SCAN_INTERVAL);
+    scanPeriod = interval * 1000L;
+
+    Random rand = new Random();
+    lastScanTime = System.currentTimeMillis() - (rand.nextInt(interval) * 1000L);
+    LOG.info("scan starts at " + (lastScanTime + scanPeriod)
+        + " with interval " + scanPeriod);
+  }
+
+  boolean newScanPeriod(long now) {
+    return now > lastScanTime + scanPeriod;
+  }
+
+  private void clear() {
+    diff.clear();
+    totalBlocks = 0;
+    missingMetaFile = 0;
+    missingBlockFile = 0;
+    missingMemoryBlocks = 0;
+    mismatchBlocks = 0;
+  }
+
+  /**
+   * Reconcile differences between disk and in-memory blocks
+   */
+  void reconcile() {
+    scan();
+    for (ScanInfo info : diff) {
+      dataset.checkAndUpdate(info.getBlockId(), info.getBlockFile(), info
+          .getMetaFile(), info.getVolume());
+    }
+  }
+
+  /**
+   * Scan for the differences between disk and in-memory blocks
+   */
+  void scan() {
+    clear();
+    ScanInfo[] diskReport = getDiskReport();
+    totalBlocks = diskReport.length;
+
+    // Hold FSDataset lock to prevent further changes to the block map
+    synchronized(dataset) {
+      Block[] memReport = dataset.getBlockList(false);
+      Arrays.sort(memReport); // Sort based on blockId
+
+      int d = 0; // index for diskReport
+      int m = 0; // index for memReport
+      while (m < memReport.length && d < diskReport.length) {
+        Block memBlock = memReport[Math.min(m, memReport.length - 1)];
+        ScanInfo info = diskReport[Math.min(d, diskReport.length - 1)];
+        if (info.getBlockId() < memBlock.getBlockId()) {
+          // Block is missing in memory
+          missingMemoryBlocks++;
+          addDifference(info);
+          d++;
+          continue;
+        }
+        if (info.getBlockId() > memBlock.getBlockId()) {
+          // Block is missing on the disk
+          addDifference(memBlock.getBlockId());
+          m++;
+          continue;
+        }
+        // Block file and/or metadata file exists on the disk
+        // Block exists in memory
+        if (info.getBlockFile() == null) {
+          // Block metadata file exists and block file is missing
+          addDifference(info);
+        } else if (info.getGenStamp() != memBlock.getGenerationStamp()
+            || info.getBlockFile().length() != memBlock.getNumBytes()) {
+          mismatchBlocks++;
+          addDifference(info);
+        }
+        d++;
+        m++;
+      }
+      while (m < memReport.length) {
+        addDifference(memReport[m++].getBlockId());
+      }
+      while (d < diskReport.length) {
+        missingMemoryBlocks++;
+        addDifference(diskReport[d++]);
+      }
+    }
+    LOG.info("Total blocks: " + totalBlocks + ", missing metadata files:"
+        + missingMetaFile + ", missing block files:" + missingBlockFile
+        + ", missing blocks in memory:" + missingMemoryBlocks
+        + ", mismatched blocks:" + mismatchBlocks);
+    lastScanTime = System.currentTimeMillis();
+  }
+
+  /**
+   * Block is found on the disk. In-memory block is missing or does not match
+   * the block on the disk
+   */
+  private void addDifference(ScanInfo info) {
+    missingMetaFile += info.getMetaFile() == null ? 1 : 0;
+    missingBlockFile += info.getBlockFile() == null ? 1 : 0;
+    diff.add(info);
+  }
+
+  /** Block is not found on the disk */
+  private void addDifference(long blockId) {
+    missingBlockFile++;
+    missingMetaFile++;
+    diff.add(new ScanInfo(blockId));
+  }
+
+  /** Get list of blocks on the disk sorted by blockId */
+  private ScanInfo[] getDiskReport() {
+    // First get list of data directories
+    FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
+    ArrayList<LinkedList<ScanInfo>> dirReports =
+      new ArrayList<LinkedList<ScanInfo>>(volumes.length);
+    for (int i = 0; i < volumes.length; i++) {
+      if (!dataset.volumes.isValid(volumes[i])) { // volume is no longer valid
+        dirReports.add(i, null);
+      } else {
+        LinkedList<ScanInfo> dirReport = new LinkedList<ScanInfo>();
+        dirReports.add(i, compileReport(volumes[i], volumes[i].getDir(),
+            dirReport));
+      }
+    }
+
+    // Compile consolidated report for all the volumes
+    LinkedList<ScanInfo> list = new LinkedList<ScanInfo>();
+    for (int i = 0; i < volumes.length; i++) {
+      if (dataset.volumes.isValid(volumes[i])) { // volume is still valid
+        list.addAll(dirReports.get(i));
+      }
+    }
+
+    ScanInfo[] report = list.toArray(new ScanInfo[list.size()]);
+    // Sort the report based on blockId
+    Arrays.sort(report);
+    return report;
+  }
+
+  private static boolean isBlockMetaFile(String blockId, String metaFile) {
+    return metaFile.startsWith(blockId)
+        && metaFile.endsWith(Block.METADATA_EXTENSION);
+  }
+
+  /** Compile a list of {@link ScanInfo} for the blocks in the directory <dir> */
+  private LinkedList<ScanInfo> compileReport(FSVolume vol, File dir,
+      LinkedList<ScanInfo> report) {
+    File[] files = dir.listFiles();
+    Arrays.sort(files);
+
+    /* Assumption: In the sorted list of files block file appears immediately
+     * before block metadata file. This is true for the current naming
+     * convention for block file blk_<blockid> and meta file
+     * blk_<blockid>_<genstamp>.meta
+     */
+    for (int i = 0; i < files.length; i++) {
+      if (files[i].isDirectory()) {
+        compileReport(vol, files[i], report);
+        continue;
+      }
+      if (!Block.isBlockFilename(files[i])) {
+        if (isBlockMetaFile("blk_", files[i].getName())) {
+          long blockId = Block.getBlockId(files[i].getName());
+          report.add(new ScanInfo(blockId, null, files[i], vol));
+        }
+        continue;
+      }
+      File blockFile = files[i];
+      long blockId = Block.filename2id(blockFile.getName());
+      File metaFile = null;
+
+      // Skip all the files that start with block name until
+      // getting to the metafile for the block
+      while (i + 1 < files.length
+          && files[i+1].isFile()
+          && files[i + 1].getName().startsWith(blockFile.getName())) {
+        i++;
+        if (isBlockMetaFile(blockFile.getName(), files[i].getName())) {
+          metaFile = files[i];
+          break;
+        }
+      }
+      report.add(new ScanInfo(blockId, blockFile, metaFile, vol));
+    }
+    return report;
+  }
+}
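A minimal sketch of how the scanner is driven, mirroring the hunk added to DataBlockScanner above and the assertions in TestDirectoryScanner below; it assumes same-package access and an FSDataset instance and Configuration already in scope, as in the test:

    DirectoryScanner dirScanner = new DirectoryScanner(dataset, conf);
    long now = System.currentTimeMillis();
    if (dirScanner.newScanPeriod(now)) {
      // scan() plus one FSDataset.checkAndUpdate() call per difference found
      dirScanner.reconcile();
      // Stats such as dirScanner.totalBlocks and dirScanner.mismatchBlocks
      // are updated by scan() and are what the unit test asserts on.
    }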

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=764031&r1=764030&r2=764031&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Apr 10 20:14:14 2009
@@ -26,10 +26,13 @@
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.metrics.util.MBeanUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.conf.*;
@@ -158,41 +161,16 @@
         if (!path.startsWith(blockName)) {
           continue;
         }
-        String[] vals = path.split("_");
-        if (vals.length != 3) {     // blk, blkid, genstamp.meta
+        if (blockFile == listdir[j]) {
           continue;
         }
-        String[] str = vals[2].split("\\.");
-        if (str.length != 2) {
-          continue;
-        }
-        return Long.parseLong(str[0]);
+        return Block.getGenerationStamp(listdir[j].getName());
       }
       DataNode.LOG.warn("Block " + blockFile + 
                         " does not have a metafile!");
       return Block.GRANDFATHER_GENERATION_STAMP;
     }
 
-    /**
-     * Populate the given blockSet with any child blocks
-     * found at this node.
-     */
-    public void getBlockInfo(TreeSet<Block> blockSet) {
-      if (children != null) {
-        for (int i = 0; i < children.length; i++) {
-          children[i].getBlockInfo(blockSet);
-        }
-      }
-
-      File blockFiles[] = dir.listFiles();
-      for (int i = 0; i < blockFiles.length; i++) {
-        if (Block.isBlockFilename(blockFiles[i])) {
-          long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
-          blockSet.add(new Block(blockFiles[i], blockFiles[i].length(), genStamp));
-        }
-      }
-    }
-
     void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap, FSVolume volume) {
       if (children != null) {
         for (int i = 0; i < children.length; i++) {
@@ -425,10 +403,6 @@
       DiskChecker.checkDir(tmpDir);
     }
       
-    void getBlockInfo(TreeSet<Block> blockSet) {
-      dataDir.getBlockInfo(blockSet);
-    }
-      
     void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
       dataDir.getVolumeMap(volumeMap, this);
     }
@@ -506,7 +480,7 @@
       return dfsUsed;
     }
 
-    synchronized long getCapacity() throws IOException {
+    long getCapacity() throws IOException {
       long capacity = 0L;
       for (int idx = 0; idx < volumes.length; idx++) {
         capacity += volumes[idx].getCapacity();
@@ -514,7 +488,7 @@
       return capacity;
     }
       
-    synchronized long getRemaining() throws IOException {
+    long getRemaining() throws IOException {
       long remaining = 0L;
       for (int idx = 0; idx < volumes.length; idx++) {
         remaining += volumes[idx].getAvailable();
@@ -522,12 +496,6 @@
       return remaining;
     }
       
-    synchronized void getBlockInfo(TreeSet<Block> blockSet) {
-      for (int idx = 0; idx < volumes.length; idx++) {
-        volumes[idx].getBlockInfo(blockSet);
-      }
-    }
-      
     synchronized void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
       for (int idx = 0; idx < volumes.length; idx++) {
         volumes[idx].getVolumeMap(volumeMap);
@@ -548,6 +516,15 @@
       }
       return sb.toString();
     }
+
+    public boolean isValid(FSVolume volume) {
+      for (int idx = 0; idx < volumes.length; idx++) {
+        if (volumes[idx] == volume) {
+          return true;
+        }
+      }
+      return false;
+    }
   }
   
   //////////////////////////////////////////////////////
@@ -676,9 +653,12 @@
   FSVolumeSet volumes;
   private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
   private int maxBlocksPerDir = 0;
-  private HashMap<Block,DatanodeBlockInfo> volumeMap = null;
+  HashMap<Block,DatanodeBlockInfo> volumeMap = null;
   static  Random random = new Random();
-  
+
+  // Used for synchronizing access to usage stats
+  private Object statsLock = new Object();
+
   /**
    * An FSDataset has a directory where it loads its data files.
    */
@@ -698,21 +678,27 @@
    * Return the total space used by dfs datanode
    */
   public long getDfsUsed() throws IOException {
-    return volumes.getDfsUsed();
+    synchronized(statsLock) {
+      return volumes.getDfsUsed();
+    }
   }
   
   /**
    * Return total capacity, used and unused
    */
   public long getCapacity() throws IOException {
-    return volumes.getCapacity();
+    synchronized(statsLock) {
+      return volumes.getCapacity();
+    }
   }
 
   /**
    * Return how many bytes can still be stored in the FSDataset
    */
   public long getRemaining() throws IOException {
-    return volumes.getRemaining();
+    synchronized(statsLock) {
+      return volumes.getRemaining();
+    }
   }
 
   /**
@@ -1207,19 +1193,36 @@
     }
     return true;
   }
-  
+
   /**
-   * Return a table of block data
+   * Return finalized blocks from the in-memory blockmap
    */
   public Block[] getBlockReport() {
-    TreeSet<Block> blockSet = new TreeSet<Block>();
-    volumes.getBlockInfo(blockSet);
-    Block blockTable[] = new Block[blockSet.size()];
-    int i = 0;
-    for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {
-      blockTable[i] = it.next();
+    ArrayList<Block> list =  new ArrayList<Block>(volumeMap.size());
+    synchronized(this) {
+      for (Block b : volumeMap.keySet()) {
+        if (!ongoingCreates.containsKey(b)) {
+          list.add(new Block(b));
+        }
+      }
+    }
+    return list.toArray(new Block[list.size()]);
+  }
+
+  /**
+   * Get the block list from the in-memory blockmap. Note that if <deepcopy>
+   * is false, references to the blocks in the volumeMap are returned. These
+   * blocks should not be changed. Suitable synchronization using
+   * {@link FSDataset} is needed to handle concurrent modification to a block.
+   */
+  synchronized Block[] getBlockList(boolean deepcopy) {
+    Block[] list = volumeMap.keySet().toArray(new Block[volumeMap.size()]);
+    if (deepcopy) {
+      for (int i = 0; i < list.length; i++) {
+        list[i] = new Block(list[i]);
+      }
     }
-    return blockTable;
+    return list;
   }
 
   /**
@@ -1430,4 +1433,190 @@
   public String getStorageInfo() {
     return toString();
   }
+
+  /**
+   * Reconcile the difference between blocks on the disk and blocks in
+   * volumeMap
+   *
+   * Check the given block for inconsistencies. Look at the
+   * current state of the block and reconcile the differences as follows:
+   * <ul>
+   * <li>If the block file is missing, delete the block from volumeMap</li>
+   * <li>If the block file exists and the block is missing in volumeMap,
+   * add the block to volumeMap</li>
+   * <li>If generation stamp does not match, then update the block with right
+   * generation stamp</li>
+   * <li>If the block length in memory does not match the actual block file length
+   * then mark the block as corrupt and update the block length in memory</li>
+   * <li>If the file in {@link DatanodeBlockInfo} does not match the file on
+   * the disk, update {@link DatanodeBlockInfo} with the correct file</li>
+   * </ul>
+   *
+   * @param blockId Block that differs
+   * @param diskFile Block file on the disk
+   * @param diskMetaFile Metadata file on the disk
+   * @param vol Volume of the block file
+   */
+  public void checkAndUpdate(long blockId, File diskFile,
+      File diskMetaFile, FSVolume vol) {
+    Block block = new Block(blockId);
+    DataNode datanode = DataNode.getDataNode();
+    Block corruptBlock = null;
+    synchronized (this) {
+      if (ongoingCreates.get(block) != null) {
+        // Block is not finalized - ignore the difference
+        return;
+      }
+
+      final long diskGS = diskMetaFile != null && diskMetaFile.exists() ?
+          Block.getGenerationStamp(diskMetaFile.getName()) :
+            Block.GRANDFATHER_GENERATION_STAMP;
+
+      DatanodeBlockInfo memBlockInfo = volumeMap.get(block);
+      if (diskFile == null || !diskFile.exists()) {
+        if (memBlockInfo == null) {
+          // Block file does not exist and block does not exist in memory
+          // If metadata file exists then delete it
+          if (diskMetaFile != null && diskMetaFile.exists()
+              && diskMetaFile.delete()) {
+            DataNode.LOG.warn("Deleted a metadata file without a block "
+                + diskMetaFile.getAbsolutePath());
+          }
+          return;
+        }
+        if (!memBlockInfo.getFile().exists()) {
+          // Block is in memory and not on the disk
+          // Remove the block from volumeMap
+          volumeMap.remove(block);
+          if (datanode.blockScanner != null) {
+            datanode.blockScanner.deleteBlock(block);
+          }
+          DataNode.LOG.warn("Removed block " + block.getBlockId()
+              + " from memory with missing block file on the disk");
+          // Finally remove the metadata file
+          if (diskMetaFile != null && diskMetaFile.exists()
+              && diskMetaFile.delete()) {
+            DataNode.LOG.warn("Deleted a metadata file for the deleted block "
+                + diskMetaFile.getAbsolutePath());
+          }
+        }
+        return;
+      }
+      /*
+       * Block file exists on the disk
+       */
+      if (memBlockInfo == null) {
+        // Block is missing in memory - add the block to volumeMap
+        DatanodeBlockInfo diskBlockInfo = new DatanodeBlockInfo(vol, diskFile);
+        Block diskBlock = new Block(diskFile, diskFile.length(), diskGS);
+        volumeMap.put(diskBlock, diskBlockInfo);
+        if (datanode.blockScanner != null) {
+          datanode.blockScanner.addBlock(diskBlock);
+        }
+        DataNode.LOG.warn("Added missing block to memory " + diskBlock);
+        return;
+      }
+      /*
+       * Block exists in volumeMap and the block file exists on the disk
+       */
+      // Iterate to get key from volumeMap for the blockId
+      Block memBlock = getBlockKey(blockId);
+
+      // Compare block files
+      File memFile = memBlockInfo.getFile();
+      if (memFile.exists()) {
+        if (memFile.compareTo(diskFile) != 0) {
+          DataNode.LOG.warn("Block file " + memFile.getAbsolutePath()
+              + " does not match file found by scan "
+              + diskFile.getAbsolutePath());
+          // TODO: Should the diskFile be deleted?
+        }
+      } else {
+        // Block refers to a block file that does not exist.
+        // Update the block with the file found on the disk. Since the block
+        // file and metadata file are found as a pair on the disk, update
+        // the block based on the metadata file found on the disk
+        DataNode.LOG.warn("Block file in volumeMap "
+            + memFile.getAbsolutePath()
+            + " does not exist. Updating it to the file found during scan "
+            + diskFile.getAbsolutePath());
+        DatanodeBlockInfo info = volumeMap.remove(memBlock);
+        info.setFile(diskFile);
+        memFile = diskFile;
+
+        DataNode.LOG.warn("Updating generation stamp for block " + blockId
+            + " from " + memBlock.getGenerationStamp() + " to " + diskGS);
+        memBlock.setGenerationStamp(diskGS);
+        volumeMap.put(memBlock, info);
+      }
+
+      // Compare generation stamp
+      if (memBlock.getGenerationStamp() != diskGS) {
+        File memMetaFile = getMetaFile(diskFile, memBlock);
+        if (memMetaFile.exists()) {
+          if (memMetaFile.compareTo(diskMetaFile) != 0) {
+            DataNode.LOG.warn("Metadata file in memory "
+                + memMetaFile.getAbsolutePath()
+                + " does not match file found by scan "
+                + diskMetaFile.getAbsolutePath());
+          }
+        } else {
+          // Metadata file corresponding to block in memory is missing
+          // If metadata file found during the scan is on the same directory
+          // as the block file, then use the generation stamp from it
+          long gs = diskMetaFile != null && diskMetaFile.exists()
+              && diskMetaFile.getParent().equals(memFile.getParent()) ? diskGS
+              : Block.GRANDFATHER_GENERATION_STAMP;
+
+          DataNode.LOG.warn("Updating generation stamp for block " + blockId
+              + " from " + memBlock.getGenerationStamp() + " to " + gs);
+
+          DatanodeBlockInfo info = volumeMap.remove(memBlock);
+          memBlock.setGenerationStamp(gs);
+          volumeMap.put(memBlock, info);
+        }
+      }
+
+      // Compare block size
+      if (memBlock.getNumBytes() != memFile.length()) {
+        // Update the length based on the block file
+        corruptBlock = new Block(memBlock);
+        DataNode.LOG.warn("Updating size of block " + blockId + " from "
+            + memBlock.getNumBytes() + " to " + memFile.length());
+        DatanodeBlockInfo info = volumeMap.remove(memBlock);
+        memBlock.setNumBytes(memFile.length());
+        volumeMap.put(memBlock, info);
+      }
+    }
+
+    // Send corrupt block report outside the lock
+    if (corruptBlock != null) {
+      DatanodeInfo[] dnArr = { new DatanodeInfo(datanode.dnRegistration) };
+      LocatedBlock[] blocks = { new LocatedBlock(corruptBlock, dnArr) };
+      try {
+        datanode.namenode.reportBadBlocks(blocks);
+        DataNode.LOG.warn("Reporting the block " + corruptBlock
+            + " as corrupt due to length mismatch");
+      } catch (IOException e) {
+        DataNode.LOG.warn("Failed to report bad block " + corruptBlock
+            + ". Exception: " + StringUtils.stringifyException(e));
+      }
+    }
+  }
+
+  /**
+   * Get reference to the key in the volumeMap. To be called from methods that
+   * are synchronized on {@link FSDataset}
+   * @param blockId
+   * @return key from the volumeMap
+   */
+  Block getBlockKey(long blockId) {
+    assert(Thread.holdsLock(this));
+    for (Block b : volumeMap.keySet()) {
+      if (b.getBlockId() == blockId) {
+        return b;
+      }
+    }
+    return null;
+  }
 }

Added: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java?rev=764031&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java Fri Apr 10 20:14:14 2009
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.util.Random;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests {@link DirectoryScanner} handling of differences
+ * between blocks on the disk and block in memory.
+ */
+public class TestDirectoryScanner extends TestCase {
+  private static final Log LOG = LogFactory.getLog(TestDirectoryScanner.class);
+  private static final Configuration CONF = new Configuration();
+  private static final int DEFAULT_GEN_STAMP = 9999;
+
+  private MiniDFSCluster cluster;
+  private FSDataset fds = null;
+  private DirectoryScanner scanner = null;
+  private Random rand = new Random();
+  private Random r = new Random();
+
+  static {
+    CONF.setLong("dfs.block.size", 100);
+    CONF.setInt("io.bytes.per.checksum", 1);
+    CONF.setLong("dfs.heartbeat.interval", 1L);
+  }
+
+  /** create a file with a length of <code>fileLen</code> */
+  private void createFile(String fileName, long fileLen) throws IOException {
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path(fileName);
+    DFSTestUtil.createFile(fs, filePath, fileLen, (short) 1, r.nextLong());
+  }
+
+  /** Truncate a block file */
+  private long truncateBlockFile() throws IOException {
+    synchronized (fds) {
+      for (Entry<Block, DatanodeBlockInfo> entry : fds.volumeMap.entrySet()) {
+        Block b = entry.getKey();
+        File f = entry.getValue().getFile();
+        File mf = FSDataset.getMetaFile(f, b);
+        // Truncate a block file that has a corresponding metadata file
+        if (f.exists() && f.length() != 0 && mf.exists()) {
+          FileOutputStream s = new FileOutputStream(f);
+          FileChannel channel = s.getChannel();
+          channel.truncate(0);
+          LOG.info("Truncated block file " + f.getAbsolutePath());
+          return entry.getKey().getBlockId();
+        }
+      }
+    }
+    return 0;
+  }
+
+  /** Delete a block file */
+  private long deleteBlockFile() {
+    synchronized(fds) {
+      for (Entry<Block, DatanodeBlockInfo> entry : fds.volumeMap.entrySet()) {
+        Block b = entry.getKey();
+        File f = entry.getValue().getFile();
+        File mf = FSDataset.getMetaFile(f, b);
+        // Delete a block file that has corresponding metadata file
+        if (f.exists() && mf.exists() && f.delete()) {
+          LOG.info("Deleting block file " + f.getAbsolutePath());
+          return entry.getKey().getBlockId();
+        }
+      }
+    }
+    return 0;
+  }
+
+  /** Delete block meta file */
+  private long deleteMetaFile() {
+    synchronized(fds) {
+      for (Entry<Block, DatanodeBlockInfo> entry : fds.volumeMap.entrySet()) {
+        Block b = entry.getKey();
+        String blkfile = entry.getValue().getFile().getAbsolutePath();
+        long genStamp = b.getGenerationStamp();
+        String metafile = FSDataset.getMetaFileName(blkfile, genStamp);
+        File file = new File(metafile);
+        // Delete a metadata file
+        if (file.exists() && file.delete()) {
+          LOG.info("Deleting metadata file " + file.getAbsolutePath());
+          return entry.getKey().getBlockId();
+        }
+      }
+    }
+    return 0;
+  }
+
+  /** Get a random blockId that is not used already */
+  private long getFreeBlockId() {
+    long id = rand.nextLong();
+    while (true) {
+      id = rand.nextLong();
+      Block b = new Block(id);
+      DatanodeBlockInfo info = null;
+      synchronized(fds) {
+        info = fds.volumeMap.get(b);
+      }
+      if (info == null) {
+        break;
+      }
+    }
+    return id;
+  }
+
+  private String getBlockFile(long id) {
+    return Block.BLOCK_FILE_PREFIX + id;
+  }
+
+  private String getMetaFile(long id) {
+    return Block.BLOCK_FILE_PREFIX + id + "_" + DEFAULT_GEN_STAMP
+        + Block.METADATA_EXTENSION;
+  }
+
+  /** Create a block file in a random volume*/
+  private long createBlockFile() throws IOException {
+    FSVolume[] volumes = fds.volumes.volumes;
+    int index = rand.nextInt(volumes.length - 1);
+    long id = getFreeBlockId();
+    File file = new File(volumes[index].getDir().getPath(), getBlockFile(id));
+    if (file.createNewFile()) {
+      LOG.info("Created block file " + file.getName());
+    }
+    return id;
+  }
+
+  /** Create a metafile in a random volume*/
+  private long createMetaFile() throws IOException {
+    FSVolume[] volumes = fds.volumes.volumes;
+    int index = rand.nextInt(volumes.length - 1);
+    long id = getFreeBlockId();
+    File file = new File(volumes[index].getDir().getPath(), getMetaFile(id));
+    if (file.createNewFile()) {
+      LOG.info("Created metafile " + file.getName());
+    }
+    return id;
+  }
+
+  /** Create block file and corresponding metafile in a random volume */
+  private long createBlockMetaFile() throws IOException {
+    FSVolume[] volumes = fds.volumes.volumes;
+    int index = rand.nextInt(volumes.length - 1);
+    long id = getFreeBlockId();
+    File file = new File(volumes[index].getDir().getPath(), getBlockFile(id));
+    if (file.createNewFile()) {
+      LOG.info("Created block file " + file.getName());
+
+      // Create files with same prefix as block file but extension names
+      // such that during sorting, these files appear around meta file
+      // to test how DirectoryScanner handles extraneous files
+      String name1 = file.getAbsolutePath() + ".l";
+      String name2 = file.getAbsolutePath() + ".n";
+      file = new File(name1);
+      if (file.createNewFile()) {
+        LOG.info("Created extraneous file " + name1);
+      }
+
+      file = new File(name2);
+      if (file.createNewFile()) {
+        LOG.info("Created extraneous file " + name2);
+      }
+
+      file = new File(volumes[index].getDir().getPath(), getMetaFile(id));
+      if (file.createNewFile()) {
+        LOG.info("Created metafile " + file.getName());
+      }
+    }
+    return id;
+  }
+
+  private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
+      long missingMemoryBlocks, long mismatchBlocks) {
+    scanner.reconcile();
+    assertEquals(totalBlocks, scanner.totalBlocks);
+    assertEquals(diffsize, scanner.diff.size());
+    assertEquals(missingMetaFile, scanner.missingMetaFile);
+    assertEquals(missingBlockFile, scanner.missingBlockFile);
+    assertEquals(missingMemoryBlocks, scanner.missingMemoryBlocks);
+    assertEquals(mismatchBlocks, scanner.mismatchBlocks);
+  }
+
+  public void test() throws Exception {
+    cluster = new MiniDFSCluster(CONF, 1, true, null);
+    try {
+      cluster.waitActive();
+      fds = (FSDataset) cluster.getDataNodes().get(0).getFSDataset();
+      scanner = new DirectoryScanner(fds, CONF);
+
+      // Add files with 100 blocks
+      createFile("/tmp/t1", 10000);
+      long totalBlocks = 100;
+
+      // Test1: No difference between in-memory and disk
+      scan(100, 0, 0, 0, 0, 0);
+
+      // Test2: block metafile is missing
+      long blockId = deleteMetaFile();
+      scan(totalBlocks, 1, 1, 0, 0, 1);
+      verifyGenStamp(blockId, Block.GRANDFATHER_GENERATION_STAMP);
+      scan(totalBlocks, 0, 0, 0, 0, 0);
+
+      // Test3: block file is missing
+      blockId = deleteBlockFile();
+      scan(totalBlocks, 1, 0, 1, 0, 0);
+      totalBlocks--;
+      verifyDeletion(blockId);
+      scan(totalBlocks, 0, 0, 0, 0, 0);
+
+      // Test4: A block file exists for which there is no metafile and
+      // a block in memory
+      blockId = createBlockFile();
+      totalBlocks++;
+      scan(totalBlocks, 1, 1, 0, 1, 0);
+      verifyAddition(blockId, Block.GRANDFATHER_GENERATION_STAMP, 0);
+      scan(totalBlocks, 0, 0, 0, 0, 0);
+
+      // Test5: A metafile exists for which there is no block file and
+      // a block in memory
+      blockId = createMetaFile();
+      scan(totalBlocks+1, 1, 0, 1, 1, 0);
+      File metafile = new File(getMetaFile(blockId));
+      assertTrue(!metafile.exists());
+      scan(totalBlocks, 0, 0, 0, 0, 0);
+
+      // Test6: A block file and metafile exists for which there is no block in
+      // memory
+      blockId = createBlockMetaFile();
+      totalBlocks++;
+      scan(totalBlocks, 1, 0, 0, 1, 0);
+      verifyAddition(blockId, DEFAULT_GEN_STAMP, 0);
+      scan(totalBlocks, 0, 0, 0, 0, 0);
+
+      // Test7: Delete bunch of metafiles
+      for (int i = 0; i < 10; i++) {
+        blockId = deleteMetaFile();
+      }
+      scan(totalBlocks, 10, 10, 0, 0, 10);
+      scan(totalBlocks, 0, 0, 0, 0, 0);
+
+      // Test8: Delete bunch of block files
+      for (int i = 0; i < 10; i++) {
+        blockId = deleteBlockFile();
+      }
+      scan(totalBlocks, 10, 0, 10, 0, 0);
+      totalBlocks -= 10;
+      scan(totalBlocks, 0, 0, 0, 0, 0);
+
+      // Test9: create a bunch of blocks files
+      for (int i = 0; i < 10 ; i++) {
+        blockId = createBlockFile();
+      }
+      totalBlocks += 10;
+      scan(totalBlocks, 10, 10, 0, 10, 0);
+      scan(totalBlocks, 0, 0, 0, 0, 0);
+
+      // Test10: create a bunch of metafiles
+      for (int i = 0; i < 10 ; i++) {
+        blockId = createMetaFile();
+      }
+      scan(totalBlocks+10, 10, 0, 10, 10, 0);
+      scan(totalBlocks, 0, 0, 0, 0, 0);
+
+      // Test11: create a bunch block files and meta files
+      for (int i = 0; i < 10 ; i++) {
+        blockId = createBlockMetaFile();
+      }
+      totalBlocks += 10;
+      scan(totalBlocks, 10, 0, 0, 10, 0);
+      scan(totalBlocks, 0, 0, 0, 0, 0);
+
+      // Test12: truncate block files to test block length mismatch
+      for (int i = 0; i < 10 ; i++) {
+        truncateBlockFile();
+      }
+      scan(totalBlocks, 10, 0, 0, 0, 10);
+      scan(totalBlocks, 0, 0, 0, 0, 0);
+
+      // Test13: all the conditions combined
+      createMetaFile();
+      createBlockFile();
+      createBlockMetaFile();
+      deleteMetaFile();
+      deleteBlockFile();
+      truncateBlockFile();
+      scan(totalBlocks+3, 6, 2, 2, 3, 2);
+      scan(totalBlocks+1, 0, 0, 0, 0, 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private void verifyAddition(long blockId, long genStamp, long size) {
+    Block memBlock = fds.getBlockKey(blockId);
+    assertNotNull(memBlock);
+    DatanodeBlockInfo blockInfo;
+    synchronized(fds) {
+      blockInfo = fds.volumeMap.get(memBlock);
+    }
+    assertNotNull(blockInfo);
+
+    // Added block has the same file as the one created by the test
+    File file = new File(getBlockFile(blockId));
+    assertEquals(file.getName(), blockInfo.getFile().getName());
+
+    // Generation stamp is same as that of created file
+    assertEquals(genStamp, memBlock.getGenerationStamp());
+
+    // File size matches
+    assertEquals(size, memBlock.getNumBytes());
+  }
+
+  private void verifyDeletion(long blockId) {
+    // Ensure block does not exist in memory
+    synchronized(fds) {
+      assertEquals(null, fds.volumeMap.get(new Block(blockId)));
+    }
+  }
+
+  private void verifyGenStamp(long blockId, long genStamp) {
+    Block memBlock;
+    synchronized(fds) {
+      memBlock = fds.getBlockKey(blockId);
+    }
+    assertNotNull(memBlock);
+    assertEquals(genStamp, memBlock.getGenerationStamp());
+  }
+}


