hadoop-hdfs-commits mailing list archives

From: bor...@apache.org
Subject: svn commit: r1076051 - in /hadoop/hdfs/branches/HDFS-1052: ./ src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apache/hadoop/hdfs/util/ src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/
Date: Tue, 01 Mar 2011 23:43:01 GMT
Author: boryas
Date: Tue Mar  1 23:43:00 2011
New Revision: 1076051

URL: http://svn.apache.org/viewvc?rev=1076051&view=rev
Log:
HDFS-1687. HDFS Federation: DirectoryScanner changes for federation

Added:
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/util/DaemonFactory.java
Modified:
    hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java

Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1076051&r1=1076050&r2=1076051&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Tue Mar  1 23:43:00 2011
@@ -100,6 +100,9 @@ Trunk (unreleased changes)
     HDFS-1672. HDFS Federation: refactor stopDatanode(name) to work 
     with multiple Block Pools (boryas)
 
+    HDFS-1687. HDFS Federation: DirectoryScanner changes for 
+    federation (mattf via boryas)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1076051&r1=1076050&r2=1076051&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Tue Mar  1 23:43:00 2011
@@ -104,9 +104,6 @@ class DataBlockScanner implements Runnab
   
   DataTransferThrottler throttler = null;
   
-  // Reconciles blocks on disk to blocks in memory
-  DirectoryScanner dirScanner;
-
   private static enum ScanType {
     REMOTE_READ,           // Verified when a block read by a client etc
     VERIFICATION_SCAN,     // scanned as part of periodic verification
@@ -153,9 +150,7 @@ class DataBlockScanner implements Runnab
       scanPeriod = DEFAULT_SCAN_PERIOD_HOURS;
     }
     scanPeriod *= 3600 * 1000;
-    // initialized when the scanner thread is started.
-
-    dirScanner = new DirectoryScanner(dataset, conf);
+    LOG.info("Periodic Block Verification scan initialized with interval " + scanPeriod + ".");
   }
   
   private synchronized boolean isInitialized() {
@@ -600,10 +595,6 @@ class DataBlockScanner implements Runnab
             startNewPeriod();
           }
         }
-        if (dirScanner.newScanPeriod(now)) {
-          dirScanner.reconcile();
-          now = System.currentTimeMillis();
-        }
         if ( (now - getEarliestScanTime()) >= scanPeriod ) {
           verifyFirstBlock();
         } else {
@@ -625,7 +616,6 @@ class DataBlockScanner implements Runnab
   synchronized void shutdown() {
     LogFileHandler log = verificationLog;
     verificationLog = null;
-    dirScanner.shutdown();
     if (log != null) {
       log.close();
     }
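
For readers tracking the refactor: this hunk removes DirectoryScanner ownership from DataBlockScanner entirely. Instead of the block scanner's run loop polling newScanPeriod(now) each cycle, the directory scanner now schedules itself on its own executor (see DirectoryScanner.start() further down). A minimal sketch of that self-scheduling pattern, with invented names and not part of this commit:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Illustrative only: a periodic task that owns its own schedule,
    // so no other thread has to track lastScanTime for it.
    class SelfSchedulingScanner implements Runnable {
      private final ScheduledExecutorService master =
          Executors.newSingleThreadScheduledExecutor();

      void start(long periodMsecs) {
        // run() fires every periodMsecs until shutdown() is called
        master.scheduleAtFixedRate(this, 0L, periodMsecs, TimeUnit.MILLISECONDS);
      }

      @Override
      public void run() { /* reconcile disk and in-memory state */ }

      void shutdown() { master.shutdown(); }
    }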

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1076051&r1=1076050&r2=1076051&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Mar  1 23:43:00 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
 import java.io.BufferedOutputStream;
@@ -344,6 +345,7 @@ public class DataNode extends Configured
   
   public DataBlockScanner blockScanner = null;
   public Daemon blockScannerThread = null;
+  private DirectoryScanner directoryScanner = null;
   
   /** Activated plug-ins. */
   private List<ServicePlugin> plugins;
@@ -494,22 +496,81 @@ public class DataNode extends Configured
     LOG.info("datanodeId = " + datanodeId);
   }
   
-
-  private void initBlockScanner(Configuration conf) {
+  /**
+   * Initialize the datanode's periodic scanners:
+   *     {@link DataBlockScanner}
+   *     {@link DirectoryScanner}
+   * They report results on a per-blockpool basis but do their scanning
+   * on a per-Volume basis to minimize competition for disk iops.
+   *
+   * @param conf - Configuration has the run intervals and other
+   *               parameters for these periodic scanners
+   */
+  private void initPeriodicScanners(Configuration conf) {
+    initDataBlockScanner(conf);
+    initDirectoryScanner(conf);
+  }
+  
+  private void shutdownPeriodicScanners() {
+    shutdownDirectoryScanner();
+    shutdownDataBlockScanner();
+  }
+  
+  /**
+   * See {@link DataBlockScanner}
+   */
+  private void initDataBlockScanner(Configuration conf) {
     String reason = null;
-    if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {
+    if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
+                    DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {
       reason = "verification is turned off by configuration";
-    } else if ( !(data instanceof FSDataset) ) {
+    } else if (!(data instanceof FSDataset)) {
       reason = "verifcation is supported only with FSDataset";
     } 
-    if ( reason == null ) {
+    if (reason == null) {
       blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
     } else {
-      LOG.info("Periodic Block Verification is disabled because " +
+      LOG.info("Periodic Block Verification scan is disabled because " +
                reason + ".");
     }
   }
   
+  private void shutdownDataBlockScanner() {
+    if (blockScannerThread != null) { 
+      blockScannerThread.interrupt();
+      try {
+        blockScannerThread.join(3600000L); // wait for at most 1 hour
+      } catch (InterruptedException ie) {
+      }
+    }
+  }
+  
+  /**
+   * See {@link DirectoryScanner}
+   */
+  private void initDirectoryScanner(Configuration conf) {
+    String reason = null;
+    if (conf.getInt(DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 
+                    DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT) < 0) {
+      reason = "verification is turned off by configuration";
+    } else if (!(data instanceof FSDataset)) {
+      reason = "verification is supported only with FSDataset";
+    } 
+    if (reason == null) {
+      directoryScanner = new DirectoryScanner((FSDataset) data, conf);
+      directoryScanner.start();
+    } else {
+      LOG.info("Periodic Directory Tree Verification scan is disabled because " +
+               reason + ".");
+    }
+  }
+  
+  private void shutdownDirectoryScanner() {
+    if (directoryScanner != null) {
+      directoryScanner.shutdown();
+    }
+  }
+  
   private void initDataXceiver(Configuration conf) throws IOException {
     // construct registration
     InetSocketAddress socAddr = DataNode.getStreamingAddr(conf);
@@ -924,10 +985,17 @@ public class DataNode extends Configured
           processCommand(cmd);
 
           // start block scanner
+          // TODO:FEDERATION - SHOULD BE MOVED OUT OF THE THREAD...
+          // although it will work as is, since it checks (blockScannerThread == null)
+          // under synchronized(blockScanner).  See also {@link initDataBlockScanner()}
+          //
+          // Note that there may be a problem starting this before BPOfferService
+          // is running; and we may need to kick it to restart within each BP subdir
+          // as they come online, otherwise will wait 3 weeks for the first run.
           if (blockScanner != null) {
-            synchronized(blockScanner) { // SHOULD BE MOVED OUT OF THE THREAD.. FEDERATION
+            synchronized(blockScanner) { 
               if(blockScannerThread == null && upgradeManager.isUpgradeCompleted()) {
-                LOG.info("Starting Periodic block scanner.");
+                LOG.info("Periodic Block Verification scan starting.");
                 blockScannerThread = new Daemon(blockScanner);
                 blockScannerThread.start();
               }
@@ -1466,6 +1534,8 @@ public class DataNode extends Configured
       }
     }
     
+    shutdownPeriodicScanners();
+    
     if (infoServer != null) {
       try {
         infoServer.stop();
@@ -1518,13 +1588,7 @@ public class DataNode extends Configured
 
     if(upgradeManager != null)
       upgradeManager.shutdownUpgrade();
-    if (blockScannerThread != null) { 
-      blockScannerThread.interrupt();
-      try {
-        blockScannerThread.join(3600000L); // wait for at most 1 hour
-      } catch (InterruptedException ie) {
-      }
-    }
+        
     if (storage != null) {
       try {
         this.storage.unlockAll();
@@ -1864,7 +1928,7 @@ public class DataNode extends Configured
     // start dataXceiveServer
     dataXceiverServer.start();
     ipcServer.start();
-    initBlockScanner(conf);
+    initPeriodicScanners(conf);
     startPlugins(conf);
     
     // BlockTokenSecretManager is created here, but it shouldn't be
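
Note on the lifecycle above: startDataNode() now calls initPeriodicScanners(conf) once, and the shutdown path funnels through shutdownPeriodicScanners(), stopping the DirectoryScanner before interrupting the block scanner thread. The block scanner side relies on the standard interrupt-and-join idiom; a self-contained sketch of that idiom (the worker below is a stand-in, not DataNode code):

    // Interrupt-and-join shutdown, as in shutdownDataBlockScanner() above.
    Thread worker = new Thread(() -> {
      try {
        Thread.sleep(Long.MAX_VALUE);      // stands in for the scan loop
      } catch (InterruptedException ie) {
        // interrupted: fall through and exit cleanly
      }
    });
    worker.setDaemon(true);
    worker.start();

    worker.interrupt();                    // ask the scanner to stop
    try {
      worker.join(3600000L);               // wait at most one hour, as above
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();  // preserve our own interrupt status
    }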

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=1076051&r1=1076050&r2=1076051&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Tue Mar  1 23:43:00 2011
@@ -22,12 +22,17 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.hdfs.util.DaemonFactory;
 
 /**
  * Periodically scans the data directories for block and block metadata files.
@@ -44,23 +50,102 @@ import org.apache.hadoop.hdfs.server.dat
  * {@link FSDataset}
  */
 @InterfaceAudience.Private
-public class DirectoryScanner {
+public class DirectoryScanner implements Runnable {
   private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
   private static final int DEFAULT_SCAN_INTERVAL = 21600;
 
   private final FSDataset dataset;
-  private long scanPeriod;
-  private long lastScanTime;
-  private ExecutorService reportCompileThreadPool;
-
-  LinkedList<ScanInfo> diff = new LinkedList<ScanInfo>();
-
-  /** Stats tracked for reporting and testing */
-  long totalBlocks;
-  long missingMetaFile;
-  long missingBlockFile;
-  long missingMemoryBlocks;
-  long mismatchBlocks;
+  private final ExecutorService reportCompileThreadPool;
+  private final ScheduledExecutorService masterThread;
+  private final long scanPeriodMsecs;
+  private volatile boolean shouldRun = false;
+  private boolean retainDiffs = false;
+
+  ScanInfoPerBlockPool diffs = new ScanInfoPerBlockPool();
+  Map<String, Stats> stats = new HashMap<String, Stats>();
+  
+  /**
+   * Allow retaining diffs for unit test and analysis
+   * @param b - defaults to false (off)
+   */
+  void setRetainDiffs(boolean b) {
+    retainDiffs = b;
+  }
+
+  /** Stats tracked for reporting and testing, per blockpool */
+  static class Stats {
+    String bpid;
+    long totalBlocks = 0;
+    long missingMetaFile = 0;
+    long missingBlockFile = 0;
+    long missingMemoryBlocks = 0;
+    long mismatchBlocks = 0;
+    
+    public Stats(String bpid) {
+      this.bpid = bpid;
+    }
+    
+    public String toString() {
+      return "BlockPool " + bpid
+      + " Total blocks: " + totalBlocks + ", missing metadata files:"
+      + missingMetaFile + ", missing block files:" + missingBlockFile
+      + ", missing blocks in memory:" + missingMemoryBlocks
+      + ", mismatched blocks:" + mismatchBlocks;
+    }
+  }
+  
+  static class ScanInfoPerBlockPool extends 
+                     HashMap<String, LinkedList<ScanInfo>> {
+    
+    private static final long serialVersionUID = 1L;
+
+    ScanInfoPerBlockPool() {super();}
+    
+    ScanInfoPerBlockPool(int sz) {super(sz);}
+    
+    /**
+     * Merges "that" ScanInfoPerBlockPool into this one
+     * @param that the ScanInfoPerBlockPool to merge into this one
+     */
+    public void addAll(ScanInfoPerBlockPool that) {
+      if (that == null) return;
+      
+      for (Entry<String, LinkedList<ScanInfo>> entry : that.entrySet()) {
+        String bpid = entry.getKey();
+        LinkedList<ScanInfo> list = entry.getValue();
+        
+        if (this.containsKey(bpid)) {
+          //merge that per-bpid linked list with this one
+          this.get(bpid).addAll(list);
+        } else {
+          //add that new bpid and its linked list to this
+          this.put(bpid, list);
+        }
+      }
+    }
+    
+    /**
+     * Convert all the LinkedList values in this ScanInfoPerBlockPool map
+     * into sorted arrays, and return a new map of these arrays per blockpool
+     * @return
+     */
+    public Map<String, ScanInfo[]> toSortedArrays() {
+      Map<String, ScanInfo[]> result = 
+        new HashMap<String, ScanInfo[]>(this.size());
+      
+      for (Entry<String, LinkedList<ScanInfo>> entry : this.entrySet()) {
+        String bpid = entry.getKey();
+        LinkedList<ScanInfo> list = entry.getValue();
+        
+        // convert list to array
+        ScanInfo[] record = list.toArray(new ScanInfo[list.size()]);
+        // Sort array based on blockId
+        Arrays.sort(record);
+        result.put(bpid, record);            
+      }
+      return result;
+    }
+  }
 
   /**
    * Tracks the files and other information related to a block on the disk
@@ -137,34 +222,80 @@ public class DirectoryScanner {
     this.dataset = dataset;
     int interval = conf.getInt("dfs.datanode.directoryscan.interval",
         DEFAULT_SCAN_INTERVAL);
-    scanPeriod = interval * 1000L;
+    scanPeriodMsecs = interval * 1000L; //msec
     int threads = 
         conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
                     DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
 
-    reportCompileThreadPool = Executors.newFixedThreadPool(threads);
+    reportCompileThreadPool = Executors.newFixedThreadPool(threads, 
+        new DaemonFactory());
+    masterThread = new ScheduledThreadPoolExecutor(1, new DaemonFactory());
+  }
 
+  void start() {
+    shouldRun = true;
     Random rand = new Random();
-    lastScanTime = System.currentTimeMillis() - (rand.nextInt(interval) * 1000L);
-    LOG.info("scan starts at " + (lastScanTime + scanPeriod)
-        + " with interval " + scanPeriod);
+    long offset = rand.nextInt((int) (scanPeriodMsecs/1000L)) * 1000L; //msec
+    long firstScanTime = System.currentTimeMillis() + offset;
+    LOG.info("Periodic Directory Tree Verification scan starting at " 
+        + firstScanTime + " with interval " + scanPeriodMsecs);
+    masterThread.scheduleAtFixedRate(this, offset, scanPeriodMsecs, 
+                                     TimeUnit.MILLISECONDS);
   }
-
-  boolean newScanPeriod(long now) {
-    return now > lastScanTime + scanPeriod;
+  
+  // for unit test
+  boolean getRunStatus() {
+    return shouldRun;
   }
 
   private void clear() {
-    diff.clear();
-    totalBlocks = 0;
-    missingMetaFile = 0;
-    missingBlockFile = 0;
-    missingMemoryBlocks = 0;
-    mismatchBlocks = 0;
+    diffs.clear();
+    stats.clear();
+  }
+
+  /**
+   * Main program loop for DirectoryScanner
+   * Runs "reconcile()" periodically under the masterThread.
+   */
+  @Override
+  public void run() {
+    try {
+      if (!shouldRun) {
+        //shutdown has been activated
+        LOG.warn("this cycle terminating immediately because 'shouldRun' has been deactivated");
+        return;
+      }
+      
+      if (!DataNode.getDataNode().upgradeManager.isUpgradeCompleted()) {
+        //If distributed upgrades are underway, exit and wait for the next cycle.
+        //TODO:FEDERATION update this when Distributed Upgrade is modified for Federation
+        LOG.warn("this cycle terminating immediately because Distributed Upgrade is in progress");
+        return; 
+      }
+      
+      //We're okay to run - do it
+      reconcile();      
+      
+    } catch (Exception e) {
+      //Log and continue - allows Executor to run again next cycle
+      LOG.error("Exception during DirectoryScanner execution - will continue next cycle",
e);
+    } catch (Error er) {
+      //Non-recoverable error - re-throw after logging the problem
+      LOG.error("System Error during DirectoryScanner execution - permanently terminating
periodic scanner", er);
+      throw er;
+    }
   }
   
   void shutdown() {
-    reportCompileThreadPool.shutdown();
+    if (!shouldRun) {
+      LOG.warn("DirectoryScanner: shutdown has been called, but periodic scanner not started");
+    } else {
+      LOG.warn("DirectoryScanner: shutdown has been called");      
+    }
+    shouldRun = false;
+    if (masterThread != null) masterThread.shutdown();
+    if (reportCompileThreadPool != null) reportCompileThreadPool.shutdown();
+    if (!retainDiffs) clear();
   }
 
   /**
@@ -172,113 +303,130 @@ public class DirectoryScanner {
    */
   void reconcile() {
     scan();
-    for (ScanInfo info : diff) {
-      // TODO:FEDERATION use right block pool Id
-      dataset.checkAndUpdate("TODO", info.getBlockId(), info.getBlockFile(),
-          info.getMetaFile(), info.getVolume());
+    for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
+      String bpid = entry.getKey();
+      LinkedList<ScanInfo> diff = entry.getValue();
+      
+      for (ScanInfo info : diff) {
+        dataset.checkAndUpdate(bpid, info.getBlockId(), info.getBlockFile(),
+            info.getMetaFile(), info.getVolume());
+      }
     }
+    if (!retainDiffs) clear();
   }
 
   /**
    * Scan for the differences between disk and in-memory blocks
+   * Scan only the "finalized blocks" lists of both disk and memory.
    */
   void scan() {
     clear();
-    ScanInfo[] diskReport = getDiskReport();
-    totalBlocks = diskReport.length;
+    Map<String, ScanInfo[]> diskReport = getDiskReport();
 
     // Hold FSDataset lock to prevent further changes to the block map
     synchronized(dataset) {
-      // TODO:FEDERATION use right block pool Id
-      Block[] memReport = dataset.getBlockList("TODO", false);
-      Arrays.sort(memReport); // Sort based on blockId
-
-      int d = 0; // index for diskReport
-      int m = 0; // index for memReprot
-      while (m < memReport.length && d < diskReport.length) {
-        Block memBlock = memReport[Math.min(m, memReport.length - 1)];
-        ScanInfo info = diskReport[Math.min(d, diskReport.length - 1)];
-        if (info.getBlockId() < memBlock.getBlockId()) {
-          // Block is missing in memory
-          missingMemoryBlocks++;
-          addDifference(info);
+      for (Entry<String, ScanInfo[]> entry : diskReport.entrySet()) {
+        String bpid = entry.getKey();
+        ScanInfo[] blockpoolReport = entry.getValue();
+        
+        Stats statsRecord = new Stats(bpid);
+        stats.put(bpid, statsRecord);
+        LinkedList<ScanInfo> diffRecord = new LinkedList<ScanInfo>();
+        diffs.put(bpid, diffRecord);
+        
+        statsRecord.totalBlocks = blockpoolReport.length;
+        List<Block> bl = dataset.getFinalizedBlocks(bpid);
+        Block[] memReport = bl.toArray(new Block[bl.size()]);
+        Arrays.sort(memReport); // Sort based on blockId
+  
+        int d = 0; // index for blockpoolReport
+        int m = 0; // index for memReport
+        while (m < memReport.length && d < blockpoolReport.length) {
+          Block memBlock = memReport[Math.min(m, memReport.length - 1)];
+          ScanInfo info = blockpoolReport[Math.min(
+              d, blockpoolReport.length - 1)];
+          if (info.getBlockId() < memBlock.getBlockId()) {
+            // Block is missing in memory
+            statsRecord.missingMemoryBlocks++;
+            addDifference(diffRecord, statsRecord, info);
+            d++;
+            continue;
+          }
+          if (info.getBlockId() > memBlock.getBlockId()) {
+            // Block is missing on the disk
+            addDifference(diffRecord, statsRecord, memBlock.getBlockId());
+            m++;
+            continue;
+          }
+          // Block file and/or metadata file exists on the disk
+          // Block exists in memory
+          if (info.getBlockFile() == null) {
+            // Block metadata file exists and block file is missing
+            addDifference(diffRecord, statsRecord, info);
+          } else if (info.getGenStamp() != memBlock.getGenerationStamp()
+              || info.getBlockFile().length() != memBlock.getNumBytes()) {
+            // Block metadata file is missing or has wrong generation stamp,
+            // or block file length is different than expected
+            statsRecord.mismatchBlocks++;
+            addDifference(diffRecord, statsRecord, info);
+          }
           d++;
-          continue;
-        }
-        if (info.getBlockId() > memBlock.getBlockId()) {
-          // Block is missing on the disk
-          addDifference(memBlock.getBlockId());
           m++;
-          continue;
         }
-        // Block file and/or metadata file exists on the disk
-        // Block exists in memory
-        if (info.getBlockFile() == null) {
-          // Block metadata file exits and block file is missing
-          addDifference(info);
-        } else if (info.getGenStamp() != memBlock.getGenerationStamp()
-            || info.getBlockFile().length() != memBlock.getNumBytes()) {
-          mismatchBlocks++;
-          addDifference(info);
+        while (m < memReport.length) {
+          addDifference(diffRecord, statsRecord, memReport[m++].getBlockId());
         }
-        d++;
-        m++;
-      }
-      while (m < memReport.length) {
-        addDifference(memReport[m++].getBlockId());
-      }
-      while (d < diskReport.length) {
-        missingMemoryBlocks++;
-        addDifference(diskReport[d++]);
-      }
-    }
-    LOG.info("Total blocks: " + totalBlocks + ", missing metadata files:"
-        + missingMetaFile + ", missing block files:" + missingBlockFile
-        + ", missing blocks in memory:" + missingMemoryBlocks
-        + ", mismatched blocks:" + mismatchBlocks);
-    lastScanTime = System.currentTimeMillis();
+        while (d < blockpoolReport.length) {
+          statsRecord.missingMemoryBlocks++;
+          addDifference(diffRecord, statsRecord, blockpoolReport[d++]);
+        }
+        LOG.info(statsRecord.toString());
+      } //end for
+    } //end synchronized
   }
 
   /**
    * Block is found on the disk. In-memory block is missing or does not match
    * the block on the disk
    */
-  private void addDifference(ScanInfo info) {
-    missingMetaFile += info.getMetaFile() == null ? 1 : 0;
-    missingBlockFile += info.getBlockFile() == null ? 1 : 0;
-    diff.add(info);
+  private void addDifference(LinkedList<ScanInfo> diffRecord, 
+                             Stats statsRecord, ScanInfo info) {
+    statsRecord.missingMetaFile += info.getMetaFile() == null ? 1 : 0;
+    statsRecord.missingBlockFile += info.getBlockFile() == null ? 1 : 0;
+    diffRecord.add(info);
   }
 
   /** Block is not found on the disk */
-  private void addDifference(long blockId) {
-    missingBlockFile++;
-    missingMetaFile++;
-    diff.add(new ScanInfo(blockId));
+  private void addDifference(LinkedList<ScanInfo> diffRecord,
+                             Stats statsRecord, long blockId) {
+    statsRecord.missingBlockFile++;
+    statsRecord.missingMetaFile++;
+    diffRecord.add(new ScanInfo(blockId));
   }
 
-  /** Get list of blocks on the disk sorted by blockId */
-  private ScanInfo[] getDiskReport() {
+  /** Get lists of blocks on the disk sorted by blockId, per blockpool */
+  private Map<String, ScanInfo[]> getDiskReport() {
     // First get list of data directories
     FSDataset.FSVolume[] volumes = dataset.volumes.volumes;
-    ArrayList<LinkedList<ScanInfo>> dirReports =
-      new ArrayList<LinkedList<ScanInfo>>(volumes.length);
+    ArrayList<ScanInfoPerBlockPool> dirReports =
+      new ArrayList<ScanInfoPerBlockPool>(volumes.length);
     
-    Map<Integer, Future<LinkedList<ScanInfo>>> compilersInProgress =
-      new HashMap<Integer, Future<LinkedList<ScanInfo>>>();
+    Map<Integer, Future<ScanInfoPerBlockPool>> compilersInProgress =
+      new HashMap<Integer, Future<ScanInfoPerBlockPool>>();
     for (int i = 0; i < volumes.length; i++) {
       if (!dataset.volumes.isValid(volumes[i])) { // volume is invalid, skip it
         dirReports.add(i, null);
       } else {
         ReportCompiler reportCompiler =
-          new ReportCompiler(volumes[i], volumes[i].getDir());
-        Future<LinkedList<ScanInfo>> result = 
+          new ReportCompiler(volumes[i]);
+        Future<ScanInfoPerBlockPool> result = 
           reportCompileThreadPool.submit(reportCompiler);
         compilersInProgress.put(i, result);
       }
     }
     
-    for (Map.Entry<Integer, Future<LinkedList<ScanInfo>>> report :
-      compilersInProgress.entrySet()) {
+    for (Entry<Integer, Future<ScanInfoPerBlockPool>> report :
+        compilersInProgress.entrySet()) {
       try {
         dirReports.add(report.getKey(), report.getValue().get());
       } catch (Exception ex) {
@@ -289,17 +437,14 @@ public class DirectoryScanner {
     }
 
     // Compile consolidated report for all the volumes
-    LinkedList<ScanInfo> list = new LinkedList<ScanInfo>();
+    ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
     for (int i = 0; i < volumes.length; i++) {
       if (dataset.volumes.isValid(volumes[i])) { // volume is still valid
         list.addAll(dirReports.get(i));
       }
     }
 
-    ScanInfo[] report = list.toArray(new ScanInfo[list.size()]);
-    // Sort the report based on blockId
-    Arrays.sort(report);
-    return report;
+    return list.toSortedArrays();
   }
 
   private static boolean isBlockMetaFile(String blockId, String metaFile) {
@@ -307,19 +452,23 @@ public class DirectoryScanner {
         && metaFile.endsWith(Block.METADATA_EXTENSION);
   }
 
-  private static class ReportCompiler implements Callable<LinkedList<ScanInfo>> {
+  private static class ReportCompiler 
+  implements Callable<ScanInfoPerBlockPool> {
     private FSVolume volume;
-    private File dir;
 
-    public ReportCompiler(FSVolume volume, File dir) {
-      this.dir = dir;
+    public ReportCompiler(FSVolume volume) {
       this.volume = volume;
     }
 
     @Override
-    public LinkedList<ScanInfo> call() throws Exception {
-      LinkedList<ScanInfo> result = new LinkedList<ScanInfo>();
-      compileReport(volume, dir, result);
+    public ScanInfoPerBlockPool call() throws Exception {
+      String[] bpList = volume.getBlockPoolList();
+      ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length);
+      for (String bpid : bpList) {
+        LinkedList<ScanInfo> report = new LinkedList<ScanInfo>();
+        File bpFinalizedDir = volume.getBlockPool(bpid).getFinalizedDir();
+        result.put(bpid, compileReport(volume, bpFinalizedDir, report));
+      }
       return result;
     }
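
The rewritten scan() above is, for each blockpool, a two-pointer merge of two blockId-sorted sequences: the on-disk report against the in-memory list of finalized blocks. A stripped-down sketch of the same walk, using bare long arrays of block ids instead of ScanInfo and Block objects:

    // Illustrative merge of two sorted id arrays, classifying differences
    // the same way scan() does; the metadata comparison is elided.
    static void merge(long[] disk, long[] mem) {
      int d = 0, m = 0;
      while (d < disk.length && m < mem.length) {
        if (disk[d] < mem[m]) {
          System.out.println(disk[d++] + ": on disk, missing in memory");
        } else if (disk[d] > mem[m]) {
          System.out.println(mem[m++] + ": in memory, missing on disk");
        } else {
          d++; m++;    // present in both; scan() compares genstamp/length here
        }
      }
      while (d < disk.length) System.out.println(disk[d++] + ": on disk only");
      while (m < mem.length) System.out.println(mem[m++] + ": in memory only");
    }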
 

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1076051&r1=1076050&r2=1076051&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Mar  1 23:43:00 2011
@@ -301,7 +301,13 @@ public class FSDataset implements FSCons
   }
 
   /**
-   * Manages block pool directory under {@link FSVolume}
+   * Manages block pool subdirectory under {@link FSVolume}
+   * 
+   * TODO:FEDERATION This class should be renamed, perhaps to "BlockPoolSlice".
+   * The real "block pool" is an abstraction that spans Volumes and
+   * Datanodes.  This "BlockPool" object doesn't represent that.  Rather,
+   * it only represents the concrete portion of a particular block pool 
+   * that resides on a particular Volume.
    */
   class BlockPool {
     private final String bpid;
@@ -619,7 +625,7 @@ public class FSDataset implements FSCons
       return usage.getMount();
     }
     
-    private BlockPool getBlockPool(String bpid) throws IOException {
+    BlockPool getBlockPool(String bpid) throws IOException {
       BlockPool bp = map.get(bpid);
       if (bp == null) {
         // TODO:FEDERATION cleanup this exception
@@ -629,6 +635,15 @@ public class FSDataset implements FSCons
     }
     
     /**
+     * Make a snapshot copy of the list of currently active BPIDs
+     */
+    String[] getBlockPoolList() {
+      synchronized(FSDataset.this) {
+        return map.keySet().toArray(new String[map.keySet().size()]);   
+      }
+    }
+      
+    /**
      * Temporary files. They get moved to the finalized block directory when
      * the block is finalized.
      */
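
A note on getBlockPoolList(): it copies the key set into a fresh array while holding the FSDataset lock, so a ReportCompiler thread can iterate the result even as block pools are added or removed concurrently. A small fragment (hypothetical map, not FSDataset code) showing why a snapshot beats handing out the live keySet():

    Map<String, Object> map = new HashMap<String, Object>();
    map.put("BP-1", new Object());

    String[] snapshot;
    synchronized (map) {                 // stands in for the FSDataset lock
      snapshot = map.keySet().toArray(new String[0]);
    }
    map.put("BP-2", new Object());       // later mutation cannot touch snapshot

    for (String bpid : snapshot) {
      System.out.println(bpid);          // prints only BP-1
    }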

Added: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/util/DaemonFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/util/DaemonFactory.java?rev=1076051&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/util/DaemonFactory.java (added)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/util/DaemonFactory.java Tue Mar  1 23:43:00 2011
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.util;
+
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.Daemon;
+
+/**
+ * Provide a factory for named daemon threads,
+ * for use in ExecutorService constructors
+ * 
+ * TODO:FEDERATION This should become an inner class of the 
+ * org.apache.hadoop.util.Daemon class
+ * after Federation is united with trunk.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DaemonFactory extends Daemon implements ThreadFactory {
+
+  public Thread newThread(Runnable runnable) {
+    return new Daemon(runnable);
+  }
+
+}
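
Usage mirrors DirectoryScanner's constructor above: pass the factory to an Executors factory method so every pool thread is a Daemon and cannot keep the JVM alive at exit. A minimal fragment:

    ExecutorService pool = Executors.newFixedThreadPool(4, new DaemonFactory());
    pool.submit(() -> System.out.println("runs on a daemon thread"));
    pool.shutdown();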

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java?rev=1076051&r1=1076050&r2=1076051&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java Tue Mar  1 23:43:00 2011
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
+import java.util.LinkedList;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -143,7 +144,8 @@ public class TestDirectoryScanner extend
     FSVolume[] volumes = fds.volumes.volumes;
     int index = rand.nextInt(volumes.length - 1);
     long id = getFreeBlockId();
-    File file = new File(volumes[index].getDir().getPath(), getBlockFile(id));
+    File finalizedDir = volumes[index].getBlockPool(bpid).getFinalizedDir();
+    File file = new File(finalizedDir, getBlockFile(id));
     if (file.createNewFile()) {
       LOG.info("Created block file " + file.getName());
     }
@@ -155,7 +157,8 @@ public class TestDirectoryScanner extend
     FSVolume[] volumes = fds.volumes.volumes;
     int index = rand.nextInt(volumes.length - 1);
     long id = getFreeBlockId();
-    File file = new File(volumes[index].getDir().getPath(), getMetaFile(id));
+    File finalizedDir = volumes[index].getBlockPool(bpid).getFinalizedDir();
+    File file = new File(finalizedDir, getMetaFile(id));
     if (file.createNewFile()) {
       LOG.info("Created metafile " + file.getName());
     }
@@ -167,7 +170,8 @@ public class TestDirectoryScanner extend
     FSVolume[] volumes = fds.volumes.volumes;
     int index = rand.nextInt(volumes.length - 1);
     long id = getFreeBlockId();
-    File file = new File(volumes[index].getDir().getPath(), getBlockFile(id));
+    File finalizedDir = volumes[index].getBlockPool(bpid).getFinalizedDir();
+    File file = new File(finalizedDir, getBlockFile(id));
     if (file.createNewFile()) {
       LOG.info("Created block file " + file.getName());
 
@@ -186,7 +190,7 @@ public class TestDirectoryScanner extend
         LOG.info("Created extraneous file " + name2);
       }
 
-      file = new File(volumes[index].getDir().getPath(), getMetaFile(id));
+      file = new File(finalizedDir, getMetaFile(id));
       if (file.createNewFile()) {
         LOG.info("Created metafile " + file.getName());
       }
@@ -197,12 +201,18 @@ public class TestDirectoryScanner extend
   private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
       long missingMemoryBlocks, long mismatchBlocks) {
     scanner.reconcile();
-    assertEquals(totalBlocks, scanner.totalBlocks);
-    assertEquals(diffsize, scanner.diff.size());
-    assertEquals(missingMetaFile, scanner.missingMetaFile);
-    assertEquals(missingBlockFile, scanner.missingBlockFile);
-    assertEquals(missingMemoryBlocks, scanner.missingMemoryBlocks);
-    assertEquals(mismatchBlocks, scanner.mismatchBlocks);
+    
+    assertTrue(scanner.diffs.containsKey(bpid));
+    LinkedList<DirectoryScanner.ScanInfo> diff = scanner.diffs.get(bpid);
+    assertTrue(scanner.stats.containsKey(bpid));
+    DirectoryScanner.Stats stats = scanner.stats.get(bpid);
+    
+    assertEquals(diffsize, diff.size());
+    assertEquals(totalBlocks, stats.totalBlocks);
+    assertEquals(missingMetaFile, stats.missingMetaFile);
+    assertEquals(missingBlockFile, stats.missingBlockFile);
+    assertEquals(missingMemoryBlocks, stats.missingMemoryBlocks);
+    assertEquals(mismatchBlocks, stats.mismatchBlocks);
   }
 
   public void testDirectoryScanner() throws Exception {
@@ -221,6 +231,7 @@ public class TestDirectoryScanner extend
       CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
                   parallelism);
       scanner = new DirectoryScanner(fds, CONF);
+      scanner.setRetainDiffs(true);
 
       // Add files with 100 blocks
       createFile("/tmp/t1", 10000);
@@ -320,7 +331,14 @@ public class TestDirectoryScanner extend
       truncateBlockFile();
       scan(totalBlocks+3, 6, 2, 2, 3, 2);
       scan(totalBlocks+1, 0, 0, 0, 0, 0);
+      
+      // Test14: validate clean shutdown of DirectoryScanner
+      ////assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not sim
+      scanner.shutdown();
+      assertFalse(scanner.getRunStatus());
+      
     } finally {
+      scanner.shutdown();
       cluster.shutdown();
     }
   }


