Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7E3BA200BAA for ; Wed, 12 Oct 2016 17:07:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7AF87160AD3; Wed, 12 Oct 2016 15:07:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2C580160AEE for ; Wed, 12 Oct 2016 17:07:18 +0200 (CEST) Received: (qmail 68310 invoked by uid 500); 12 Oct 2016 15:07:16 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 67685 invoked by uid 99); 12 Oct 2016 15:07:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Oct 2016 15:07:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C2284E09B3; Wed, 12 Oct 2016 15:07:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sunilg@apache.org To: common-commits@hadoop.apache.org Date: Wed, 12 Oct 2016 15:07:17 -0000 Message-Id: <994a3055a6a0471f901d808c2f931f20@git.apache.org> In-Reply-To: <16abc1c4e3004c72af086a534bf939cb@git.apache.org> References: <16abc1c4e3004c72af086a534bf939cb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/50] [abbrv] hadoop git commit: HDFS-10637. Modifications to remove the assumption that FsVolumes are backed by java.io.File. (Virajith Jalaparti via lei) archived-at: Wed, 12 Oct 2016 15:07:20 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 57fab66..76af724 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -23,11 +23,13 @@ import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.IOException; import java.io.OutputStreamWriter; +import java.net.URI; import java.nio.channels.ClosedChannelException; import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -56,13 +58,18 @@ import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.LocalReplica; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder; import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline; import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.BlockDirFilter; +import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.CloseableReferenceCount; @@ -102,8 +109,14 @@ public class FsVolumeImpl implements FsVolumeSpi { private final StorageType storageType; private final Map bpSlices = new ConcurrentHashMap(); + + // Refers to the base StorageLocation used to construct this volume + // (i.e., does not include STORAGE_DIR_CURRENT in + // /STORAGE_DIR_CURRENT/) + private final StorageLocation storageLocation; + private final File currentDir; // /current - private final DF usage; + private final DF usage; private final long reserved; private CloseableReferenceCount reference = new CloseableReferenceCount(); @@ -124,19 +137,25 @@ public class FsVolumeImpl implements FsVolumeSpi { */ protected ThreadPoolExecutor cacheExecutor; - FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir, - Configuration conf, StorageType storageType) throws IOException { + FsVolumeImpl(FsDatasetImpl dataset, String storageID, StorageDirectory sd, + Configuration conf) throws IOException { + + if (sd.getStorageLocation() == null) { + throw new IOException("StorageLocation specified for storage directory " + + sd + " is null"); + } this.dataset = dataset; this.storageID = storageID; + this.reservedForReplicas = new AtomicLong(0L); + this.storageLocation = sd.getStorageLocation(); + this.currentDir = sd.getCurrentDir(); + File parent = currentDir.getParentFile(); + this.usage = new DF(parent, conf); + this.storageType = storageLocation.getStorageType(); this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY + "." + StringUtils.toLowerCase(storageType.toString()), conf.getLong( DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT)); - this.reservedForReplicas = new AtomicLong(0L); - this.currentDir = currentDir; - File parent = currentDir.getParentFile(); - this.usage = new DF(parent, conf); - this.storageType = storageType; this.configuredCapacity = -1; this.conf = conf; cacheExecutor = initializeCacheExecutor(parent); @@ -285,19 +304,20 @@ public class FsVolumeImpl implements FsVolumeSpi { return true; } + @VisibleForTesting File getCurrentDir() { return currentDir; } - File getRbwDir(String bpid) throws IOException { + protected File getRbwDir(String bpid) throws IOException { return getBlockPoolSlice(bpid).getRbwDir(); } - File getLazyPersistDir(String bpid) throws IOException { + protected File getLazyPersistDir(String bpid) throws IOException { return getBlockPoolSlice(bpid).getLazypersistDir(); } - File getTmpDir(String bpid) throws IOException { + protected File getTmpDir(String bpid) throws IOException { return getBlockPoolSlice(bpid).getTmpDir(); } @@ -448,6 +468,7 @@ public class FsVolumeImpl implements FsVolumeSpi { return reserved; } + @VisibleForTesting BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException { BlockPoolSlice bp = bpSlices.get(bpid); if (bp == null) { @@ -457,21 +478,33 @@ public class FsVolumeImpl implements FsVolumeSpi { } @Override - public String getBasePath() { - return currentDir.getParent(); + public URI getBaseURI() { + return new File(currentDir.getParent()).toURI(); } - + @Override - public boolean isTransientStorage() { - return storageType.isTransient(); + public DF getUsageStats(Configuration conf) { + if (currentDir != null) { + try { + return new DF(new File(currentDir.getParent()), conf); + } catch (IOException e) { + LOG.error("Unable to get disk statistics for volume " + this); + } + } + return null; } @Override - public String getPath(String bpid) throws IOException { - return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath(); + public StorageLocation getStorageLocation() { + return storageLocation; } @Override + public boolean isTransientStorage() { + return storageType.isTransient(); + } + + @VisibleForTesting public File getFinalizedDir(String bpid) throws IOException { return getBlockPoolSlice(bpid).getFinalizedDir(); } @@ -951,7 +984,7 @@ public class FsVolumeImpl implements FsVolumeSpi { @Override public String toString() { - return currentDir.getAbsolutePath(); + return currentDir != null ? currentDir.getParent() : "NULL"; } void shutdown() { @@ -1189,5 +1222,167 @@ public class FsVolumeImpl implements FsVolumeSpi { dstBlockFile, true, DFSUtilClient.getSmallBufferSize(conf), conf); } + @Override + public LinkedList compileReport(String bpid, + LinkedList report, ReportCompiler reportCompiler) + throws InterruptedException, IOException { + return compileReport(getFinalizedDir(bpid), + getFinalizedDir(bpid), report, reportCompiler); + } + + private LinkedList compileReport(File bpFinalizedDir, + File dir, LinkedList report, ReportCompiler reportCompiler) + throws InterruptedException { + + reportCompiler.throttle(); + + List fileNames; + try { + fileNames = IOUtils.listDirectory(dir, BlockDirFilter.INSTANCE); + } catch (IOException ioe) { + LOG.warn("Exception occured while compiling report: ", ioe); + // Initiate a check on disk failure. + dataset.datanode.checkDiskErrorAsync(); + // Ignore this directory and proceed. + return report; + } + Collections.sort(fileNames); + + /* + * Assumption: In the sorted list of files block file appears immediately + * before block metadata file. This is true for the current naming + * convention for block file blk_ and meta file + * blk__.meta + */ + for (int i = 0; i < fileNames.size(); i++) { + // Make sure this thread can make a timely exit. With a low throttle + // rate, completing a run can take a looooong time. + if (Thread.interrupted()) { + throw new InterruptedException(); + } + + File file = new File(dir, fileNames.get(i)); + if (file.isDirectory()) { + compileReport(bpFinalizedDir, file, report, reportCompiler); + continue; + } + if (!Block.isBlockFilename(file)) { + if (isBlockMetaFile(Block.BLOCK_FILE_PREFIX, file.getName())) { + long blockId = Block.getBlockId(file.getName()); + verifyFileLocation(file.getParentFile(), bpFinalizedDir, + blockId); + report.add(new ScanInfo(blockId, null, file, this)); + } + continue; + } + File blockFile = file; + long blockId = Block.filename2id(file.getName()); + File metaFile = null; + + // Skip all the files that start with block name until + // getting to the metafile for the block + while (i + 1 < fileNames.size()) { + File blkMetaFile = new File(dir, fileNames.get(i + 1)); + if (!(blkMetaFile.isFile() + && blkMetaFile.getName().startsWith(blockFile.getName()))) { + break; + } + i++; + if (isBlockMetaFile(blockFile.getName(), blkMetaFile.getName())) { + metaFile = blkMetaFile; + break; + } + } + verifyFileLocation(blockFile, bpFinalizedDir, blockId); + report.add(new ScanInfo(blockId, blockFile, metaFile, this)); + } + return report; + } + + /** + * Helper method to determine if a file name is consistent with a block. + * meta-data file + * + * @param blockId the block ID + * @param metaFile the file to check + * @return whether the file name is a block meta-data file name + */ + private static boolean isBlockMetaFile(String blockId, String metaFile) { + return metaFile.startsWith(blockId) + && metaFile.endsWith(Block.METADATA_EXTENSION); + } + + /** + * Verify whether the actual directory location of block file has the + * expected directory path computed using its block ID. + */ + private void verifyFileLocation(File actualBlockFile, + File bpFinalizedDir, long blockId) { + File expectedBlockDir = + DatanodeUtil.idToBlockDir(bpFinalizedDir, blockId); + File actualBlockDir = actualBlockFile.getParentFile(); + if (actualBlockDir.compareTo(expectedBlockDir) != 0) { + LOG.warn("Block: " + blockId + + " found in invalid directory. Expected directory: " + + expectedBlockDir + ". Actual directory: " + actualBlockDir); + } + } + + public ReplicaInfo moveBlockToTmpLocation(ExtendedBlock block, + ReplicaInfo replicaInfo, + int smallBufferSize, + Configuration conf) throws IOException { + + File[] blockFiles = FsDatasetImpl.copyBlockFiles(block.getBlockId(), + block.getGenerationStamp(), replicaInfo, + getTmpDir(block.getBlockPoolId()), + replicaInfo.isOnTransientStorage(), smallBufferSize, conf); + + ReplicaInfo newReplicaInfo = new ReplicaBuilder(ReplicaState.TEMPORARY) + .setBlockId(replicaInfo.getBlockId()) + .setGenerationStamp(replicaInfo.getGenerationStamp()) + .setFsVolume(this) + .setDirectoryToUse(blockFiles[0].getParentFile()) + .setBytesToReserve(0) + .build(); + newReplicaInfo.setNumBytes(blockFiles[1].length()); + return newReplicaInfo; + } + + public File[] copyBlockToLazyPersistLocation(String bpId, long blockId, + long genStamp, + ReplicaInfo replicaInfo, + int smallBufferSize, + Configuration conf) throws IOException { + + File lazyPersistDir = getLazyPersistDir(bpId); + if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) { + FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir); + throw new IOException("LazyWriter fail to find or " + + "create lazy persist dir: " + lazyPersistDir.toString()); + } + + // No FsDatasetImpl lock for the file copy + File[] targetFiles = FsDatasetImpl.copyBlockFiles( + blockId, genStamp, replicaInfo, lazyPersistDir, true, + smallBufferSize, conf); + return targetFiles; + } + + public void incrNumBlocks(String bpid) throws IOException { + getBlockPoolSlice(bpid).incrNumBlocks(); + } + + public void resolveDuplicateReplicas(String bpid, ReplicaInfo memBlockInfo, + ReplicaInfo diskBlockInfo, ReplicaMap volumeMap) throws IOException { + getBlockPoolSlice(bpid).resolveDuplicateReplicas( + memBlockInfo, diskBlockInfo, volumeMap); + } + + public ReplicaInfo activateSavedReplica(String bpid, + ReplicaInfo replicaInfo, RamDiskReplica replicaState) throws IOException { + return getBlockPoolSlice(bpid).activateSavedReplica(replicaInfo, + replicaState); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java new file mode 100644 index 0000000..a1f7e91 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; + +/** + * This class is to be used as a builder for {@link FsVolumeImpl} objects. + */ +public class FsVolumeImplBuilder { + + private FsDatasetImpl dataset; + private String storageID; + private StorageDirectory sd; + private Configuration conf; + + public FsVolumeImplBuilder() { + dataset = null; + storageID = null; + sd = null; + conf = null; + } + + FsVolumeImplBuilder setDataset(FsDatasetImpl dataset) { + this.dataset = dataset; + return this; + } + + FsVolumeImplBuilder setStorageID(String id) { + this.storageID = id; + return this; + } + + FsVolumeImplBuilder setStorageDirectory(StorageDirectory sd) { + this.sd = sd; + return this; + } + + FsVolumeImplBuilder setConf(Configuration conf) { + this.conf = conf; + return this; + } + + FsVolumeImpl build() throws IOException { + return new FsVolumeImpl(dataset, storageID, sd, conf); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index f869008..cf9c319 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; -import java.io.File; import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.util.ArrayList; @@ -41,6 +40,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.BlockScanner; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.AutoCloseableLock; @@ -51,8 +51,10 @@ class FsVolumeList { private final CopyOnWriteArrayList volumes = new CopyOnWriteArrayList<>(); // Tracks volume failures, sorted by volume path. - private final Map volumeFailureInfos = - Collections.synchronizedMap(new TreeMap()); + // map from volume storageID to the volume failure info + private final Map volumeFailureInfos = + Collections.synchronizedMap( + new TreeMap()); private final ConcurrentLinkedQueue volumesBeingRemoved = new ConcurrentLinkedQueue<>(); private final AutoCloseableLock checkDirsLock; @@ -234,10 +236,9 @@ class FsVolumeList { * * @return list of all the failed volumes. */ - Set checkDirs() { + Set checkDirs() { try (AutoCloseableLock lock = checkDirsLock.acquire()) { - Set failedVols = null; - + Set failedLocations = null; // Make a copy of volumes for performing modification final List volumeList = getVolumes(); @@ -247,10 +248,10 @@ class FsVolumeList { fsv.checkDirs(); } catch (DiskErrorException e) { FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e); - if (failedVols == null) { - failedVols = new HashSet<>(1); + if (failedLocations == null) { + failedLocations = new HashSet<>(1); } - failedVols.add(new File(fsv.getBasePath()).getAbsoluteFile()); + failedLocations.add(fsv.getStorageLocation()); addVolumeFailureInfo(fsv); removeVolume(fsv); } catch (ClosedChannelException e) { @@ -261,13 +262,13 @@ class FsVolumeList { } } - if (failedVols != null && failedVols.size() > 0) { - FsDatasetImpl.LOG.warn("Completed checkDirs. Found " + failedVols.size() - + " failure volumes."); + if (failedLocations != null && failedLocations.size() > 0) { + FsDatasetImpl.LOG.warn("Completed checkDirs. Found " + + failedLocations.size() + " failure volumes."); } waitVolumeRemoved(5000, checkDirsLockCondition); - return failedVols; + return failedLocations; } } @@ -315,7 +316,7 @@ class FsVolumeList { } // If the volume is used to replace a failed volume, it needs to reset the // volume failure info for this volume. - removeVolumeFailureInfo(new File(volume.getBasePath())); + removeVolumeFailureInfo(volume.getStorageLocation()); FsDatasetImpl.LOG.info("Added new volume: " + volume.getStorageID()); } @@ -351,16 +352,15 @@ class FsVolumeList { * @param volume the volume to be removed. * @param clearFailure set true to remove failure info for this volume. */ - void removeVolume(File volume, boolean clearFailure) { + void removeVolume(StorageLocation storageLocation, boolean clearFailure) { for (FsVolumeImpl fsVolume : volumes) { - String basePath = new File(fsVolume.getBasePath()).getAbsolutePath(); - String targetPath = volume.getAbsolutePath(); - if (basePath.equals(targetPath)) { + StorageLocation baseLocation = fsVolume.getStorageLocation(); + if (baseLocation.equals(storageLocation)) { removeVolume(fsVolume); } } if (clearFailure) { - removeVolumeFailureInfo(volume); + removeVolumeFailureInfo(storageLocation); } } @@ -394,13 +394,13 @@ class FsVolumeList { private void addVolumeFailureInfo(FsVolumeImpl vol) { addVolumeFailureInfo(new VolumeFailureInfo( - new File(vol.getBasePath()).getAbsolutePath(), + vol.getStorageLocation(), Time.now(), vol.getCapacity())); } - private void removeVolumeFailureInfo(File vol) { - volumeFailureInfos.remove(vol.getAbsolutePath()); + private void removeVolumeFailureInfo(StorageLocation location) { + volumeFailureInfos.remove(location); } void addBlockPool(final String bpid, final Configuration conf) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java index 9e549f9..d6969c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RamDiskAsyncLazyPersistService.java @@ -58,8 +58,8 @@ class RamDiskAsyncLazyPersistService { private final Configuration conf; private final ThreadGroup threadGroup; - private Map executors - = new HashMap(); + private Map executors + = new HashMap(); private final static HdfsConfiguration EMPTY_HDFS_CONF = new HdfsConfiguration(); /** @@ -75,13 +75,14 @@ class RamDiskAsyncLazyPersistService { this.threadGroup = new ThreadGroup(getClass().getSimpleName()); } - private void addExecutorForVolume(final File volume) { + private void addExecutorForVolume(final String storageId) { ThreadFactory threadFactory = new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(threadGroup, r); - t.setName("Async RamDisk lazy persist worker for volume " + volume); + t.setName("Async RamDisk lazy persist worker " + + " for volume with id " + storageId); return t; } }; @@ -93,39 +94,41 @@ class RamDiskAsyncLazyPersistService { // This can reduce the number of running threads executor.allowCoreThreadTimeOut(true); - executors.put(volume, executor); + executors.put(storageId, executor); } /** * Starts AsyncLazyPersistService for a new volume * @param volume the root of the new data volume. */ - synchronized void addVolume(File volume) { + synchronized void addVolume(FsVolumeImpl volume) { + String storageId = volume.getStorageID(); if (executors == null) { throw new RuntimeException("AsyncLazyPersistService is already shutdown"); } - ThreadPoolExecutor executor = executors.get(volume); + ThreadPoolExecutor executor = executors.get(storageId); if (executor != null) { throw new RuntimeException("Volume " + volume + " is already existed."); } - addExecutorForVolume(volume); + addExecutorForVolume(storageId); } /** * Stops AsyncLazyPersistService for a volume. * @param volume the root of the volume. */ - synchronized void removeVolume(File volume) { + synchronized void removeVolume(FsVolumeImpl volume) { + String storageId = volume.getStorageID(); if (executors == null) { throw new RuntimeException("AsyncDiskService is already shutdown"); } - ThreadPoolExecutor executor = executors.get(volume); + ThreadPoolExecutor executor = executors.get(storageId); if (executor == null) { - throw new RuntimeException("Can not find volume " + volume - + " to remove."); + throw new RuntimeException("Can not find volume with storage id " + + storageId + " to remove."); } else { executor.shutdown(); - executors.remove(volume); + executors.remove(storageId); } } @@ -135,25 +138,28 @@ class RamDiskAsyncLazyPersistService { * @return true if there is one thread pool for the volume * false otherwise */ - synchronized boolean queryVolume(File volume) { + synchronized boolean queryVolume(FsVolumeImpl volume) { + String storageId = volume.getStorageID(); if (executors == null) { - throw new RuntimeException("AsyncLazyPersistService is already shutdown"); + throw new RuntimeException( + "AsyncLazyPersistService is already shutdown"); } - ThreadPoolExecutor executor = executors.get(volume); + ThreadPoolExecutor executor = executors.get(storageId); return (executor != null); } /** * Execute the task sometime in the future, using ThreadPools. */ - synchronized void execute(File root, Runnable task) { + synchronized void execute(String storageId, Runnable task) { if (executors == null) { - throw new RuntimeException("AsyncLazyPersistService is already shutdown"); + throw new RuntimeException( + "AsyncLazyPersistService is already shutdown"); } - ThreadPoolExecutor executor = executors.get(root); + ThreadPoolExecutor executor = executors.get(storageId); if (executor == null) { - throw new RuntimeException("Cannot find root " + root - + " for execution of task " + task); + throw new RuntimeException("Cannot find root storage volume with id " + + storageId + " for execution of task " + task); } else { executor.execute(task); } @@ -169,7 +175,7 @@ class RamDiskAsyncLazyPersistService { } else { LOG.info("Shutting down all async lazy persist service threads"); - for (Map.Entry e : executors.entrySet()) { + for (Map.Entry e : executors.entrySet()) { e.getValue().shutdown(); } // clear the executor map so that calling execute again will fail. @@ -189,18 +195,11 @@ class RamDiskAsyncLazyPersistService { + bpId + " block id: " + blockId); } - FsVolumeImpl volume = (FsVolumeImpl)target.getVolume(); - File lazyPersistDir = volume.getLazyPersistDir(bpId); - if (!lazyPersistDir.exists() && !lazyPersistDir.mkdirs()) { - FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir); - throw new IOException("LazyWriter fail to find or create lazy persist dir: " - + lazyPersistDir.toString()); - } - ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask( - bpId, blockId, genStamp, creationTime, replica, - target, lazyPersistDir); - execute(volume.getCurrentDir(), lazyPersistTask); + bpId, blockId, genStamp, creationTime, replica, target); + + FsVolumeImpl volume = (FsVolumeImpl)target.getVolume(); + execute(volume.getStorageID(), lazyPersistTask); } class ReplicaLazyPersistTask implements Runnable { @@ -210,19 +209,17 @@ class RamDiskAsyncLazyPersistService { private final long creationTime; private final ReplicaInfo replicaInfo; private final FsVolumeReference targetVolume; - private final File lazyPersistDir; ReplicaLazyPersistTask(String bpId, long blockId, long genStamp, long creationTime, ReplicaInfo replicaInfo, - FsVolumeReference targetVolume, File lazyPersistDir) { + FsVolumeReference targetVolume) { this.bpId = bpId; this.blockId = blockId; this.genStamp = genStamp; this.creationTime = creationTime; this.replicaInfo = replicaInfo; this.targetVolume = targetVolume; - this.lazyPersistDir = lazyPersistDir; } @Override @@ -241,14 +238,14 @@ class RamDiskAsyncLazyPersistService { final FsDatasetImpl dataset = (FsDatasetImpl)datanode.getFSDataset(); try (FsVolumeReference ref = this.targetVolume) { int smallBufferSize = DFSUtilClient.getSmallBufferSize(EMPTY_HDFS_CONF); - // No FsDatasetImpl lock for the file copy - File targetFiles[] = FsDatasetImpl.copyBlockFiles( - blockId, genStamp, replicaInfo, lazyPersistDir, true, - smallBufferSize, conf); + + FsVolumeImpl volume = (FsVolumeImpl)ref.getVolume(); + File[] targetFiles = volume.copyBlockToLazyPersistLocation(bpId, + blockId, genStamp, replicaInfo, smallBufferSize, conf); // Lock FsDataSetImpl during onCompleteLazyPersist callback dataset.onCompleteLazyPersist(bpId, blockId, - creationTime, targetFiles, (FsVolumeImpl)ref.getVolume()); + creationTime, targetFiles, volume); succeeded = true; } catch (Exception e){ FsDatasetImpl.LOG.warn( http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/VolumeFailureInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/VolumeFailureInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/VolumeFailureInfo.java index c3ce2a4..a762785 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/VolumeFailureInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/VolumeFailureInfo.java @@ -17,11 +17,13 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; + /** * Tracks information about failure of a data volume. */ final class VolumeFailureInfo { - private final String failedStorageLocation; + private final StorageLocation failedStorageLocation; private final long failureDate; private final long estimatedCapacityLost; @@ -33,7 +35,8 @@ final class VolumeFailureInfo { * @param failedStorageLocation storage location that has failed * @param failureDate date/time of failure in milliseconds since epoch */ - public VolumeFailureInfo(String failedStorageLocation, long failureDate) { + public VolumeFailureInfo(StorageLocation failedStorageLocation, + long failureDate) { this(failedStorageLocation, failureDate, 0); } @@ -44,8 +47,8 @@ final class VolumeFailureInfo { * @param failureDate date/time of failure in milliseconds since epoch * @param estimatedCapacityLost estimate of capacity lost in bytes */ - public VolumeFailureInfo(String failedStorageLocation, long failureDate, - long estimatedCapacityLost) { + public VolumeFailureInfo(StorageLocation failedStorageLocation, + long failureDate, long estimatedCapacityLost) { this.failedStorageLocation = failedStorageLocation; this.failureDate = failureDate; this.estimatedCapacityLost = estimatedCapacityLost; @@ -56,7 +59,7 @@ final class VolumeFailureInfo { * * @return storage location that has failed */ - public String getFailedStorageLocation() { + public StorageLocation getFailedStorageLocation() { return this.failedStorageLocation; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 0f4f14c..2471dc8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -5413,7 +5413,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, VolumeFailureSummary volumeFailureSummary = node.getVolumeFailureSummary(); if (volumeFailureSummary != null) { innerinfo - .put("failedStorageLocations", + .put("failedStorageIDs", volumeFailureSummary.getFailedStorageLocations()) .put("lastVolumeFailureDate", volumeFailureSummary.getLastVolumeFailureDate()) http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java index b11b48a..6efc53a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; @@ -216,13 +217,13 @@ public class TestNameNodePrunesMissingStorages { datanodeToRemoveStorageFromIdx++; } // Find the volume within the datanode which holds that first storage. - String volumeDirectoryToRemove = null; + StorageLocation volumeLocationToRemove = null; try (FsVolumeReferences volumes = datanodeToRemoveStorageFrom.getFSDataset().getFsVolumeReferences()) { assertEquals(NUM_STORAGES_PER_DN, volumes.size()); for (FsVolumeSpi volume : volumes) { if (volume.getStorageID().equals(storageIdToRemove)) { - volumeDirectoryToRemove = volume.getBasePath(); + volumeLocationToRemove = volume.getStorageLocation(); } } }; @@ -230,10 +231,11 @@ public class TestNameNodePrunesMissingStorages { // Replace the volume directory with a regular file, which will // cause a volume failure. (If we merely removed the directory, // it would be re-initialized with a new storage ID.) - assertNotNull(volumeDirectoryToRemove); + assertNotNull(volumeLocationToRemove); datanodeToRemoveStorageFrom.shutdown(); - FileUtil.fullyDelete(new File(volumeDirectoryToRemove)); - FileOutputStream fos = new FileOutputStream(volumeDirectoryToRemove); + FileUtil.fullyDelete(volumeLocationToRemove.getFile()); + FileOutputStream fos = new FileOutputStream( + volumeLocationToRemove.getFile().toString()); try { fos.write(1); } finally { @@ -326,7 +328,8 @@ public class TestNameNodePrunesMissingStorages { dn.getFSDataset().getFsVolumeReferences(); final String newStorageId = DatanodeStorage.generateUuid(); try { - File currentDir = new File(volumeRefs.get(0).getBasePath(), "current"); + File currentDir = new File( + volumeRefs.get(0).getStorageLocation().getFile(), "current"); File versionFile = new File(currentDir, "VERSION"); rewriteVersionFile(versionFile, newStorageId); } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 6034d1e..6c59231 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -22,7 +22,9 @@ import java.io.FileDescriptor; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.URI; import java.nio.channels.ClosedChannelException; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -38,6 +40,7 @@ import javax.management.StandardMBean; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.DF; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -46,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; @@ -495,21 +499,6 @@ public class SimulatedFSDataset implements FsDatasetSpi { } @Override - public String getBasePath() { - return null; - } - - @Override - public String getPath(String bpid) throws IOException { - return null; - } - - @Override - public File getFinalizedDir(String bpid) throws IOException { - return null; - } - - @Override public StorageType getStorageType() { return null; } @@ -546,6 +535,28 @@ public class SimulatedFSDataset implements FsDatasetSpi { public FsDatasetSpi getDataset() { throw new UnsupportedOperationException(); } + + @Override + public StorageLocation getStorageLocation() { + return null; + } + + @Override + public URI getBaseURI() { + return null; + } + + @Override + public DF getUsageStats(Configuration conf) { + return null; + } + + @Override + public LinkedList compileReport(String bpid, + LinkedList report, ReportCompiler reportCompiler) + throws InterruptedException, IOException { + return null; + } } private final Map> blockMap @@ -1030,7 +1041,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { } @Override - public Set checkDataDir() { + public Set checkDataDir() { // nothing to check for simulated data set return null; } @@ -1344,7 +1355,8 @@ public class SimulatedFSDataset implements FsDatasetSpi { } @Override - public synchronized void removeVolumes(Set volumes, boolean clearFailure) { + public synchronized void removeVolumes(Collection volumes, + boolean clearFailure) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java index 021361b..c55a828 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java @@ -549,7 +549,8 @@ public class TestBlockScanner { info.shouldRun = false; } ctx.datanode.shutdown(); - String vPath = ctx.volumes.get(0).getBasePath(); + String vPath = ctx.volumes.get(0).getStorageLocation() + .getFile().getAbsolutePath(); File cursorPath = new File(new File(new File(vPath, "current"), ctx.bpids[0]), "scanner.cursor"); assertTrue("Failed to find cursor save file in " + http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java index 0dbb09c..06387c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java @@ -52,7 +52,6 @@ import org.junit.Test; import java.io.File; import java.io.IOException; -import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -519,11 +518,8 @@ public class TestDataNodeHotSwapVolumes { ExtendedBlock block = DFSTestUtil.getAllBlocks(fs, testFile).get(1).getBlock(); FsVolumeSpi volumeWithBlock = dn.getFSDataset().getVolume(block); - String basePath = volumeWithBlock.getBasePath(); - File storageDir = new File(basePath); - URI fileUri = storageDir.toURI(); - String dirWithBlock = - "[" + volumeWithBlock.getStorageType() + "]" + fileUri; + String dirWithBlock = "[" + volumeWithBlock.getStorageType() + "]" + + volumeWithBlock.getStorageLocation().getFile().toURI(); String newDirs = dirWithBlock; for (String dir : oldDirs) { if (dirWithBlock.startsWith(dir)) { @@ -581,8 +577,8 @@ public class TestDataNodeHotSwapVolumes { try (FsDatasetSpi.FsVolumeReferences volumes = dataset.getFsVolumeReferences()) { for (FsVolumeSpi volume : volumes) { - assertThat(volume.getBasePath(), is(not(anyOf( - is(newDirs.get(0)), is(newDirs.get(2)))))); + assertThat(volume.getStorageLocation().getFile().toString(), + is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2)))))); } } DataStorage storage = dn.getStorage(); @@ -765,7 +761,7 @@ public class TestDataNodeHotSwapVolumes { try (FsDatasetSpi.FsVolumeReferences volumes = dn.getFSDataset().getFsVolumeReferences()) { for (FsVolumeSpi vol : volumes) { - if (vol.getBasePath().equals(basePath.getPath())) { + if (vol.getBaseURI().equals(basePath.toURI())) { return (FsVolumeImpl) vol; } } @@ -810,6 +806,7 @@ public class TestDataNodeHotSwapVolumes { assertEquals(used, failedVolume.getDfsUsed()); DataNodeTestUtils.restoreDataDirFromFailure(dirToFail); + LOG.info("reconfiguring DN "); assertThat( "DN did not update its own config", dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, oldDataDir), http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index 6792ba8..47f4823 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -21,7 +21,6 @@ import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -254,17 +253,18 @@ public class TestDataNodeVolumeFailure { FsDatasetSpi data = dn0.getFSDataset(); try (FsDatasetSpi.FsVolumeReferences vols = data.getFsVolumeReferences()) { for (FsVolumeSpi volume : vols) { - assertNotEquals(new File(volume.getBasePath()).getAbsoluteFile(), - dn0Vol1.getAbsoluteFile()); + assertFalse(volume.getStorageLocation().getFile() + .getAbsolutePath().startsWith(dn0Vol1.getAbsolutePath() + )); } } // 3. all blocks on dn0Vol1 have been removed. for (ReplicaInfo replica : FsDatasetTestUtil.getReplicas(data, bpid)) { assertNotNull(replica.getVolume()); - assertNotEquals( - new File(replica.getVolume().getBasePath()).getAbsoluteFile(), - dn0Vol1.getAbsoluteFile()); + assertFalse(replica.getVolume().getStorageLocation().getFile() + .getAbsolutePath().startsWith(dn0Vol1.getAbsolutePath() + )); } // 4. dn0Vol1 is not in DN0's configuration and dataDirs anymore. http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java index 8d021cd..4bb5e7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java @@ -539,6 +539,16 @@ public class TestDataNodeVolumeFailureReporting { assertCounter("VolumeFailures", expectedVolumeFailuresCounter, getMetrics(dn.getMetrics().name())); FsDatasetSpi fsd = dn.getFSDataset(); + StringBuilder strBuilder = new StringBuilder(); + strBuilder.append("expectedFailedVolumes is "); + for (String expected: expectedFailedVolumes) { + strBuilder.append(expected + ","); + } + strBuilder.append(" fsd.getFailedStorageLocations() is "); + for (String expected: fsd.getFailedStorageLocations()) { + strBuilder.append(expected + ","); + } + LOG.info(strBuilder.toString()); assertEquals(expectedFailedVolumes.length, fsd.getNumFailedVolumes()); assertArrayEquals(expectedFailedVolumes, fsd.getFailedStorageLocations()); if (expectedFailedVolumes.length > 0) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index 576aae0..08a5af9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.ArrayList; @@ -44,6 +45,7 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.DF; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; @@ -56,11 +58,13 @@ import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; @@ -185,18 +189,20 @@ public class TestDirectoryScanner { // Volume without a copy of the block. Make a copy now. File sourceBlock = new File(b.getBlockURI()); File sourceMeta = new File(b.getMetadataURI()); - String sourceRoot = b.getVolume().getBasePath(); - String destRoot = v.getBasePath(); + URI sourceRoot = b.getVolume().getStorageLocation().getFile().toURI(); + URI destRoot = v.getStorageLocation().getFile().toURI(); String relativeBlockPath = - new File(sourceRoot).toURI().relativize(sourceBlock.toURI()) + sourceRoot.relativize(sourceBlock.toURI()) .getPath(); String relativeMetaPath = - new File(sourceRoot).toURI().relativize(sourceMeta.toURI()) + sourceRoot.relativize(sourceMeta.toURI()) .getPath(); - File destBlock = new File(destRoot, relativeBlockPath); - File destMeta = new File(destRoot, relativeMetaPath); + File destBlock = new File(new File(destRoot).toString(), + relativeBlockPath); + File destMeta = new File(new File(destRoot).toString(), + relativeMetaPath); destBlock.getParentFile().mkdirs(); FileUtils.copyFile(sourceBlock, destBlock); @@ -238,7 +244,8 @@ public class TestDirectoryScanner { try (FsDatasetSpi.FsVolumeReferences volumes = fds.getFsVolumeReferences()) { int numVolumes = volumes.size(); int index = rand.nextInt(numVolumes - 1); - File finalizedDir = volumes.get(index).getFinalizedDir(bpid); + File finalizedDir = ((FsVolumeImpl) volumes.get(index)) + .getFinalizedDir(bpid); File file = new File(finalizedDir, getBlockFile(id)); if (file.createNewFile()) { LOG.info("Created block file " + file.getName()); @@ -253,8 +260,8 @@ public class TestDirectoryScanner { try (FsDatasetSpi.FsVolumeReferences refs = fds.getFsVolumeReferences()) { int numVolumes = refs.size(); int index = rand.nextInt(numVolumes - 1); - - File finalizedDir = refs.get(index).getFinalizedDir(bpid); + File finalizedDir = ((FsVolumeImpl) refs.get(index)) + .getFinalizedDir(bpid); File file = new File(finalizedDir, getMetaFile(id)); if (file.createNewFile()) { LOG.info("Created metafile " + file.getName()); @@ -271,7 +278,8 @@ public class TestDirectoryScanner { int numVolumes = refs.size(); int index = rand.nextInt(numVolumes - 1); - File finalizedDir = refs.get(index).getFinalizedDir(bpid); + File finalizedDir = + ((FsVolumeImpl) refs.get(index)).getFinalizedDir(bpid); File file = new File(finalizedDir, getBlockFile(id)); if (file.createNewFile()) { LOG.info("Created block file " + file.getName()); @@ -311,7 +319,7 @@ public class TestDirectoryScanner { scanner.reconcile(); assertTrue(scanner.diffs.containsKey(bpid)); - LinkedList diff = scanner.diffs.get(bpid); + LinkedList diff = scanner.diffs.get(bpid); assertTrue(scanner.stats.containsKey(bpid)); DirectoryScanner.Stats stats = scanner.stats.get(bpid); @@ -820,17 +828,6 @@ public class TestDirectoryScanner { return 0; } - @Override - public String getBasePath() { - return (new File("/base")).getAbsolutePath(); - } - - @Override - public String getPath(String bpid) throws IOException { - return (new File("/base/current/" + bpid)).getAbsolutePath(); - } - - @Override public File getFinalizedDir(String bpid) throws IOException { return new File("/base/current/" + bpid + "/finalized"); } @@ -877,6 +874,29 @@ public class TestDirectoryScanner { public FsDatasetSpi getDataset() { throw new UnsupportedOperationException(); } + + @Override + public StorageLocation getStorageLocation() { + return null; + } + + @Override + public URI getBaseURI() { + return (new File("/base")).toURI(); + } + + @Override + public DF getUsageStats(Configuration conf) { + return null; + } + + @Override + public LinkedList compileReport(String bpid, + LinkedList report, ReportCompiler reportCompiler) + throws InterruptedException, IOException { + return null; + } + } private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi(); @@ -887,8 +907,8 @@ public class TestDirectoryScanner { void testScanInfoObject(long blockId, File blockFile, File metaFile) throws Exception { - DirectoryScanner.ScanInfo scanInfo = - new DirectoryScanner.ScanInfo(blockId, blockFile, metaFile, TEST_VOLUME); + FsVolumeSpi.ScanInfo scanInfo = + new FsVolumeSpi.ScanInfo(blockId, blockFile, metaFile, TEST_VOLUME); assertEquals(blockId, scanInfo.getBlockId()); if (blockFile != null) { assertEquals(blockFile.getAbsolutePath(), @@ -906,8 +926,8 @@ public class TestDirectoryScanner { } void testScanInfoObject(long blockId) throws Exception { - DirectoryScanner.ScanInfo scanInfo = - new DirectoryScanner.ScanInfo(blockId, null, null, null); + FsVolumeSpi.ScanInfo scanInfo = + new FsVolumeSpi.ScanInfo(blockId, null, null, null); assertEquals(blockId, scanInfo.getBlockId()); assertNull(scanInfo.getBlockFile()); assertNull(scanInfo.getMetaFile()); @@ -963,8 +983,8 @@ public class TestDirectoryScanner { List volumes = new ArrayList<>(); Iterator iterator = fds.getFsVolumeReferences().iterator(); while (iterator.hasNext()) { - FsVolumeSpi volume = iterator.next(); - FsVolumeSpi spy = Mockito.spy(volume); + FsVolumeImpl volume = (FsVolumeImpl) iterator.next(); + FsVolumeImpl spy = Mockito.spy(volume); Mockito.doThrow(new IOException("Error while getFinalizedDir")) .when(spy).getFinalizedDir(volume.getBlockPoolList()[0]); volumes.add(spy); http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java index 86d2ff4..2103392 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java @@ -199,7 +199,7 @@ public class TestDiskError { try (FsDatasetSpi.FsVolumeReferences volumes = dn.getFSDataset().getFsVolumeReferences()) { for (FsVolumeSpi vol : volumes) { - String dir = vol.getBasePath(); + String dir = vol.getStorageLocation().getFile().getAbsolutePath(); Path dataDir = new Path(dir); FsPermission actual = localFS.getFileStatus(dataDir).getPermission(); assertEquals("Permission for dir: " + dataDir + ", is " + actual + http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 1268108..7b7f04f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -56,12 +56,14 @@ public class ExternalDatasetImpl implements FsDatasetSpi { } @Override - public void addVolume(StorageLocation location, List nsInfos) throws IOException { - + public void addVolume(StorageLocation location, List nsInfos) + throws IOException { } @Override - public void removeVolumes(Set volumes, boolean clearFailure) { + public void removeVolumes(Collection volumes, + boolean clearFailure) { + throw new UnsupportedOperationException(); } @Override @@ -242,7 +244,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi { } @Override - public Set checkDataDir() { + public Set checkDataDir() { return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java index 985a259..83d6c4c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java @@ -18,11 +18,16 @@ package org.apache.hadoop.hdfs.server.datanode.extdataset; -import java.io.File; import java.io.IOException; +import java.net.URI; import java.nio.channels.ClosedChannelException; +import java.util.LinkedList; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.DF; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; @@ -44,21 +49,6 @@ public class ExternalVolumeImpl implements FsVolumeSpi { } @Override - public String getBasePath() { - return null; - } - - @Override - public String getPath(String bpid) throws IOException { - return null; - } - - @Override - public File getFinalizedDir(String bpid) throws IOException { - return null; - } - - @Override public String getStorageID() { return null; } @@ -100,4 +90,26 @@ public class ExternalVolumeImpl implements FsVolumeSpi { public FsDatasetSpi getDataset() { return null; } + + @Override + public StorageLocation getStorageLocation() { + return null; + } + + @Override + public URI getBaseURI() { + return null; + } + + @Override + public DF getUsageStats(Configuration conf) { + return null; + } + + @Override + public LinkedList compileReport(String bpid, + LinkedList report, ReportCompiler reportCompiler) + throws InterruptedException, IOException { + return null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java index a465c05..07ddb59 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java @@ -374,9 +374,12 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils { public long getRawCapacity() throws IOException { try (FsVolumeReferences volRefs = dataset.getFsVolumeReferences()) { Preconditions.checkState(volRefs.size() != 0); - DF df = new DF(new File(volRefs.get(0).getBasePath()), - dataset.datanode.getConf()); - return df.getCapacity(); + DF df = volRefs.get(0).getUsageStats(dataset.datanode.getConf()); + if (df != null) { + return df.getCapacity(); + } else { + return -1; + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 179b617..e48aae0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.datanode.BlockScanner; import org.apache.hadoop.hdfs.server.datanode.DNConf; @@ -50,7 +51,9 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.io.MultipleIOException; @@ -122,8 +125,10 @@ public class TestFsDatasetImpl { private final static String BLOCKPOOL = "BP-TEST"; - private static Storage.StorageDirectory createStorageDirectory(File root) { - Storage.StorageDirectory sd = new Storage.StorageDirectory(root); + private static Storage.StorageDirectory createStorageDirectory(File root) + throws SecurityException, IOException { + Storage.StorageDirectory sd = new Storage.StorageDirectory( + StorageLocation.parse(root.toURI().toString())); DataStorage.createStorageID(sd, false); return sd; } @@ -196,16 +201,18 @@ public class TestFsDatasetImpl { for (int i = 0; i < numNewVolumes; i++) { String path = BASE_DIR + "/newData" + i; String pathUri = new Path(path).toUri().toString(); - expectedVolumes.add(new File(pathUri).toString()); + expectedVolumes.add(new File(pathUri).getAbsolutePath()); StorageLocation loc = StorageLocation.parse(pathUri); Storage.StorageDirectory sd = createStorageDirectory(new File(path)); DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd); - when(storage.prepareVolume(eq(datanode), eq(loc.getFile()), + when(storage.prepareVolume(eq(datanode), eq(loc), anyListOf(NamespaceInfo.class))) .thenReturn(builder); dataset.addVolume(loc, nsInfos); + LOG.info("expectedVolumes " + i + " is " + + new File(pathUri).getAbsolutePath()); } assertEquals(totalVolumes, getNumVolumes()); @@ -215,7 +222,9 @@ public class TestFsDatasetImpl { try (FsDatasetSpi.FsVolumeReferences volumes = dataset.getFsVolumeReferences()) { for (int i = 0; i < numNewVolumes; i++) { - actualVolumes.add(volumes.get(numExistingVolumes + i).getBasePath()); + String volumeName = volumes.get(numExistingVolumes + i).toString(); + actualVolumes.add(volumeName); + LOG.info("actualVolume " + i + " is " + volumeName); } } assertEquals(actualVolumes.size(), expectedVolumes.size()); @@ -262,9 +271,18 @@ public class TestFsDatasetImpl { final String[] dataDirs = conf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY).split(","); final String volumePathToRemove = dataDirs[0]; - Set volumesToRemove = new HashSet<>(); - volumesToRemove.add(StorageLocation.parse(volumePathToRemove).getFile()); - + Set volumesToRemove = new HashSet<>(); + volumesToRemove.add(StorageLocation.parse(volumePathToRemove)); + + FsVolumeReferences volReferences = dataset.getFsVolumeReferences(); + FsVolumeImpl volumeToRemove = null; + for (FsVolumeSpi vol: volReferences) { + if (vol.getStorageLocation().equals(volumesToRemove.iterator().next())) { + volumeToRemove = (FsVolumeImpl) vol; + } + } + assertTrue(volumeToRemove != null); + volReferences.close(); dataset.removeVolumes(volumesToRemove, true); int expectedNumVolumes = dataDirs.length - 1; assertEquals("The volume has been removed from the volumeList.", @@ -273,7 +291,7 @@ public class TestFsDatasetImpl { expectedNumVolumes, dataset.storageMap.size()); try { - dataset.asyncDiskService.execute(volumesToRemove.iterator().next(), + dataset.asyncDiskService.execute(volumeToRemove, new Runnable() { @Override public void run() {} @@ -281,7 +299,7 @@ public class TestFsDatasetImpl { fail("Expect RuntimeException: the volume has been removed from the " + "AsyncDiskService."); } catch (RuntimeException e) { - GenericTestUtils.assertExceptionContains("Cannot find root", e); + GenericTestUtils.assertExceptionContains("Cannot find volume", e); } int totalNumReplicas = 0; @@ -306,7 +324,7 @@ public class TestFsDatasetImpl { Storage.StorageDirectory sd = createStorageDirectory(new File(newVolumePath)); DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd); - when(storage.prepareVolume(eq(datanode), eq(loc.getFile()), + when(storage.prepareVolume(eq(datanode), eq(loc), anyListOf(NamespaceInfo.class))) .thenReturn(builder); @@ -315,8 +333,8 @@ public class TestFsDatasetImpl { when(storage.getNumStorageDirs()).thenReturn(numExistingVolumes + 1); when(storage.getStorageDir(numExistingVolumes)).thenReturn(sd); - Set volumesToRemove = new HashSet<>(); - volumesToRemove.add(loc.getFile()); + Set volumesToRemove = new HashSet<>(); + volumesToRemove.add(loc); dataset.removeVolumes(volumesToRemove, true); assertEquals(numExistingVolumes, getNumVolumes()); } @@ -336,7 +354,8 @@ public class TestFsDatasetImpl { for (int i = 0; i < NUM_VOLUMES; i++) { FsVolumeImpl volume = mock(FsVolumeImpl.class); oldVolumes.add(volume); - when(volume.getBasePath()).thenReturn("data" + i); + when(volume.getStorageLocation()).thenReturn( + StorageLocation.parse(new File("data" + i).toURI().toString())); when(volume.checkClosed()).thenReturn(true); FsVolumeReference ref = mock(FsVolumeReference.class); when(ref.getVolume()).thenReturn(volume); @@ -348,13 +367,16 @@ public class TestFsDatasetImpl { final FsVolumeImpl newVolume = mock(FsVolumeImpl.class); final FsVolumeReference newRef = mock(FsVolumeReference.class); when(newRef.getVolume()).thenReturn(newVolume); - when(newVolume.getBasePath()).thenReturn("data4"); + when(newVolume.getStorageLocation()).thenReturn( + StorageLocation.parse(new File("data4").toURI().toString())); FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1); doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - volumeList.removeVolume(new File("data4"), false); + volumeList.removeVolume( + StorageLocation.parse((new File("data4")).toURI().toString()), + false); volumeList.addVolume(newRef); return null; } @@ -386,7 +408,8 @@ public class TestFsDatasetImpl { File badDir = new File(BASE_DIR, "bad"); badDir.mkdirs(); doReturn(mockVolume).when(spyDataset) - .createFsVolume(anyString(), any(File.class), any(StorageType.class)); + .createFsVolume(anyString(), any(StorageDirectory.class), + any(StorageLocation.class)); doThrow(new IOException("Failed to getVolumeMap()")) .when(mockVolume).getVolumeMap( anyString(), @@ -396,7 +419,8 @@ public class TestFsDatasetImpl { Storage.StorageDirectory sd = createStorageDirectory(badDir); sd.lock(); DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd); - when(storage.prepareVolume(eq(datanode), eq(badDir.getAbsoluteFile()), + when(storage.prepareVolume(eq(datanode), + eq(StorageLocation.parse(badDir.toURI().toString())), Matchers.>any())) .thenReturn(builder); @@ -540,7 +564,7 @@ public class TestFsDatasetImpl { DataStorage.VolumeBuilder builder = new DataStorage.VolumeBuilder(storage, sd); when( - storage.prepareVolume(eq(datanode), eq(loc.getFile()), + storage.prepareVolume(eq(datanode), eq(loc), anyListOf(NamespaceInfo.class))).thenReturn(builder); String cacheFilePath = @@ -584,7 +608,7 @@ public class TestFsDatasetImpl { return dfsUsed; } - @Test(timeout = 30000) + @Test(timeout = 60000) public void testRemoveVolumeBeingWritten() throws Exception { // Will write and remove on dn0. final ExtendedBlock eb = new ExtendedBlock(BLOCK_POOL_IDS[0], 0); @@ -636,10 +660,9 @@ public class TestFsDatasetImpl { class VolRemoveThread extends Thread { public void run() { - Set volumesToRemove = new HashSet<>(); + Set volumesToRemove = new HashSet<>(); try { - volumesToRemove.add(StorageLocation.parse( - dataset.getVolume(eb).getBasePath()).getFile()); + volumesToRemove.add(dataset.getVolume(eb).getStorageLocation()); } catch (Exception e) { LOG.info("Problem preparing volumes to remove: ", e); Assert.fail("Exception in remove volume thread, check log for " + http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java index 3d4c38c..6eff300 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java @@ -22,7 +22,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.datanode.BlockScanner; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; @@ -71,8 +73,13 @@ public class TestFsVolumeList { for (int i = 0; i < 3; i++) { File curDir = new File(baseDir, "nextvolume-" + i); curDir.mkdirs(); - FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", curDir, - conf, StorageType.DEFAULT); + FsVolumeImpl volume = new FsVolumeImplBuilder() + .setConf(conf) + .setDataset(dataset) + .setStorageID("storage-id") + .setStorageDirectory( + new StorageDirectory(StorageLocation.parse(curDir.getPath()))) + .build(); volume.setCapacityForTesting(1024 * 1024 * 1024); volumes.add(volume); volumeList.addVolume(volume.obtainReference()); @@ -109,8 +116,13 @@ public class TestFsVolumeList { for (int i = 0; i < 3; i++) { File curDir = new File(baseDir, "volume-" + i); curDir.mkdirs(); - FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", curDir, - conf, StorageType.DEFAULT); + FsVolumeImpl volume = new FsVolumeImplBuilder() + .setConf(conf) + .setDataset(dataset) + .setStorageID("storage-id") + .setStorageDirectory( + new StorageDirectory(StorageLocation.parse(curDir.getPath()))) + .build(); volumes.add(volume); volumeList.addVolume(volume.obtainReference()); } @@ -139,8 +151,13 @@ public class TestFsVolumeList { Collections.emptyList(), null, blockChooser); File volDir = new File(baseDir, "volume-0"); volDir.mkdirs(); - FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", volDir, - conf, StorageType.DEFAULT); + FsVolumeImpl volume = new FsVolumeImplBuilder() + .setConf(conf) + .setDataset(dataset) + .setStorageID("storage-id") + .setStorageDirectory( + new StorageDirectory(StorageLocation.parse(volDir.getPath()))) + .build(); FsVolumeReference ref = volume.obtainReference(); volumeList.addVolume(ref); assertNull(ref.getVolume()); @@ -155,8 +172,13 @@ public class TestFsVolumeList { volDir.mkdirs(); // when storage type reserved is not configured,should consider // dfs.datanode.du.reserved. - FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", volDir, conf, - StorageType.RAM_DISK); + FsVolumeImpl volume = new FsVolumeImplBuilder().setDataset(dataset) + .setStorageDirectory( + new StorageDirectory( + StorageLocation.parse("[RAM_DISK]"+volDir.getPath()))) + .setStorageID("storage-id") + .setConf(conf) + .build(); assertEquals("", 100L, volume.getReserved()); // when storage type reserved is configured. conf.setLong( @@ -165,17 +187,37 @@ public class TestFsVolumeList { conf.setLong( DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY + "." + StringUtils.toLowerCase(StorageType.SSD.toString()), 2L); - FsVolumeImpl volume1 = new FsVolumeImpl(dataset, "storage-id", volDir, - conf, StorageType.RAM_DISK); + FsVolumeImpl volume1 = new FsVolumeImplBuilder().setDataset(dataset) + .setStorageDirectory( + new StorageDirectory( + StorageLocation.parse("[RAM_DISK]"+volDir.getPath()))) + .setStorageID("storage-id") + .setConf(conf) + .build(); assertEquals("", 1L, volume1.getReserved()); - FsVolumeImpl volume2 = new FsVolumeImpl(dataset, "storage-id", volDir, - conf, StorageType.SSD); + FsVolumeImpl volume2 = new FsVolumeImplBuilder().setDataset(dataset) + .setStorageDirectory( + new StorageDirectory( + StorageLocation.parse("[SSD]"+volDir.getPath()))) + .setStorageID("storage-id") + .setConf(conf) + .build(); assertEquals("", 2L, volume2.getReserved()); - FsVolumeImpl volume3 = new FsVolumeImpl(dataset, "storage-id", volDir, - conf, StorageType.DISK); + FsVolumeImpl volume3 = new FsVolumeImplBuilder().setDataset(dataset) + .setStorageDirectory( + new StorageDirectory( + StorageLocation.parse("[DISK]"+volDir.getPath()))) + .setStorageID("storage-id") + .setConf(conf) + .build(); assertEquals("", 100L, volume3.getReserved()); - FsVolumeImpl volume4 = new FsVolumeImpl(dataset, "storage-id", volDir, - conf, StorageType.DEFAULT); + FsVolumeImpl volume4 = new FsVolumeImplBuilder().setDataset(dataset) + .setStorageDirectory( + new StorageDirectory( + StorageLocation.parse(volDir.getPath()))) + .setStorageID("storage-id") + .setConf(conf) + .build(); assertEquals("", 100L, volume4.getReserved()); } @@ -197,8 +239,13 @@ public class TestFsVolumeList { long actualNonDfsUsage = 300L; long reservedForReplicas = 50L; conf.setLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, duReserved); - FsVolumeImpl volume = new FsVolumeImpl(dataset, "storage-id", volDir, conf, - StorageType.DEFAULT); + FsVolumeImpl volume = new FsVolumeImplBuilder().setDataset(dataset) + .setStorageDirectory( + new StorageDirectory( + StorageLocation.parse(volDir.getPath()))) + .setStorageID("storage-id") + .setConf(conf) + .build(); FsVolumeImpl spyVolume = Mockito.spy(volume); // Set Capacity for testing long testCapacity = diskCapacity - duReserved; http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b12662/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java index 794a887..7df0333 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java @@ -331,8 +331,8 @@ public class TestDiskBalancerWithMockMover { .getFsVolumeReferences(); nodeID = dataNode.getDatanodeUuid(); - sourceName = references.get(0).getBasePath(); - destName = references.get(1).getBasePath(); + sourceName = references.get(0).getBaseURI().getPath(); + destName = references.get(1).getBaseURI().getPath(); sourceUUID = references.get(0).getStorageID(); destUUID = references.get(1).getStorageID(); references.close(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org