From: lei@apache.org
To: common-commits@hadoop.apache.org
Date: Mon, 10 Oct 2016 22:30:10 -0000
Message-Id: <97022b6227fc44b0ab20cb37399a58d1@git.apache.org>
In-Reply-To: <390450eea6e14047b736d8ada2e81990@git.apache.org>
References: <390450eea6e14047b736d8ada2e81990@git.apache.org>
Subject: [2/2] hadoop git commit: HDFS-10637. Modifications to remove the assumption that FsVolumes are backed by java.io.File. (Virajith Jalaparti via lei)

HDFS-10637. Modifications to remove the assumption that FsVolumes are backed by java.io.File.
(Virajith Jalaparti via lei) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/96b12662 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/96b12662 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/96b12662 Branch: refs/heads/trunk Commit: 96b12662ea76e3ded4ef13944fc8df206cfb4613 Parents: 0773ffd Author: Lei Xu Authored: Mon Oct 10 15:28:19 2016 -0700 Committer: Lei Xu Committed: Mon Oct 10 15:30:03 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hdfs/server/common/Storage.java | 22 ++ .../server/datanode/BlockPoolSliceStorage.java | 20 +- .../hdfs/server/datanode/BlockScanner.java | 8 +- .../hadoop/hdfs/server/datanode/DataNode.java | 34 +- .../hdfs/server/datanode/DataStorage.java | 34 +- .../hdfs/server/datanode/DirectoryScanner.java | 320 +------------------ .../hdfs/server/datanode/DiskBalancer.java | 25 +- .../hdfs/server/datanode/LocalReplica.java | 2 +- .../hdfs/server/datanode/ReplicaInfo.java | 2 +- .../hdfs/server/datanode/StorageLocation.java | 32 +- .../hdfs/server/datanode/VolumeScanner.java | 27 +- .../server/datanode/fsdataset/FsDatasetSpi.java | 5 +- .../server/datanode/fsdataset/FsVolumeSpi.java | 234 +++++++++++++- .../impl/FsDatasetAsyncDiskService.java | 40 ++- .../datanode/fsdataset/impl/FsDatasetImpl.java | 136 ++++---- .../datanode/fsdataset/impl/FsVolumeImpl.java | 233 ++++++++++++-- .../fsdataset/impl/FsVolumeImplBuilder.java | 65 ++++ .../datanode/fsdataset/impl/FsVolumeList.java | 44 +-- .../impl/RamDiskAsyncLazyPersistService.java | 79 +++-- .../fsdataset/impl/VolumeFailureInfo.java | 13 +- .../hdfs/server/namenode/FSNamesystem.java | 2 +- .../TestNameNodePrunesMissingStorages.java | 15 +- .../server/datanode/SimulatedFSDataset.java | 46 ++- .../hdfs/server/datanode/TestBlockScanner.java | 3 +- .../datanode/TestDataNodeHotSwapVolumes.java | 15 +- .../datanode/TestDataNodeVolumeFailure.java | 12 +- .../TestDataNodeVolumeFailureReporting.java | 10 + .../server/datanode/TestDirectoryScanner.java | 76 +++-- .../hdfs/server/datanode/TestDiskError.java | 2 +- .../extdataset/ExternalDatasetImpl.java | 10 +- .../datanode/extdataset/ExternalVolumeImpl.java | 44 ++- .../fsdataset/impl/FsDatasetImplTestUtils.java | 9 +- .../fsdataset/impl/TestFsDatasetImpl.java | 69 ++-- .../fsdataset/impl/TestFsVolumeList.java | 83 +++-- .../TestDiskBalancerWithMockMover.java | 4 +- 35 files changed, 1062 insertions(+), 713 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java index 9218e9d..e55de35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; +import 
org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIOException; import org.apache.hadoop.util.ToolRunner; @@ -269,11 +270,17 @@ public abstract class Storage extends StorageInfo { private String storageUuid = null; // Storage directory identifier. + private final StorageLocation location; public StorageDirectory(File dir) { // default dirType is null this(dir, null, false); } + public StorageDirectory(StorageLocation location) { + // default dirType is null + this(location.getFile(), null, false, location); + } + public StorageDirectory(File dir, StorageDirType dirType) { this(dir, dirType, false); } @@ -294,11 +301,22 @@ public abstract class Storage extends StorageInfo { * disables locking on the storage directory, false enables locking */ public StorageDirectory(File dir, StorageDirType dirType, boolean isShared) { + this(dir, dirType, isShared, null); + } + + public StorageDirectory(File dir, StorageDirType dirType, + boolean isShared, StorageLocation location) { this.root = dir; this.lock = null; this.dirType = dirType; this.isShared = isShared; + this.location = location; + assert location == null || + dir.getAbsolutePath().startsWith( + location.getFile().getAbsolutePath()): + "The storage location and directory should be equal"; } + /** * Get root directory of this storage @@ -861,6 +879,10 @@ public abstract class Storage extends StorageInfo { } return false; } + + public StorageLocation getStorageLocation() { + return location; + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java index fd89611..e3b6da1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java @@ -147,10 +147,10 @@ public class BlockPoolSliceStorage extends Storage { * @throws IOException */ private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo, - File dataDir, StartupOption startOpt, + File dataDir, StorageLocation location, StartupOption startOpt, List> callables, Configuration conf) throws IOException { - StorageDirectory sd = new StorageDirectory(dataDir, null, true); + StorageDirectory sd = new StorageDirectory(dataDir, null, true, location); try { StorageState curState = sd.analyzeStorage(startOpt, this, true); // sd is locked but not opened @@ -208,9 +208,9 @@ public class BlockPoolSliceStorage extends Storage { * @throws IOException on error */ List loadBpStorageDirectories(NamespaceInfo nsInfo, - Collection dataDirs, StartupOption startOpt, - List> callables, Configuration conf) - throws IOException { + Collection dataDirs, StorageLocation location, + StartupOption startOpt, List> callables, + Configuration conf) throws IOException { List succeedDirs = Lists.newArrayList(); try { for (File dataDir : dataDirs) { @@ -220,7 +220,7 @@ public class BlockPoolSliceStorage extends Storage { "attempt to load an used block storage: " + dataDir); } final StorageDirectory sd = loadStorageDirectory( 
- nsInfo, dataDir, startOpt, callables, conf); + nsInfo, dataDir, location, startOpt, callables, conf); succeedDirs.add(sd); } } catch (IOException e) { @@ -244,12 +244,12 @@ public class BlockPoolSliceStorage extends Storage { * @throws IOException on error */ List recoverTransitionRead(NamespaceInfo nsInfo, - Collection dataDirs, StartupOption startOpt, - List> callables, Configuration conf) - throws IOException { + Collection dataDirs, StorageLocation location, + StartupOption startOpt, List> callables, + Configuration conf) throws IOException { LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID()); final List loaded = loadBpStorageDirectories( - nsInfo, dataDirs, startOpt, callables, conf); + nsInfo, dataDirs, location, startOpt, callables, conf); for (StorageDirectory sd : loaded) { addStorageDir(sd); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java index 456dcc1..21484fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java @@ -201,17 +201,17 @@ public class BlockScanner { FsVolumeSpi volume = ref.getVolume(); if (!isEnabled()) { LOG.debug("Not adding volume scanner for {}, because the block " + - "scanner is disabled.", volume.getBasePath()); + "scanner is disabled.", volume); return; } VolumeScanner scanner = scanners.get(volume.getStorageID()); if (scanner != null) { LOG.error("Already have a scanner for volume {}.", - volume.getBasePath()); + volume); return; } LOG.debug("Adding scanner for volume {} (StorageID {})", - volume.getBasePath(), volume.getStorageID()); + volume, volume.getStorageID()); scanner = new VolumeScanner(conf, datanode, ref); scanner.start(); scanners.put(volume.getStorageID(), scanner); @@ -245,7 +245,7 @@ public class BlockScanner { return; } LOG.info("Removing scanner for volume {} (StorageID {})", - volume.getBasePath(), volume.getStorageID()); + volume, volume.getStorageID()); scanner.shutdown(); scanners.remove(volume.getStorageID()); Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES); http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index dd7e426..cb8e308 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -58,7 +58,6 @@ import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; -import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -78,7 +77,6 @@ import 
java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -791,11 +789,7 @@ public class DataNode extends ReconfigurableBase if (locations.isEmpty()) { return; } - Set volumesToRemove = new HashSet<>(); - for (StorageLocation loc : locations) { - volumesToRemove.add(loc.getFile().getAbsoluteFile()); - } - removeVolumes(volumesToRemove, true); + removeVolumes(locations, true); } /** @@ -814,26 +808,22 @@ public class DataNode extends ReconfigurableBase * @throws IOException */ private synchronized void removeVolumes( - final Set absoluteVolumePaths, boolean clearFailure) + final Collection storageLocations, boolean clearFailure) throws IOException { - for (File vol : absoluteVolumePaths) { - Preconditions.checkArgument(vol.isAbsolute()); - } - - if (absoluteVolumePaths.isEmpty()) { + if (storageLocations.isEmpty()) { return; } LOG.info(String.format("Deactivating volumes (clear failure=%b): %s", - clearFailure, Joiner.on(",").join(absoluteVolumePaths))); + clearFailure, Joiner.on(",").join(storageLocations))); IOException ioe = null; // Remove volumes and block infos from FsDataset. - data.removeVolumes(absoluteVolumePaths, clearFailure); + data.removeVolumes(storageLocations, clearFailure); // Remove volumes from DataStorage. try { - storage.removeVolumes(absoluteVolumePaths); + storage.removeVolumes(storageLocations); } catch (IOException e) { ioe = e; } @@ -841,7 +831,7 @@ public class DataNode extends ReconfigurableBase // Set configuration and dataDirs to reflect volume changes. for (Iterator it = dataDirs.iterator(); it.hasNext(); ) { StorageLocation loc = it.next(); - if (absoluteVolumePaths.contains(loc.getFile().getAbsoluteFile())) { + if (storageLocations.contains(loc)) { it.remove(); } } @@ -3242,18 +3232,18 @@ public class DataNode extends ReconfigurableBase * Check the disk error */ private void checkDiskError() { - Set unhealthyDataDirs = data.checkDataDir(); - if (unhealthyDataDirs != null && !unhealthyDataDirs.isEmpty()) { + Set unhealthyLocations = data.checkDataDir(); + if (unhealthyLocations != null && !unhealthyLocations.isEmpty()) { try { // Remove all unhealthy volumes from DataNode. 
- removeVolumes(unhealthyDataDirs, false); + removeVolumes(unhealthyLocations, false); } catch (IOException e) { LOG.warn("Error occurred when removing unhealthy storage dirs: " + e.getMessage(), e); } StringBuilder sb = new StringBuilder("DataNode failed volumes:"); - for (File dataDir : unhealthyDataDirs) { - sb.append(dataDir.getAbsolutePath() + ";"); + for (StorageLocation location : unhealthyLocations) { + sb.append(location + ";"); } handleDiskError(sb.toString()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index 7e620c2..7c9bea5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -263,9 +263,10 @@ public class DataStorage extends Storage { } private StorageDirectory loadStorageDirectory(DataNode datanode, - NamespaceInfo nsInfo, File dataDir, StartupOption startOpt, - List> callables) throws IOException { - StorageDirectory sd = new StorageDirectory(dataDir, null, false); + NamespaceInfo nsInfo, File dataDir, StorageLocation location, + StartupOption startOpt, List> callables) + throws IOException { + StorageDirectory sd = new StorageDirectory(dataDir, null, false, location); try { StorageState curState = sd.analyzeStorage(startOpt, this, true); // sd is locked but not opened @@ -310,7 +311,7 @@ public class DataStorage extends Storage { * builder later. * * @param datanode DataNode object. - * @param volume the root path of a storage directory. + * @param location the StorageLocation for the storage directory. * @param nsInfos an array of namespace infos. * @return a VolumeBuilder that holds the metadata of this storage directory * and can be added to DataStorage later. @@ -318,8 +319,10 @@ public class DataStorage extends Storage { * * Note that if there is IOException, the state of DataStorage is not modified. 
*/ - public VolumeBuilder prepareVolume(DataNode datanode, File volume, - List nsInfos) throws IOException { + public VolumeBuilder prepareVolume(DataNode datanode, + StorageLocation location, List nsInfos) + throws IOException { + File volume = location.getFile(); if (containsStorageDir(volume)) { final String errorMessage = "Storage directory is in use"; LOG.warn(errorMessage + "."); @@ -327,7 +330,8 @@ public class DataStorage extends Storage { } StorageDirectory sd = loadStorageDirectory( - datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP, null); + datanode, nsInfos.get(0), volume, location, + StartupOption.HOTSWAP, null); VolumeBuilder builder = new VolumeBuilder(this, sd); for (NamespaceInfo nsInfo : nsInfos) { @@ -338,7 +342,8 @@ public class DataStorage extends Storage { final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo); final List dirs = bpStorage.loadBpStorageDirectories( - nsInfo, bpDataDirs, StartupOption.HOTSWAP, null, datanode.getConf()); + nsInfo, bpDataDirs, location, StartupOption.HOTSWAP, + null, datanode.getConf()); builder.addBpStorageDirectories(nsInfo.getBlockPoolID(), dirs); } return builder; @@ -407,7 +412,7 @@ public class DataStorage extends Storage { final List> callables = Lists.newArrayList(); final StorageDirectory sd = loadStorageDirectory( - datanode, nsInfo, root, startOpt, callables); + datanode, nsInfo, root, dataDir, startOpt, callables); if (callables.isEmpty()) { addStorageDir(sd); success.add(dataDir); @@ -458,7 +463,8 @@ public class DataStorage extends Storage { final List> callables = Lists.newArrayList(); final List dirs = bpStorage.recoverTransitionRead( - nsInfo, bpDataDirs, startOpt, callables, datanode.getConf()); + nsInfo, bpDataDirs, dataDir, startOpt, + callables, datanode.getConf()); if (callables.isEmpty()) { for(StorageDirectory sd : dirs) { success.add(sd); @@ -498,9 +504,10 @@ public class DataStorage extends Storage { * @param dirsToRemove a set of storage directories to be removed. * @throws IOException if I/O error when unlocking storage directory. */ - synchronized void removeVolumes(final Set dirsToRemove) + synchronized void removeVolumes( + final Collection storageLocations) throws IOException { - if (dirsToRemove.isEmpty()) { + if (storageLocations.isEmpty()) { return; } @@ -508,7 +515,8 @@ public class DataStorage extends Storage { for (Iterator it = this.storageDirs.iterator(); it.hasNext(); ) { StorageDirectory sd = it.next(); - if (dirsToRemove.contains(sd.getRoot())) { + StorageLocation sdLocation = sd.getStorageLocation(); + if (storageLocations.contains(sdLocation)) { // Remove the block pool level storage first. 
for (Map.Entry entry : this.bpStorageMap.entrySet()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java index c50bfaf..58071dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java @@ -22,7 +22,6 @@ import java.io.File; import java.io.FilenameFilter; import java.io.IOException; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -37,9 +36,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -47,10 +43,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; -import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.StopWatch; import org.apache.hadoop.util.Time; @@ -209,200 +204,6 @@ public class DirectoryScanner implements Runnable { } } - /** - * Tracks the files and other information related to a block on the disk - * Missing file is indicated by setting the corresponding member - * to null. - * - * Because millions of these structures may be created, we try to save - * memory here. So instead of storing full paths, we store path suffixes. - * The block file, if it exists, will have a path like this: - * / - * So we don't need to store the volume path, since we already know what the - * volume is. - * - * The metadata file, if it exists, will have a path like this: - * /_.meta - * So if we have a block file, there isn't any need to store the block path - * again. - * - * The accessor functions take care of these manipulations. - */ - static class ScanInfo implements Comparable { - private final long blockId; - - /** - * The block file path, relative to the volume's base directory. - * If there was no block file found, this may be null. If 'vol' - * is null, then this is the full path of the block file. - */ - private final String blockSuffix; - - /** - * The suffix of the meta file path relative to the block file. - * If blockSuffix is null, then this will be the entire path relative - * to the volume base directory, or an absolute path if vol is also - * null. 
- */ - private final String metaSuffix; - - private final FsVolumeSpi volume; - - /** - * Get the file's length in async block scan - */ - private final long blockFileLength; - - private final static Pattern CONDENSED_PATH_REGEX = - Pattern.compile("(?>>32)); - } - - public long getGenStamp() { - return metaSuffix != null ? Block.getGenerationStamp( - getMetaFile().getName()) : - HdfsConstants.GRANDFATHER_GENERATION_STAMP; - } - } /** * Create a new directory scanner, but don't cycle it running yet. @@ -644,7 +445,7 @@ public class DirectoryScanner implements Runnable { // There may be multiple on-disk records for the same block, don't increment // the memory record pointer if so. ScanInfo nextInfo = blockpoolReport[Math.min(d, blockpoolReport.length - 1)]; - if (nextInfo.getBlockId() != info.blockId) { + if (nextInfo.getBlockId() != info.getBlockId()) { ++m; } } else { @@ -763,19 +564,6 @@ public class DirectoryScanner implements Runnable { } /** - * Helper method to determine if a file name is consistent with a block. - * meta-data file - * - * @param blockId the block ID - * @param metaFile the file to check - * @return whether the file name is a block meta-data file name - */ - private static boolean isBlockMetaFile(String blockId, String metaFile) { - return metaFile.startsWith(blockId) - && metaFile.endsWith(Block.METADATA_EXTENSION); - } - - /** * The ReportCompiler class encapsulates the process of searching a datanode's * disks for block information. It operates by performing a DFS of the * volume to discover block information. @@ -784,7 +572,7 @@ public class DirectoryScanner implements Runnable { * ScanInfo object for it and adds that object to its report list. The report * list is returned by the {@link #call()} method. */ - private class ReportCompiler implements Callable { + public class ReportCompiler implements Callable { private final FsVolumeSpi volume; private final DataNode datanode; // Variable for tracking time spent running for throttling purposes @@ -816,14 +604,12 @@ public class DirectoryScanner implements Runnable { ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length); for (String bpid : bpList) { LinkedList report = new LinkedList<>(); - File bpFinalizedDir = volume.getFinalizedDir(bpid); perfTimer.start(); throttleTimer.start(); try { - result.put(bpid, - compileReport(volume, bpFinalizedDir, bpFinalizedDir, report)); + result.put(bpid, volume.compileReport(bpid, report, this)); } catch (InterruptedException ex) { // Exit quickly and flag the scanner to do the same result = null; @@ -834,106 +620,12 @@ public class DirectoryScanner implements Runnable { } /** - * Compile a list of {@link ScanInfo} for the blocks in the directory - * given by {@code dir}. - * - * @param vol the volume that contains the directory to scan - * @param bpFinalizedDir the root directory of the directory to scan - * @param dir the directory to scan - * @param report the list onto which blocks reports are placed - */ - private LinkedList compileReport(FsVolumeSpi vol, - File bpFinalizedDir, File dir, LinkedList report) - throws InterruptedException { - - throttle(); - - List fileNames; - try { - fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE); - } catch (IOException ioe) { - LOG.warn("Exception occured while compiling report: ", ioe); - // Initiate a check on disk failure. - datanode.checkDiskErrorAsync(); - // Ignore this directory and proceed. 
- return report; - } - Collections.sort(fileNames); - - /* - * Assumption: In the sorted list of files block file appears immediately - * before block metadata file. This is true for the current naming - * convention for block file blk_ and meta file - * blk__.meta - */ - for (int i = 0; i < fileNames.size(); i++) { - // Make sure this thread can make a timely exit. With a low throttle - // rate, completing a run can take a looooong time. - if (Thread.interrupted()) { - throw new InterruptedException(); - } - - File file = new File(dir, fileNames.get(i)); - if (file.isDirectory()) { - compileReport(vol, bpFinalizedDir, file, report); - continue; - } - if (!Block.isBlockFilename(file)) { - if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, file.getName())) { - long blockId = Block.getBlockId(file.getName()); - verifyFileLocation(file.getParentFile(), bpFinalizedDir, - blockId); - report.add(new ScanInfo(blockId, null, file, vol)); - } - continue; - } - File blockFile = file; - long blockId = Block.filename2id(file.getName()); - File metaFile = null; - - // Skip all the files that start with block name until - // getting to the metafile for the block - while (i + 1 < fileNames.size()) { - File blkMetaFile = new File(dir, fileNames.get(i + 1)); - if (!(blkMetaFile.isFile() - && blkMetaFile.getName().startsWith(blockFile.getName()))) { - break; - } - i++; - if (isBlockMetaFile(blockFile.getName(), blkMetaFile.getName())) { - metaFile = blkMetaFile; - break; - } - } - verifyFileLocation(blockFile, bpFinalizedDir, blockId); - report.add(new ScanInfo(blockId, blockFile, metaFile, vol)); - } - return report; - } - - /** - * Verify whether the actual directory location of block file has the - * expected directory path computed using its block ID. - */ - private void verifyFileLocation(File actualBlockFile, - File bpFinalizedDir, long blockId) { - File expectedBlockDir = - DatanodeUtil.idToBlockDir(bpFinalizedDir, blockId); - File actualBlockDir = actualBlockFile.getParentFile(); - if (actualBlockDir.compareTo(expectedBlockDir) != 0) { - LOG.warn("Block: " + blockId + - " found in invalid directory. Expected directory: " + - expectedBlockDir + ". Actual directory: " + actualBlockDir); - } - } - - /** * Called by the thread before each potential disk scan so that a pause * can be optionally inserted to limit the number of scans per second. * The limit is controlled by * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}. 
*/ - private void throttle() throws InterruptedException { + public void throttle() throws InterruptedException { accumulateTimeRunning(); if ((throttleLimitMsPerSec < 1000) && @@ -963,7 +655,7 @@ public class DirectoryScanner implements Runnable { } } - private enum BlockDirFilter implements FilenameFilter { + public enum BlockDirFilter implements FilenameFilter { INSTANCE; @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java index e7e9105..0c75001 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java @@ -500,7 +500,8 @@ public class DiskBalancer { references = this.dataset.getFsVolumeReferences(); for (int ndx = 0; ndx < references.size(); ndx++) { FsVolumeSpi vol = references.get(ndx); - storageIDToVolBasePathMap.put(vol.getStorageID(), vol.getBasePath()); + storageIDToVolBasePathMap.put(vol.getStorageID(), + vol.getBaseURI().getPath()); } references.close(); } @@ -1023,7 +1024,7 @@ public class DiskBalancer { openPoolIters(source, poolIters); if (poolIters.size() == 0) { LOG.error("No block pools found on volume. volume : {}. Exiting.", - source.getBasePath()); + source.getBaseURI()); return; } @@ -1033,17 +1034,16 @@ public class DiskBalancer { // Check for the max error count constraint. if (item.getErrorCount() > getMaxError(item)) { LOG.error("Exceeded the max error count. source {}, dest: {} " + - "error count: {}", source.getBasePath(), - dest.getBasePath(), item.getErrorCount()); - this.setExitFlag(); - continue; + "error count: {}", source.getBaseURI(), + dest.getBaseURI(), item.getErrorCount()); + break; } // Check for the block tolerance constraint. if (isCloseEnough(item)) { LOG.info("Copy from {} to {} done. copied {} bytes and {} " + "blocks.", - source.getBasePath(), dest.getBasePath(), + source.getBaseURI(), dest.getBaseURI(), item.getBytesCopied(), item.getBlocksCopied()); this.setExitFlag(); continue; @@ -1053,7 +1053,7 @@ public class DiskBalancer { // we are not able to find any blocks to copy. if (block == null) { LOG.error("No source blocks, exiting the copy. Source: {}, " + - "Dest:{}", source.getBasePath(), dest.getBasePath()); + "Dest:{}", source.getBaseURI(), dest.getBaseURI()); this.setExitFlag(); continue; } @@ -1081,14 +1081,13 @@ public class DiskBalancer { // exiting here. LOG.error("Destination volume: {} does not have enough space to" + " accommodate a block. Block Size: {} Exiting from" + - " copyBlocks.", dest.getBasePath(), block.getNumBytes()); - this.setExitFlag(); - continue; + " copyBlocks.", dest.getBaseURI(), block.getNumBytes()); + break; } LOG.debug("Moved block with size {} from {} to {}", - block.getNumBytes(), source.getBasePath(), - dest.getBasePath()); + block.getNumBytes(), source.getBaseURI(), + dest.getBaseURI()); // Check for the max throughput constraint. 
// We sleep here to keep the promise that we will not http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java index cbfc9a5..58febf0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java @@ -39,8 +39,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.Storage; -import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil; import org.apache.hadoop.io.IOUtils; http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java index cbbafc3..dc63238 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java @@ -25,8 +25,8 @@ import java.net.URI; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ScanInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.util.LightWeightResizableGSet; http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java index 3162c5c..75abc1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.util.StringUtils; + /** * Encapsulates 
the URI and storage medium that together describe a * storage directory. @@ -37,7 +38,7 @@ import org.apache.hadoop.util.StringUtils; * */ @InterfaceAudience.Private -public class StorageLocation { +public class StorageLocation implements Comparable{ final StorageType storageType; final File file; @@ -104,16 +105,37 @@ public class StorageLocation { @Override public boolean equals(Object obj) { - if (obj == this) { - return true; - } else if (obj == null || !(obj instanceof StorageLocation)) { + if (obj == null || !(obj instanceof StorageLocation)) { return false; } - return toString().equals(obj.toString()); + int comp = compareTo((StorageLocation) obj); + return comp == 0; } @Override public int hashCode() { return toString().hashCode(); } + + @Override + public int compareTo(StorageLocation obj) { + if (obj == this) { + return 0; + } else if (obj == null) { + return -1; + } + + StorageLocation otherStorage = (StorageLocation) obj; + if (this.getFile() != null && otherStorage.getFile() != null) { + return this.getFile().getAbsolutePath().compareTo( + otherStorage.getFile().getAbsolutePath()); + } else if (this.getFile() == null && otherStorage.getFile() == null) { + return this.storageType.compareTo(otherStorage.getStorageType()); + } else if (this.getFile() == null) { + return -1; + } else { + return 1; + } + + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java index 3416b53..1e44fb6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java @@ -217,7 +217,7 @@ public class VolumeScanner extends Thread { public void printStats(StringBuilder p) { p.append(String.format("Block scanner information for volume %s with base" + - " path %s%n", volume.getStorageID(), volume.getBasePath())); + " path %s%n", volume.getStorageID(), volume)); synchronized (stats) { p.append(String.format("Bytes verified in last hour : %57d%n", stats.bytesScannedInPastHour)); @@ -253,20 +253,20 @@ public class VolumeScanner extends Thread { public void setup(VolumeScanner scanner) { LOG.trace("Starting VolumeScanner {}", - scanner.volume.getBasePath()); + scanner.volume); this.scanner = scanner; } public void handle(ExtendedBlock block, IOException e) { FsVolumeSpi volume = scanner.volume; if (e == null) { - LOG.trace("Successfully scanned {} on {}", block, volume.getBasePath()); + LOG.trace("Successfully scanned {} on {}", block, volume); return; } // If the block does not exist anymore, then it's not an error. if (!volume.getDataset().contains(block)) { LOG.debug("Volume {}: block {} is no longer in the dataset.", - volume.getBasePath(), block); + volume, block); return; } // If the block exists, the exception may due to a race with write: @@ -278,11 +278,10 @@ public class VolumeScanner extends Thread { if (e instanceof FileNotFoundException ) { LOG.info("Volume {}: verification failed for {} because of " + "FileNotFoundException. 
This may be due to a race with write.", - volume.getBasePath(), block); + volume, block); return; } - LOG.warn("Reporting bad " + block + " with volume " - + volume.getBasePath(), e); + LOG.warn("Reporting bad {} on {}", block, volume); try { scanner.datanode.reportBadBlocks(block, volume); } catch (IOException ie) { @@ -305,7 +304,7 @@ public class VolumeScanner extends Thread { handler = new ScanResultHandler(); } this.resultHandler = handler; - setName("VolumeScannerThread(" + volume.getBasePath() + ")"); + setName("VolumeScannerThread(" + volume + ")"); setDaemon(true); } @@ -376,7 +375,7 @@ public class VolumeScanner extends Thread { BlockIterator iter = blockIters.get(idx); if (!iter.atEnd()) { LOG.info("Now scanning bpid {} on volume {}", - iter.getBlockPoolId(), volume.getBasePath()); + iter.getBlockPoolId(), volume); curBlockIter = iter; return 0L; } @@ -385,7 +384,7 @@ public class VolumeScanner extends Thread { if (waitMs <= 0) { iter.rewind(); LOG.info("Now rescanning bpid {} on volume {}, after more than " + - "{} hour(s)", iter.getBlockPoolId(), volume.getBasePath(), + "{} hour(s)", iter.getBlockPoolId(), volume, TimeUnit.HOURS.convert(conf.scanPeriodMs, TimeUnit.MILLISECONDS)); curBlockIter = iter; return 0L; @@ -416,16 +415,16 @@ public class VolumeScanner extends Thread { cblock.getBlockPoolId(), cblock.getBlockId()); if (b == null) { LOG.info("Replica {} was not found in the VolumeMap for volume {}", - cblock, volume.getBasePath()); + cblock, volume); } else { block = new ExtendedBlock(cblock.getBlockPoolId(), b); } } catch (FileNotFoundException e) { LOG.info("FileNotFoundException while finding block {} on volume {}", - cblock, volume.getBasePath()); + cblock, volume); } catch (IOException e) { LOG.warn("I/O error while finding block {} on volume {}", - cblock, volume.getBasePath()); + cblock, volume); } if (block == null) { return -1; // block not found. @@ -642,7 +641,7 @@ public class VolumeScanner extends Thread { @Override public String toString() { - return "VolumeScanner(" + volume.getBasePath() + + return "VolumeScanner(" + volume + ", " + volume.getStorageID() + ")"; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index b75ed5b..f2ffa83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -206,7 +207,7 @@ public interface FsDatasetSpi extends FSDatasetMBean { * @param clearFailure set true to clear the failure information about the * volumes. 
*/ - void removeVolumes(Set volumes, boolean clearFailure); + void removeVolumes(Collection volumes, boolean clearFailure); /** @return a storage with the given storage ID */ DatanodeStorage getStorage(final String storageUuid); @@ -482,7 +483,7 @@ public interface FsDatasetSpi extends FSDatasetMBean { * Check if all the data directories are healthy * @return A set of unhealthy data directories. */ - Set checkDataDir(); + Set checkDataDir(); /** * Shutdown the FSDataset http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java index 9e16121..dbba31d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java @@ -20,10 +20,20 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.net.URI; import java.nio.channels.ClosedChannelException; +import java.util.LinkedList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.DF; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; /** * This is an interface for the underlying volume. @@ -48,14 +58,14 @@ public interface FsVolumeSpi { long getAvailable() throws IOException; /** @return the base path to the volume */ - String getBasePath(); + URI getBaseURI(); - /** @return the path to the volume */ - String getPath(String bpid) throws IOException; + DF getUsageStats(Configuration conf); - /** @return the directory for the finalized blocks in the block pool. */ - File getFinalizedDir(String bpid) throws IOException; - + /** @return the {@link StorageLocation} to the volume */ + StorageLocation getStorageLocation(); + + /** @return the {@link StorageType} of the volume */ StorageType getStorageType(); /** Returns true if the volume is NOT backed by persistent storage. */ @@ -186,4 +196,216 @@ public interface FsVolumeSpi { * Get the FSDatasetSpi which this volume is a part of. */ FsDatasetSpi getDataset(); + + /** + * Tracks the files and other information related to a block on the disk + * Missing file is indicated by setting the corresponding member + * to null. + * + * Because millions of these structures may be created, we try to save + * memory here. So instead of storing full paths, we store path suffixes. + * The block file, if it exists, will have a path like this: + * / + * So we don't need to store the volume path, since we already know what the + * volume is. + * + * The metadata file, if it exists, will have a path like this: + * /_.meta + * So if we have a block file, there isn't any need to store the block path + * again. 
+ * + * The accessor functions take care of these manipulations. + */ + public static class ScanInfo implements Comparable { + private final long blockId; + + /** + * The block file path, relative to the volume's base directory. + * If there was no block file found, this may be null. If 'vol' + * is null, then this is the full path of the block file. + */ + private final String blockSuffix; + + /** + * The suffix of the meta file path relative to the block file. + * If blockSuffix is null, then this will be the entire path relative + * to the volume base directory, or an absolute path if vol is also + * null. + */ + private final String metaSuffix; + + private final FsVolumeSpi volume; + + /** + * Get the file's length in async block scan + */ + private final long blockFileLength; + + private final static Pattern CONDENSED_PATH_REGEX = + Pattern.compile("(?>>32)); + } + + public long getGenStamp() { + return metaSuffix != null ? Block.getGenerationStamp( + getMetaFile().getName()) : + HdfsConstants.GRANDFATHER_GENERATION_STAMP; + } + } + + /** + * Compile a list of {@link ScanInfo} for the blocks in + * the block pool with id {@code bpid}. + * + * @param bpid block pool id to scan + * @param report the list onto which blocks reports are placed + * @param reportCompiler + * @throws IOException + */ + LinkedList compileReport(String bpid, + LinkedList report, ReportCompiler reportCompiler) + throws InterruptedException, IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java index c9160cd..b9c731b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java @@ -71,8 +71,8 @@ class FsDatasetAsyncDiskService { private final DataNode datanode; private final FsDatasetImpl fsdatasetImpl; private final ThreadGroup threadGroup; - private Map executors - = new HashMap(); + private Map executors + = new HashMap(); private Map> deletedBlockIds = new HashMap>(); private static final int MAX_DELETED_BLOCKS = 64; @@ -91,7 +91,7 @@ class FsDatasetAsyncDiskService { this.threadGroup = new ThreadGroup(getClass().getSimpleName()); } - private void addExecutorForVolume(final File volume) { + private void addExecutorForVolume(final FsVolumeImpl volume) { ThreadFactory threadFactory = new ThreadFactory() { int counter = 0; @@ -115,18 +115,21 @@ class FsDatasetAsyncDiskService { // This can reduce the number of running threads executor.allowCoreThreadTimeOut(true); - executors.put(volume, executor); + executors.put(volume.getStorageID(), executor); } /** * Starts AsyncDiskService for a new volume * @param volume the root of the new data volume. 
*/ - synchronized void addVolume(File volume) { + synchronized void addVolume(FsVolumeImpl volume) { if (executors == null) { throw new RuntimeException("AsyncDiskService is already shutdown"); } - ThreadPoolExecutor executor = executors.get(volume); + if (volume == null) { + throw new RuntimeException("Attempt to add a null volume"); + } + ThreadPoolExecutor executor = executors.get(volume.getStorageID()); if (executor != null) { throw new RuntimeException("Volume " + volume + " is already existed."); } @@ -137,17 +140,17 @@ class FsDatasetAsyncDiskService { * Stops AsyncDiskService for a volume. * @param volume the root of the volume. */ - synchronized void removeVolume(File volume) { + synchronized void removeVolume(String storageId) { if (executors == null) { throw new RuntimeException("AsyncDiskService is already shutdown"); } - ThreadPoolExecutor executor = executors.get(volume); + ThreadPoolExecutor executor = executors.get(storageId); if (executor == null) { - throw new RuntimeException("Can not find volume " + volume - + " to remove."); + throw new RuntimeException("Can not find volume with storageId " + + storageId + " to remove."); } else { executor.shutdown(); - executors.remove(volume); + executors.remove(storageId); } } @@ -162,13 +165,16 @@ class FsDatasetAsyncDiskService { /** * Execute the task sometime in the future, using ThreadPools. */ - synchronized void execute(File root, Runnable task) { + synchronized void execute(FsVolumeImpl volume, Runnable task) { if (executors == null) { throw new RuntimeException("AsyncDiskService is already shutdown"); } - ThreadPoolExecutor executor = executors.get(root); + if (volume == null) { + throw new RuntimeException("A null volume does not have a executor"); + } + ThreadPoolExecutor executor = executors.get(volume.getStorageID()); if (executor == null) { - throw new RuntimeException("Cannot find root " + root + throw new RuntimeException("Cannot find volume " + volume + " for execution of task " + task); } else { executor.execute(task); @@ -185,7 +191,7 @@ class FsDatasetAsyncDiskService { } else { LOG.info("Shutting down all async disk service threads"); - for (Map.Entry e : executors.entrySet()) { + for (Map.Entry e : executors.entrySet()) { e.getValue().shutdown(); } // clear the executor map so that calling execute again will fail. 
@@ -198,7 +204,7 @@ class FsDatasetAsyncDiskService { public void submitSyncFileRangeRequest(FsVolumeImpl volume, final FileDescriptor fd, final long offset, final long nbytes, final int flags) { - execute(volume.getCurrentDir(), new Runnable() { + execute(volume, new Runnable() { @Override public void run() { try { @@ -220,7 +226,7 @@ class FsDatasetAsyncDiskService { + " replica " + replicaToDelete + " for deletion"); ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask( volumeRef, replicaToDelete, block, trashDirectory); - execute(((FsVolumeImpl) volumeRef.getVolume()).getCurrentDir(), deletionTask); + execute(((FsVolumeImpl) volumeRef.getVolume()), deletionTask); } /** A task for deleting a block file and its associated meta file, as well http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 26a2e9f..fd747bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -361,20 +361,22 @@ class FsDatasetImpl implements FsDatasetSpi { */ private static List getInitialVolumeFailureInfos( Collection dataLocations, DataStorage storage) { - Set failedLocationSet = Sets.newHashSetWithExpectedSize( + Set failedLocationSet = Sets.newHashSetWithExpectedSize( dataLocations.size()); for (StorageLocation sl: dataLocations) { - failedLocationSet.add(sl.getFile().getAbsolutePath()); + LOG.info("Adding to failedLocationSet " + sl); + failedLocationSet.add(sl); } for (Iterator it = storage.dirIterator(); it.hasNext(); ) { Storage.StorageDirectory sd = it.next(); - failedLocationSet.remove(sd.getRoot().getAbsolutePath()); + failedLocationSet.remove(sd.getStorageLocation()); + LOG.info("Removing from failedLocationSet " + sd.getStorageLocation()); } List volumeFailureInfos = Lists.newArrayListWithCapacity( failedLocationSet.size()); long failureDate = Time.now(); - for (String failedStorageLocation: failedLocationSet) { + for (StorageLocation failedStorageLocation: failedLocationSet) { volumeFailureInfos.add(new VolumeFailureInfo(failedStorageLocation, failureDate)); } @@ -403,49 +405,55 @@ class FsDatasetImpl implements FsDatasetSpi { new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType)); - asyncDiskService.addVolume(sd.getCurrentDir()); + asyncDiskService.addVolume((FsVolumeImpl) ref.getVolume()); volumes.addVolume(ref); } } private void addVolume(Collection dataLocations, Storage.StorageDirectory sd) throws IOException { - final File dir = sd.getCurrentDir(); - final StorageType storageType = - getStorageTypeFromLocations(dataLocations, sd.getRoot()); + final StorageLocation storageLocation = sd.getStorageLocation(); // If IOException raises from FsVolumeImpl() or getVolumeMap(), there is // nothing needed to be rolled back to make various data structures, e.g., // storageMap and asyncDiskService, consistent. 
- FsVolumeImpl fsVolume = new FsVolumeImpl( - this, sd.getStorageUuid(), dir, this.conf, storageType); + FsVolumeImpl fsVolume = new FsVolumeImplBuilder() + .setDataset(this) + .setStorageID(sd.getStorageUuid()) + .setStorageDirectory(sd) + .setConf(this.conf) + .build(); FsVolumeReference ref = fsVolume.obtainReference(); ReplicaMap tempVolumeMap = new ReplicaMap(datasetLock); fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker); - activateVolume(tempVolumeMap, sd, storageType, ref); - LOG.info("Added volume - " + dir + ", StorageType: " + storageType); + activateVolume(tempVolumeMap, sd, storageLocation.getStorageType(), ref); + LOG.info("Added volume - " + storageLocation + ", StorageType: " + + storageLocation.getStorageType()); } @VisibleForTesting - public FsVolumeImpl createFsVolume(String storageUuid, File currentDir, - StorageType storageType) throws IOException { - return new FsVolumeImpl(this, storageUuid, currentDir, conf, storageType); + public FsVolumeImpl createFsVolume(String storageUuid, + Storage.StorageDirectory sd, + final StorageLocation location) throws IOException { + return new FsVolumeImplBuilder() + .setDataset(this) + .setStorageID(storageUuid) + .setStorageDirectory(sd) + .setConf(conf) + .build(); } @Override public void addVolume(final StorageLocation location, final List nsInfos) throws IOException { - final File dir = location.getFile(); - // Prepare volume in DataStorage final DataStorage.VolumeBuilder builder; try { - builder = dataStorage.prepareVolume(datanode, location.getFile(), nsInfos); + builder = dataStorage.prepareVolume(datanode, location, nsInfos); } catch (IOException e) { - volumes.addVolumeFailureInfo(new VolumeFailureInfo( - location.getFile().getAbsolutePath(), Time.now())); + volumes.addVolumeFailureInfo(new VolumeFailureInfo(location, Time.now())); throw e; } @@ -453,7 +461,7 @@ class FsDatasetImpl implements FsDatasetSpi { StorageType storageType = location.getStorageType(); final FsVolumeImpl fsVolume = - createFsVolume(sd.getStorageUuid(), sd.getCurrentDir(), storageType); + createFsVolume(sd.getStorageUuid(), sd, location); final ReplicaMap tempVolumeMap = new ReplicaMap(new AutoCloseableLock()); ArrayList exceptions = Lists.newArrayList(); @@ -482,34 +490,33 @@ class FsDatasetImpl implements FsDatasetSpi { builder.build(); activateVolume(tempVolumeMap, sd, storageType, ref); - LOG.info("Added volume - " + dir + ", StorageType: " + storageType); + LOG.info("Added volume - " + location + ", StorageType: " + storageType); } /** * Removes a set of volumes from FsDataset. - * @param volumesToRemove a set of absolute root path of each volume. + * @param storageLocationsToRemove a set of + * {@link StorageLocation}s for each volume. * @param clearFailure set true to clear failure information. */ @Override - public void removeVolumes(Set volumesToRemove, boolean clearFailure) { - // Make sure that all volumes are absolute path. 
@@ -482,34 +490,33 @@ class FsDatasetImpl implements FsDatasetSpi {
     builder.build();
     activateVolume(tempVolumeMap, sd, storageType, ref);
-    LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
+    LOG.info("Added volume - " + location + ", StorageType: " + storageType);
   }

   /**
    * Removes a set of volumes from FsDataset.
-   * @param volumesToRemove a set of absolute root path of each volume.
+   * @param storageLocationsToRemove a set of
+   * {@link StorageLocation}s for each volume.
    * @param clearFailure set true to clear failure information.
    */
   @Override
-  public void removeVolumes(Set volumesToRemove, boolean clearFailure) {
-    // Make sure that all volumes are absolute path.
-    for (File vol : volumesToRemove) {
-      Preconditions.checkArgument(vol.isAbsolute(),
-          String.format("%s is not absolute path.", vol.getPath()));
-    }
-
+  public void removeVolumes(
+      Collection storageLocationsToRemove,
+      boolean clearFailure) {
     Map> blkToInvalidate = new HashMap<>();
     List storageToRemove = new ArrayList<>();

     try (AutoCloseableLock lock = datasetLock.acquire()) {
       for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
         Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
-        final File absRoot = sd.getRoot().getAbsoluteFile();
-        if (volumesToRemove.contains(absRoot)) {
-          LOG.info("Removing " + absRoot + " from FsDataset.");
-
+        final StorageLocation sdLocation = sd.getStorageLocation();
+        LOG.info("Checking removing StorageLocation " +
+            sdLocation + " with id " + sd.getStorageUuid());
+        if (storageLocationsToRemove.contains(sdLocation)) {
+          LOG.info("Removing StorageLocation " + sdLocation + " with id " +
+              sd.getStorageUuid() + " from FsDataset.");
           // Disable the volume from the service.
-          asyncDiskService.removeVolume(sd.getCurrentDir());
-          volumes.removeVolume(absRoot, clearFailure);
+          asyncDiskService.removeVolume(sd.getStorageUuid());
+          volumes.removeVolume(sdLocation, clearFailure);
           volumes.waitVolumeRemoved(5000, datasetLockCondition);

           // Removed all replica information for the blocks on the volume.
@@ -517,12 +524,14 @@ class FsDatasetImpl implements FsDatasetSpi {
           // not scan disks.
           for (String bpid : volumeMap.getBlockPoolList()) {
             List blocks = new ArrayList<>();
-            for (Iterator it = volumeMap.replicas(bpid).iterator();
-                 it.hasNext(); ) {
+            for (Iterator it =
+                volumeMap.replicas(bpid).iterator(); it.hasNext();) {
               ReplicaInfo block = it.next();
-              final File absBasePath =
-                  new File(block.getVolume().getBasePath()).getAbsoluteFile();
-              if (absBasePath.equals(absRoot)) {
+              final StorageLocation blockStorageLocation =
+                  block.getVolume().getStorageLocation();
+              LOG.info("checking for block " + block.getBlockId() +
+                  " with storageLocation " + blockStorageLocation);
+              if (blockStorageLocation.equals(sdLocation)) {
                 blocks.add(block);
                 it.remove();
               }
             }
@@ -625,7 +634,8 @@ class FsDatasetImpl implements FsDatasetSpi {
     List failedStorageLocations = Lists.newArrayListWithCapacity(
         infos.length);
     for (VolumeFailureInfo info: infos) {
-      failedStorageLocations.add(info.getFailedStorageLocation());
+      failedStorageLocations.add(
+          info.getFailedStorageLocation().getFile().getAbsolutePath());
     }
     return failedStorageLocations.toArray(
         new String[failedStorageLocations.size()]);
@@ -663,7 +673,8 @@ class FsDatasetImpl implements FsDatasetSpi {
     long lastVolumeFailureDate = 0;
     long estimatedCapacityLostTotal = 0;
     for (VolumeFailureInfo info: infos) {
-      failedStorageLocations.add(info.getFailedStorageLocation());
+      failedStorageLocations.add(
+          info.getFailedStorageLocation().getFile().getAbsolutePath());
       long failureDate = info.getFailureDate();
       if (failureDate > lastVolumeFailureDate) {
         lastVolumeFailureDate = failureDate;
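In the removeVolumes() hunks above, the File.isAbsolute() precondition and the getAbsoluteFile() normalization disappear: a storage directory is removed when its StorageLocation equals one of the requested locations, and the same equality test selects the replicas to drop. A minimal sketch of that matching logic, using hypothetical VolumeRecord/URI types rather than the Hadoop classes:

import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/** Hypothetical volume record keyed by a location URI; not the Hadoop types. */
final class VolumeRecord {
  final URI location;
  final String storageId;

  VolumeRecord(URI location, String storageId) {
    this.location = location;
    this.storageId = storageId;
  }
}

public class RemoveVolumesDemo {
  /**
   * Removes every volume whose location is in locationsToRemove.
   * Membership is decided by URI equality, so no absolute-path
   * normalization or isAbsolute() precondition is needed, mirroring the
   * patch's switch from File paths to StorageLocation equality.
   */
  static List<VolumeRecord> removeVolumes(Collection<VolumeRecord> volumes,
                                          Set<URI> locationsToRemove) {
    List<VolumeRecord> removed = new ArrayList<>();
    for (Iterator<VolumeRecord> it = volumes.iterator(); it.hasNext(); ) {
      VolumeRecord v = it.next();
      if (locationsToRemove.contains(v.location)) {
        removed.add(v);  // a real caller would also drop that volume's replicas
        it.remove();
      }
    }
    return removed;
  }

  public static void main(String[] args) {
    List<VolumeRecord> volumes = new ArrayList<>(List.of(
        new VolumeRecord(URI.create("file:/data/1"), "DS-1"),
        new VolumeRecord(URI.create("file:/data/2"), "DS-2")));
    List<VolumeRecord> removed =
        removeVolumes(volumes, Set.of(URI.create("file:/data/2")));
    System.out.println("Removed: " + removed.size() + ", remaining: " + volumes.size());
  }
}

Note that the reporting path above still converts each failed location back to an absolute path string (getFailedStorageLocation().getFile().getAbsolutePath()), so the externally visible failed-volume strings are unchanged even though the internal bookkeeping now uses StorageLocation.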
@@ -960,25 +971,15 @@ class FsDatasetImpl implements FsDatasetSpi {
       FsVolumeImpl targetVolume = (FsVolumeImpl) volumeRef.getVolume();

       // Copy files to temp dir first
-      File[] blockFiles = copyBlockFiles(block.getBlockId(),
-          block.getGenerationStamp(), replicaInfo,
-          targetVolume.getTmpDir(block.getBlockPoolId()),
-          replicaInfo.isOnTransientStorage(), smallBufferSize, conf);
-
-      ReplicaInfo newReplicaInfo = new ReplicaBuilder(ReplicaState.TEMPORARY)
-          .setBlockId(replicaInfo.getBlockId())
-          .setGenerationStamp(replicaInfo.getGenerationStamp())
-          .setFsVolume(targetVolume)
-          .setDirectoryToUse(blockFiles[0].getParentFile())
-          .setBytesToReserve(0)
-          .build();
-      newReplicaInfo.setNumBytes(blockFiles[1].length());
+      ReplicaInfo newReplicaInfo = targetVolume.moveBlockToTmpLocation(block,
+          replicaInfo, smallBufferSize, conf);
+
       // Finalize the copied files
       newReplicaInfo = finalizeReplica(block.getBlockPoolId(), newReplicaInfo);
       try (AutoCloseableLock lock = datasetLock.acquire()) {
         // Increment numBlocks here as this block moved without knowing to BPS
         FsVolumeImpl volume = (FsVolumeImpl) newReplicaInfo.getVolume();
-        volume.getBlockPoolSlice(block.getBlockPoolId()).incrNumBlocks();
+        volume.incrNumBlocks(block.getBlockPoolId());
       }

       removeOldReplica(replicaInfo, newReplicaInfo, block.getBlockPoolId());
@@ -2072,7 +2073,7 @@ class FsDatasetImpl implements FsDatasetSpi {
    * @return the failed volumes. Returns null if no volume failed.
    */
   @Override // FsDatasetSpi
-  public Set checkDataDir() {
+  public Set checkDataDir() {
     return volumes.checkDirs();
   }

@@ -2250,9 +2251,8 @@ class FsDatasetImpl implements FsDatasetSpi {
               .setFsVolume(vol)
               .setDirectoryToUse(diskFile.getParentFile())
               .build();
-          ((FsVolumeImpl) vol).getBlockPoolSlice(bpid)
-              .resolveDuplicateReplicas(
-                  memBlockInfo, diskBlockInfo, volumeMap);
+          ((FsVolumeImpl) vol).resolveDuplicateReplicas(bpid,
+              memBlockInfo, diskBlockInfo, volumeMap);
         }
       } else {
         if (!diskFile.delete()) {
@@ -2803,15 +2803,15 @@ class FsDatasetImpl implements FsDatasetSpi {

     // Add thread for DISK volume if RamDisk is configured
     if (ramDiskConfigured &&
         asyncLazyPersistService != null &&
-        !asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
-      asyncLazyPersistService.addVolume(v.getCurrentDir());
+        !asyncLazyPersistService.queryVolume(v)) {
+      asyncLazyPersistService.addVolume(v);
     }

     // Remove thread for DISK volume if RamDisk is not configured
     if (!ramDiskConfigured &&
         asyncLazyPersistService != null &&
-        asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
-      asyncLazyPersistService.removeVolume(v.getCurrentDir());
+        asyncLazyPersistService.queryVolume(v)) {
+      asyncLazyPersistService.removeVolume(v);
     }
   }
@@ -2946,11 +2946,9 @@ class FsDatasetImpl implements FsDatasetSpi {

           // Move the replica from lazyPersist/ to finalized/ on
           // the target volume
-          BlockPoolSlice bpSlice =
-              replicaState.getLazyPersistVolume().getBlockPoolSlice(bpid);
-          newReplicaInfo =
-              bpSlice.activateSavedReplica(replicaInfo, replicaState);
+          replicaState.getLazyPersistVolume().activateSavedReplica(bpid,
+              replicaInfo, replicaState);

           // Update the volumeMap entry.
           volumeMap.add(bpid, newReplicaInfo);

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org