From: zhz@apache.org To: common-commits@hadoop.apache.org Subject: hadoop git commit: HDFS-8873. Allow the directoryScanner to be rate-limited. Contributed by Daniel Templeton. Date: Thu, 27 Apr 2017 22:04:58 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/branch-2.7 5e67eb756 -> 8ad9efbe1 HDFS-8873. Allow the directoryScanner to be rate-limited. Contributed by Daniel Templeton. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8ad9efbe Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8ad9efbe Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8ad9efbe Branch: refs/heads/branch-2.7 Commit: 8ad9efbe14a7cda01d410072f65fcb3184876558 Parents: 5e67eb7 Author: Zhe Zhang Authored: Thu Apr 27 15:04:10 2017 -0700 Committer: Zhe Zhang Committed: Thu Apr 27 15:04:10 2017 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 + .../hdfs/server/datanode/DirectoryScanner.java | 344 +++++++++++++++++-- .../src/main/resources/hdfs-default.xml | 20 ++ .../server/datanode/TestDirectoryScanner.java | 226 +++++++++++- 5 files changed, 565 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ad9efbe/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 63f5ba4..5159459 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -75,6 +75,9 @@ Release 2.7.4 - UNRELEASED HDFS-11466. Change dfs.namenode.write-lock-reporting-threshold-ms default from 1000ms to 5000ms. (wang via zhz) + HDFS-8873. Allow the directoryScanner to be rate-limited (Daniel Templeton + via Colin P. McCabe) + OPTIMIZATIONS HDFS-10896.
Move lock logging logic from FSNamesystem into FSNamesystemLock. http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ad9efbe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 64186a7..fb41ee1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -481,6 +481,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600; public static final String DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads"; public static final int DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1; + public static final String + DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY = + "dfs.datanode.directoryscan.throttle.limit.ms.per.sec"; + public static final int + DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT = 1000; public static final String DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface"; public static final String DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default"; public static final String DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ad9efbe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java index dd864a5..d1a44d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.datanode; +import com.google.common.annotations.VisibleForTesting; import java.io.File; import java.io.IOException; import java.util.Arrays; @@ -32,6 +33,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -47,6 +49,7 @@ import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.StopWatch; import org.apache.hadoop.util.Time; /** @@ -56,27 +59,59 @@ import org.apache.hadoop.util.Time; @InterfaceAudience.Private public class DirectoryScanner implements Runnable { private static final Log LOG = LogFactory.getLog(DirectoryScanner.class); + private static final int MILLIS_PER_SECOND = 1000; + private static final String START_MESSAGE = + "Periodic Directory Tree Verification scan" + + " starting at %dms with interval of %dms"; + private static final String 
START_MESSAGE_WITH_THROTTLE = START_MESSAGE + + " and throttle limit of %dms/s"; private final FsDatasetSpi dataset; private final ExecutorService reportCompileThreadPool; private final ScheduledExecutorService masterThread; private final long scanPeriodMsecs; + private final int throttleLimitMsPerSec; private volatile boolean shouldRun = false; private boolean retainDiffs = false; private final DataNode datanode; + /** + * Total combined wall clock time (in milliseconds) spent by the report + * compiler threads executing. Used for testing purposes. + */ + @VisibleForTesting + final AtomicLong timeRunningMs = new AtomicLong(0L); + /** + * Total combined wall clock time (in milliseconds) spent by the report + * compiler threads blocked by the throttle. Used for testing purposes. + */ + @VisibleForTesting + final AtomicLong timeWaitingMs = new AtomicLong(0L); + /** + * The complete list of block differences indexed by block pool ID. + */ + @VisibleForTesting final ScanInfoPerBlockPool diffs = new ScanInfoPerBlockPool(); + /** + * Statistics about the block differences in each blockpool, indexed by + * block pool ID. + */ + @VisibleForTesting final Map stats = new HashMap(); /** - * Allow retaining diffs for unit test and analysis - * @param b - defaults to false (off) + * Allow retaining diffs for unit test and analysis. Defaults to false (off) + * @param b whether to retain diffs */ + @VisibleForTesting void setRetainDiffs(boolean b) { retainDiffs = b; } - /** Stats tracked for reporting and testing, per blockpool */ + /** + * Stats tracked for reporting and testing, per blockpool + */ + @VisibleForTesting static class Stats { final String bpid; long totalBlocks = 0; @@ -86,6 +121,10 @@ public class DirectoryScanner implements Runnable { long mismatchBlocks = 0; long duplicateBlocks = 0; + /** + * Create a new Stats object for the given blockpool ID. + * @param bpid blockpool ID + */ public Stats(String bpid) { this.bpid = bpid; } @@ -99,18 +138,32 @@ public class DirectoryScanner implements Runnable { + ", mismatched blocks:" + mismatchBlocks; } } - + + /** + * Helper class for compiling block info reports from report compiler threads. + */ static class ScanInfoPerBlockPool extends HashMap> { private static final long serialVersionUID = 1L; + /** + * Create a new info list. + */ ScanInfoPerBlockPool() {super();} - + + /** + * Create a new info list initialized to the given expected size. + * See {@link java.util.HashMap#HashMap(int)}. + * + * @param sz initial expected size + */ ScanInfoPerBlockPool(int sz) {super(sz);} /** * Merges {@code that} ScanInfoPerBlockPool into this one + * + * @param that the ScanInfoPerBlockPool to merge */ public void addAll(ScanInfoPerBlockPool that) { if (that == null) return; @@ -132,6 +185,7 @@ public class DirectoryScanner implements Runnable { /** * Convert all the LinkedList values in this ScanInfoPerBlockPool map * into sorted arrays, and return a new map of these arrays per blockpool + * * @return a map of ScanInfo arrays per blockpool */ public Map toSortedArrays() { @@ -208,6 +262,9 @@ public class DirectoryScanner implements Runnable { * For example, the condensed version of /foo//bar is /foo/bar * Unlike {@link File#getCanonicalPath()}, this will never perform I/O * on the filesystem. + * + * @param path the path to condense + * @return the condensed path */ private static String getCondensedPath(String path) { return CONDENSED_PATH_REGEX.matcher(path).
@@ -230,6 +287,15 @@ public class DirectoryScanner implements Runnable { throw new RuntimeException(prefix + " is not a prefix of " + fullPath); } + /** + * Create a ScanInfo object for a block. This constructor will examine + * the block data and meta-data files. + * + * @param blockId the block ID + * @param blockFile the path to the block data file + * @param metaFile the path to the block meta-data file + * @param vol the volume that contains the block + */ ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) { this.blockId = blockId; String condensedVolPath = vol == null ? null : @@ -248,15 +314,31 @@ public class DirectoryScanner implements Runnable { this.volume = vol; } + /** + * Returns the block data file. + * + * @return the block data file + */ File getBlockFile() { return (blockSuffix == null) ? null : new File(volume.getBasePath(), blockSuffix); } + /** + * Return the length of the data block. The length returned is the length + * cached when this object was created. + * + * @return the length of the data block + */ long getBlockFileLength() { return blockFileLength; } + /** + * Returns the block meta data file or null if there isn't one. + * + * @return the block meta data file + */ File getMetaFile() { if (metaSuffix == null) { return null; @@ -267,10 +349,20 @@ public class DirectoryScanner implements Runnable { } } + /** + * Returns the block ID. + * + * @return the block ID + */ long getBlockId() { return blockId; } + /** + * Returns the volume that contains the block that this object describes. + * + * @return the volume + */ FsVolumeSpi getVolume() { return volume; } @@ -309,12 +401,44 @@ public class DirectoryScanner implements Runnable { } } + /** + * Create a new directory scanner, but don't cycle it running yet. + * + * @param datanode the parent datanode + * @param dataset the dataset to scan + * @param conf the Configuration object + */ DirectoryScanner(DataNode datanode, FsDatasetSpi dataset, Configuration conf) { this.datanode = datanode; this.dataset = dataset; int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT); - scanPeriodMsecs = interval * 1000L; //msec + scanPeriodMsecs = interval * MILLIS_PER_SECOND; //msec + + int throttle = + conf.getInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT); + + if ((throttle > MILLIS_PER_SECOND) || (throttle <= 0)) { + if (throttle > MILLIS_PER_SECOND) { + LOG.error( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY + + " set to value above 1000 ms/sec. Assuming default value of " + + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT); + } else { + LOG.error( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY + + " set to value below 1 ms/sec. Assuming default value of " + + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT); + } + + throttleLimitMsPerSec = + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT; + } else { + throttleLimitMsPerSec = throttle; + } + int threads = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT); @@ -325,29 +449,50 @@ public class DirectoryScanner implements Runnable { new Daemon.DaemonFactory()); } + /** + * Start the scanner. 
The scanner will run every + * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds. + */ void start() { shouldRun = true; - long offset = DFSUtil.getRandom().nextInt((int) (scanPeriodMsecs/1000L)) * 1000L; //msec + long offset = DFSUtil.getRandom().nextInt( + (int) (scanPeriodMsecs/MILLIS_PER_SECOND)) * MILLIS_PER_SECOND; //msec long firstScanTime = Time.now() + offset; - LOG.info("Periodic Directory Tree Verification scan starting at " - + firstScanTime + " with interval " + scanPeriodMsecs); + String logMsg; + + if (throttleLimitMsPerSec < MILLIS_PER_SECOND) { + logMsg = String.format(START_MESSAGE_WITH_THROTTLE, firstScanTime, + scanPeriodMsecs, throttleLimitMsPerSec); + } else { + logMsg = String.format(START_MESSAGE, firstScanTime, scanPeriodMsecs); + } + + LOG.info(logMsg); masterThread.scheduleAtFixedRate(this, offset, scanPeriodMsecs, TimeUnit.MILLISECONDS); } - // for unit test + /** + * Return whether the scanner has been started. + * + * @return whether the scanner has been started + */ + @VisibleForTesting boolean getRunStatus() { return shouldRun; } + /** + * Clear the current cache of diffs and statistics. + */ private void clear() { diffs.clear(); stats.clear(); } /** - * Main program loop for DirectoryScanner - * Runs "reconcile()" periodically under the masterThread. + * Main program loop for DirectoryScanner. Runs {@link #reconcile()} + * and handles any exceptions. */ @Override public void run() { @@ -371,6 +516,12 @@ public class DirectoryScanner implements Runnable { } } + /** + * Stops the directory scanner. This method will wait for 1 minute for the + * main thread to exit and an additional 1 minute for the report compilation + * threads to exit. If a thread does not exit in that time period, it is + * left running, and an error is logged. + */ void shutdown() { if (!shouldRun) { LOG.warn("DirectoryScanner: shutdown has been called, but periodic scanner not started"); @@ -379,7 +530,11 @@ public class DirectoryScanner implements Runnable { } shouldRun = false; if (masterThread != null) masterThread.shutdown(); - if (reportCompileThreadPool != null) reportCompileThreadPool.shutdown(); + + if (reportCompileThreadPool != null) { + reportCompileThreadPool.shutdownNow(); + } + if (masterThread != null) { try { masterThread.awaitTermination(1, TimeUnit.MINUTES); @@ -402,6 +557,7 @@ public class DirectoryScanner implements Runnable { /** * Reconcile differences between disk and in-memory blocks */ + @VisibleForTesting void reconcile() throws IOException { scan(); for (Entry> entry : diffs.entrySet()) { @@ -420,7 +576,7 @@ public class DirectoryScanner implements Runnable { * Scan for the differences between disk and in-memory blocks * Scan only the "finalized blocks" lists of both disk and memory. */ - void scan() { + private void scan() { clear(); Map diskReport = getDiskReport(); @@ -508,8 +664,13 @@ public class DirectoryScanner implements Runnable { } /** - * Block is found on the disk. In-memory block is missing or does not match - * the block on the disk + * Add the ScanInfo object to the list of differences and adjust the stats + * accordingly. This method is called when a block is found on the disk, + * but the in-memory block is missing or does not match the block on the disk.
+ * + * @param diffRecord the list to which to add the info + * @param statsRecord the stats to update + * @param info the differing info */ private void addDifference(LinkedList diffRecord, Stats statsRecord, ScanInfo info) { @@ -518,7 +679,15 @@ public class DirectoryScanner implements Runnable { diffRecord.add(info); } - /** Block is not found on the disk */ + /** + * Add a new ScanInfo object to the list of differences and adjust the stats + * accordingly. This method is called when a block is not found on the disk. + * + * @param diffRecord the list to which to add the info + * @param statsRecord the stats to update + * @param blockId the id of the missing block + * @param vol the volume that contains the missing block + */ private void addDifference(LinkedList diffRecord, Stats statsRecord, long blockId, FsVolumeSpi vol) { @@ -538,7 +707,13 @@ public class DirectoryScanner implements Runnable { return false; } - /** Get lists of blocks on the disk sorted by blockId, per blockpool */ + /** + * Get the lists of blocks on the disks in the dataset, sorted by blockId. + * The returned map contains one entry per blockpool, keyed by the blockpool + * ID. + * + * @return a map of sorted arrays of block information + */ private Map getDiskReport() { // First get list of data directories final List volumes = dataset.getVolumes(); @@ -554,16 +729,22 @@ public class DirectoryScanner implements Runnable { if (isValid(dataset, volumes.get(i))) { ReportCompiler reportCompiler = new ReportCompiler(datanode,volumes.get(i)); - Future result = + Future result = reportCompileThreadPool.submit(reportCompiler); compilersInProgress.put(i, result); } } - + for (Entry> report : compilersInProgress.entrySet()) { try { dirReports[report.getKey()] = report.getValue().get(); + + // If our compiler threads were interrupted, give up on this run + if (dirReports[report.getKey()] == null) { + dirReports = null; + break; + } } catch (Exception ex) { LOG.error("Error compiling report", ex); // Propagate ex to DataBlockScanner to deal with @@ -573,48 +754,106 @@ public class DirectoryScanner implements Runnable { // Compile consolidated report for all the volumes ScanInfoPerBlockPool list = new ScanInfoPerBlockPool(); - for (int i = 0; i < volumes.size(); i++) { - if (isValid(dataset, volumes.get(i))) { - // volume is still valid - list.addAll(dirReports[i]); + if (dirReports != null) { + for (int i = 0; i < volumes.size(); i++) { + if (isValid(dataset, volumes.get(i))) { + // volume is still valid + list.addAll(dirReports[i]); + } } } return list.toSortedArrays(); } + /** + * Helper method to determine if a file name is consistent with a block + * meta-data file. + * + * @param blockId the block ID + * @param metaFile the file to check + * @return whether the file name is a block meta-data file name + */ private static boolean isBlockMetaFile(String blockId, String metaFile) { return metaFile.startsWith(blockId) && metaFile.endsWith(Block.METADATA_EXTENSION); } - private static class ReportCompiler - implements Callable { + /** + * The ReportCompiler class encapsulates the process of searching a datanode's + * disks for block information. It operates by performing a DFS of the + * volume to discover block information. + * + * When the ReportCompiler discovers block information, it creates a new + * ScanInfo object for it and adds that object to its report list. The report + * list is returned by the {@link #call()} method.
+ */ + private class ReportCompiler implements Callable { private final FsVolumeSpi volume; private final DataNode datanode; + // Variable for tracking time spent running for throttling purposes + private final StopWatch throttleTimer = new StopWatch(); + // Variable for tracking time spent running and waiting for testing + // purposes + private final StopWatch perfTimer = new StopWatch(); + /** + * Create a report compiler for the given volume on the given datanode. + * + * @param datanode the target datanode + * @param volume the target volume + */ public ReportCompiler(DataNode datanode, FsVolumeSpi volume) { this.datanode = datanode; this.volume = volume; } + /** + * Run this report compiler thread. + * + * @return the block info report list + * @throws IOException if the block pool isn't found + */ @Override - public ScanInfoPerBlockPool call() throws Exception { + public ScanInfoPerBlockPool call() throws IOException { String[] bpList = volume.getBlockPoolList(); ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length); for (String bpid : bpList) { - LinkedList report = new LinkedList(); + LinkedList report = new LinkedList<>(); File bpFinalizedDir = volume.getFinalizedDir(bpid); - result.put(bpid, - compileReport(volume, bpFinalizedDir, bpFinalizedDir, report)); + + perfTimer.start(); + throttleTimer.start(); + + try { + result.put(bpid, + compileReport(volume, bpFinalizedDir, bpFinalizedDir, report)); + } catch (InterruptedException ex) { + // Exit quickly and flag the scanner to do the same + result = null; + break; + } } return result; } - /** Compile list {@link ScanInfo} for the blocks in the directory */ + /** + * Compile a list of {@link ScanInfo} for the blocks in the directory + * given by {@code dir}. + * + * @param vol the volume that contains the directory to scan + * @param bpFinalizedDir the root directory of the directory to scan + * @param dir the directory to scan + * @param report the list onto which block reports are placed + */ private LinkedList compileReport(FsVolumeSpi vol, - File bpFinalizedDir, File dir, LinkedList report) { + File bpFinalizedDir, File dir, LinkedList report) + throws InterruptedException { + File[] files; + + throttle(); + try { files = FileUtil.listFiles(dir); } catch (IOException ioe) { @@ -632,6 +871,12 @@ public class DirectoryScanner implements Runnable { * blk__.meta */ for (int i = 0; i < files.length; i++) { + // Make sure this thread can make a timely exit. With a low throttle + // rate, completing a run can take a looooong time. + if (Thread.interrupted()) { + throw new InterruptedException(); + } + if (files[i].isDirectory()) { compileReport(vol, bpFinalizedDir, files[i], report); continue; @@ -680,5 +925,40 @@ public class DirectoryScanner implements Runnable { + ", expected block file path: " + expBlockFile); } } + + /** + * Called by the thread before each potential disk scan so that a pause + * can be optionally inserted to limit the number of scans per second. + * The limit is controlled by + * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}. + */ + private void throttle() throws InterruptedException { + accumulateTimeRunning(); + + if ((throttleLimitMsPerSec < 1000) && + (throttleTimer.now(TimeUnit.MILLISECONDS) > throttleLimitMsPerSec)) { + + Thread.sleep(MILLIS_PER_SECOND - throttleLimitMsPerSec); + throttleTimer.reset().start(); + } + + accumulateTimeWaiting(); + } + + /** + * Helper method to measure time running.
+ */ + private void accumulateTimeRunning() { + timeRunningMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS)); + perfTimer.reset().start(); + } + + /** + * Helper method to measure time waiting. + */ + private void accumulateTimeWaiting() { + timeWaitingMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS)); + perfTimer.reset().start(); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ad9efbe/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 4c9e2b2..4359d84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -601,6 +601,26 @@ + dfs.datanode.directoryscan.throttle.limit.ms.per.sec + 0 + The report compilation threads are limited to only running for + a given number of milliseconds per second, as configured by the + property. The limit is taken per thread, not in aggregate, e.g. setting + a limit of 100ms for 4 compiler threads will result in each thread being + limited to 100ms, not 25ms. + + Note that the throttle does not interrupt the report compiler threads, so the + actual running time of the threads per second will typically be somewhat + higher than the throttle limit, usually by no more than 20%. + + Setting this limit to 1000 disables compiler thread throttling. Only + values between 1 and 1000 are valid. Setting an invalid value will result + in the throttle being disabled and an error message being logged. 1000 is + the default setting. + + + + dfs.heartbeat.interval 3 Determines datanode heartbeat interval in seconds.
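----------------------------------------------------------------------

Illustrative aside, not part of the patch: the new key can be set programmatically the same way the test below sets it. A minimal sketch, assuming a Hadoop client classpath; the helper class and method names are hypothetical, while the Configuration API and the DFSConfigKeys constant come from the patch itself.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class ScannerThrottleExample {
  /**
   * Build a Configuration with the directory scanner throttle set.
   * Per the constructor logic above, only values between 1 and 1000
   * ms/sec are honored; anything out of range is logged and replaced
   * with the default, which leaves throttling effectively disabled.
   */
  public static Configuration withScannerThrottle(int limitMsPerSec) {
    Configuration conf = new Configuration();
    conf.setInt(
        DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
        limitMsPerSec);
    return conf;
  }
}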
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ad9efbe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index f060f3e..5046e97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -33,6 +33,10 @@ import java.nio.channels.FileChannel; import java.util.LinkedList; import java.util.List; import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; @@ -55,6 +59,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; import org.junit.Test; /** @@ -494,7 +499,13 @@ public class TestDirectoryScanner { scan(totalBlocks+3, 6, 2, 2, 3, 2); scan(totalBlocks+1, 0, 0, 0, 0, 0); - // Test14: validate clean shutdown of DirectoryScanner + // Test14: make sure no throttling is happening + assertTrue("Throttle appears to be engaged", + scanner.timeWaitingMs.get() < 10L); + assertTrue("Report compiler threads logged no execution time", + scanner.timeRunningMs.get() > 0L); + + // Test15: validate clean shutdown of DirectoryScanner ////assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not sim scanner.shutdown(); assertFalse(scanner.getRunStatus()); @@ -508,6 +519,219 @@ public class TestDirectoryScanner { } } + /** + * Test that the timeslice throttle limits the report compiler thread's + * execution time correctly. We test by scanning a large block pool and + * comparing the time spent waiting to the time spent running. + * + * The block pool has to be large, or the ratio will be off. The throttle + * allows the report compiler thread to finish its current cycle when + * blocking it, so the ratio will always be a little lower than expected. + * The smaller the block pool, the further off the ratio will be. + * + * @throws Exception thrown on unexpected failure + */ + @Test (timeout=300000) + public void testThrottling() throws Exception { + Configuration conf = new Configuration(CONF); + + // We need lots of blocks so the report compiler threads have enough to + // keep them busy while we watch them.
int blocks = 20000; + int maxRetries = 3; + + cluster = new MiniDFSCluster.Builder(conf).build(); + + try { + cluster.waitActive(); + bpid = cluster.getNamesystem().getBlockPoolId(); + fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0)); + client = cluster.getFileSystem().getClient(); + conf.setInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + 100); + DataNode dataNode = cluster.getDataNodes().get(0); + + createFile(GenericTestUtils.getMethodName(), + BLOCK_LENGTH * blocks, false); + + float ratio = 0.0f; + int retries = maxRetries; + + while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) { + scanner = new DirectoryScanner(dataNode, fds, conf); + ratio = runThrottleTest(blocks); + retries -= 1; + } + + // Waiting should be about 9x running. + LOG.info("RATIO: " + ratio); + assertTrue("Throttle is too restrictive", ratio <= 10f); + assertTrue("Throttle is too permissive", ratio >= 7f); + + // Test with a different limit + conf.setInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + 200); + ratio = 0.0f; + retries = maxRetries; + + while ((retries > 0) && ((ratio < 3f) || (ratio > 4.5f))) { + scanner = new DirectoryScanner(dataNode, fds, conf); + ratio = runThrottleTest(blocks); + retries -= 1; + } + + // Waiting should be about 4x running. + LOG.info("RATIO: " + ratio); + assertTrue("Throttle is too restrictive", ratio <= 4.5f); + assertTrue("Throttle is too permissive", ratio >= 3.0f); + + // Test with more than 1 thread + conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 3); + conf.setInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + 100); + ratio = 0.0f; + retries = maxRetries; + + while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) { + scanner = new DirectoryScanner(dataNode, fds, conf); + ratio = runThrottleTest(blocks); + retries -= 1; + } + + // Waiting should be about 9x running. + LOG.info("RATIO: " + ratio); + assertTrue("Throttle is too restrictive", ratio <= 10f); + assertTrue("Throttle is too permissive", ratio >= 7f); + + // Test with no limit + scanner = new DirectoryScanner(dataNode, fds, CONF); + scanner.setRetainDiffs(true); + scan(blocks, 0, 0, 0, 0, 0); + scanner.shutdown(); + assertFalse(scanner.getRunStatus()); + + assertTrue("Throttle appears to be engaged", + scanner.timeWaitingMs.get() < 10L); + assertTrue("Report compiler threads logged no execution time", + scanner.timeRunningMs.get() > 0L); + + // Test with a 1ms limit. This also tests whether the scanner can be + // shut down cleanly in mid-stride.
+ conf.setInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + 1); + ratio = 0.0f; + retries = maxRetries; + ScheduledExecutorService interruptor = + Executors.newScheduledThreadPool(maxRetries); + + try { + while ((retries > 0) && (ratio < 10)) { + scanner = new DirectoryScanner(dataNode, fds, conf); + scanner.setRetainDiffs(true); + + final AtomicLong nowMs = new AtomicLong(); + + // Stop the scanner after 2 seconds because otherwise it will take an + // eternity to complete its run + interruptor.schedule(new Runnable() { + @Override + public void run() { + scanner.shutdown(); + nowMs.set(Time.monotonicNow()); + } + }, 2L, TimeUnit.SECONDS); + + scanner.reconcile(); + assertFalse(scanner.getRunStatus()); + LOG.info("Scanner took " + (Time.monotonicNow() - nowMs.get()) + + "ms to shutdown"); + assertTrue("Scanner took too long to shutdown", + Time.monotonicNow() - nowMs.get() < 1000L); + + ratio = + (float)scanner.timeWaitingMs.get() / scanner.timeRunningMs.get(); + retries -= 1; + } + } finally { + interruptor.shutdown(); + } + + // We just want to test that it waits a lot, but it also runs some + LOG.info("RATIO: " + ratio); + assertTrue("Throttle is too permissive", + ratio > 10); + assertTrue("Report compiler threads logged no execution time", + scanner.timeRunningMs.get() > 0L); + + // Test with a 0 limit, i.e. disabled + conf.setInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + 0); + scanner = new DirectoryScanner(dataNode, fds, conf); + scanner.setRetainDiffs(true); + scan(blocks, 0, 0, 0, 0, 0); + scanner.shutdown(); + assertFalse(scanner.getRunStatus()); + + assertTrue("Throttle appears to be engaged", + scanner.timeWaitingMs.get() < 10L); + assertTrue("Report compiler threads logged no execution time", + scanner.timeRunningMs.get() > 0L); + + // Test with a 1000 limit, i.e.
disabled + conf.setInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + 1000); + scanner = new DirectoryScanner(dataNode, fds, conf); + scanner.setRetainDiffs(true); + scan(blocks, 0, 0, 0, 0, 0); + scanner.shutdown(); + assertFalse(scanner.getRunStatus()); + + assertTrue("Throttle appears to be engaged", + scanner.timeWaitingMs.get() < 10L); + assertTrue("Report compiler threads logged no execution time", + scanner.timeRunningMs.get() > 0L); + + // Test that throttle works from regular start + conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1); + conf.setInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + 10); + conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, + 1); + scanner = new DirectoryScanner(dataNode, fds, conf); + scanner.setRetainDiffs(true); + scanner.start(); + + int count = 50; + + while ((count > 0) && (scanner.timeWaitingMs.get() < 500L)) { + Thread.sleep(100L); + count -= 1; + } + + scanner.shutdown(); + assertFalse(scanner.getRunStatus()); + assertTrue("Throttle does not appear to be engaged", count > 0); + } finally { + cluster.shutdown(); + } + } + + private float runThrottleTest(int blocks) throws IOException { + scanner.setRetainDiffs(true); + scan(blocks, 0, 0, 0, 0, 0); + scanner.shutdown(); + assertFalse(scanner.getRunStatus()); + + return (float)scanner.timeWaitingMs.get() / scanner.timeRunningMs.get(); + } + private void verifyAddition(long blockId, long genStamp, long size) { final ReplicaInfo replicainfo; replicainfo = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
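----------------------------------------------------------------------

The throttling pattern itself is compact enough to read in isolation. Below is a minimal standalone sketch, assuming org.apache.hadoop.util.StopWatch as used by the patch; the class and method names are illustrative, not part of the commit. Each worker runs freely until it has used up its per-second budget, then sleeps away the remainder of the second, mirroring what ReportCompiler.throttle() does before each directory listing.

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.util.StopWatch;

public class TimeSliceThrottle {
  private static final int MILLIS_PER_SECOND = 1000;
  private final int limitMsPerSec;
  // Tracks running time since the last sleep, like throttleTimer above.
  private final StopWatch runTimer = new StopWatch().start();

  public TimeSliceThrottle(int limitMsPerSec) {
    this.limitMsPerSec = limitMsPerSec;
  }

  // Call before each unit of work. A limit of 1000 or more never sleeps,
  // matching the patch's "1000 disables throttling" semantics.
  public void throttle() throws InterruptedException {
    if ((limitMsPerSec < MILLIS_PER_SECOND)
        && (runTimer.now(TimeUnit.MILLISECONDS) > limitMsPerSec)) {
      // The budget for this second is spent: sleep off the rest of it.
      Thread.sleep(MILLIS_PER_SECOND - limitMsPerSec);
      runTimer.reset().start();
    }
  }
}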