From: rakeshr@apache.org
To: common-commits@hadoop.apache.org
Date: Wed, 24 Jan 2018 07:32:44 -0000
Message-Id: <4709e8c997784dd9bd0bb126ab782225@git.apache.org>
Subject: [49/50] [abbrv] hadoop git commit: HDFS-13025. [SPS]: Implement a mechanism to scan the files for external SPS. Contributed by Uma Maheswara Rao G.

HDFS-13025. [SPS]: Implement a mechanism to scan the files for external SPS. Contributed by Uma Maheswara Rao G.
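At its core, the patch below adds ExternalSPSFileIDCollector, which walks the requested directory tree through the paginated DFSClient#listPaths call and hands the discovered file ids to the SPS queue in batches, backing off while the queue is full. A minimal, self-contained sketch of that listing loop follows; the FileIdWalk class name and the plain List sink are illustrative stand-ins, not part of the patch:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;

/**
 * Illustrative walk over an HDFS tree using the same paginated
 * DFSClient#listPaths loop that the new ExternalSPSFileIDCollector relies on.
 * Collects file ids into a plain list instead of the SPS processing queue.
 */
public class FileIdWalk {

  public static List<Long> collectFileIds(DistributedFileSystem dfs,
      String dir) throws IOException {
    List<Long> ids = new ArrayList<>();
    walk(dfs, dir, ids);
    return ids;
  }

  private static void walk(DistributedFileSystem dfs, String dir,
      List<Long> ids) throws IOException {
    byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
    while (true) {
      // One page of children; lastReturnedName drives the pagination.
      DirectoryListing children =
          dfs.getClient().listPaths(dir, lastReturnedName, false);
      if (children == null) {
        return; // nothing under this path
      }
      for (HdfsFileStatus child : children.getPartialListing()) {
        if (child.isFile()) {
          ids.add(child.getFileId());
        } else if (child.isDirectory()) {
          String sub = child.getFullName(dir);
          if (!sub.endsWith(Path.SEPARATOR)) {
            sub += Path.SEPARATOR;
          }
          walk(dfs, sub, ids); // recurse into the sub-directory
        }
      }
      if (!children.hasMore()) {
        return;
      }
      lastReturnedName = children.getLastName();
    }
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
        .get(FileSystem.getDefaultUri(conf), conf);
    System.out.println(collectFileIds(dfs, "/").size() + " files found");
  }
}

The real collector feeds each id to SPSService#addFileIdToProcess instead of a list and waits via remainingCapacity() when the configured queue limit is reached.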
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6980058e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6980058e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6980058e

Branch: refs/heads/HDFS-10285
Commit: 6980058e1b44e2200301d6254eeb98f20ba91b14
Parents: 3e280b9
Author: Rakesh Radhakrishnan
Authored: Tue Jan 23 20:09:26 2018 +0530
Committer: Rakesh Radhakrishnan
Committed: Wed Jan 24 12:27:42 2018 +0530

----------------------------------------------------------------------
 .../sps/BlockStorageMovementNeeded.java         |  70 +++-
 .../hdfs/server/namenode/sps/Context.java       |   8 +
 .../IntraSPSNameNodeBlockMoveTaskHandler.java   |   2 +
 .../namenode/sps/IntraSPSNameNodeContext.java   |   7 +
 .../sps/IntraSPSNameNodeFileIdCollector.java    |   6 +-
 .../hdfs/server/namenode/sps/SPSService.java    |  10 +-
 .../namenode/sps/StoragePolicySatisfier.java    |   8 +-
 .../server/sps/ExternalSPSFileIDCollector.java  | 156 +++++++++
 .../hadoop/hdfs/server/sps/package-info.java    |  28 ++
 .../sps/TestStoragePolicySatisfier.java         | 323 ++++++++++---------
 .../sps/TestExternalStoragePolicySatisfier.java | 108 +++++++
 11 files changed, 556 insertions(+), 170 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6980058e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
index 39a0051..b141502 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementNeeded.java
@@ -97,23 +97,53 @@ public class BlockStorageMovementNeeded {
   }
 
   /**
-   * Add the itemInfo to tracking list for which storage movement
-   * expected if necessary.
+   * Add the itemInfo list to tracking list for which storage movement expected
+   * if necessary.
+   *
    * @param startId
-   *          - start id
+   *          - start id
    * @param itemInfoList
-   *          - List of child in the directory
+   *          - List of child in the directory
+   * @param scanCompleted
+   *          -Indicates whether the start id directory has no more elements to
+   *          scan.
    */
   @VisibleForTesting
-  public synchronized void addAll(long startId,
-      List<ItemInfo> itemInfoList, boolean scanCompleted) {
+  public synchronized void addAll(long startId, List<ItemInfo> itemInfoList,
+      boolean scanCompleted) {
     storageMovementNeeded.addAll(itemInfoList);
+    updatePendingDirScanStats(startId, itemInfoList.size(), scanCompleted);
+  }
+
+  /**
+   * Add the itemInfo to tracking list for which storage movement expected if
+   * necessary.
+   *
+   * @param itemInfoList
+   *          - List of child in the directory
+   * @param scanCompleted
+   *          -Indicates whether the ItemInfo start id directory has no more
+   *          elements to scan.
+   */
+  @VisibleForTesting
+  public synchronized void add(ItemInfo itemInfo, boolean scanCompleted) {
+    storageMovementNeeded.add(itemInfo);
+    // This represents sps start id is file, so no need to update pending dir
+    // stats.
+ if (itemInfo.getStartId() == itemInfo.getFileId()) { + return; + } + updatePendingDirScanStats(itemInfo.getStartId(), 1, scanCompleted); + } + + private void updatePendingDirScanStats(long startId, int numScannedFiles, + boolean scanCompleted) { DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId); if (pendingWork == null) { pendingWork = new DirPendingWorkInfo(); pendingWorkForDirectory.put(startId, pendingWork); } - pendingWork.addPendingWorkCount(itemInfoList.size()); + pendingWork.addPendingWorkCount(numScannedFiles); if (scanCompleted) { pendingWork.markScanCompleted(); } @@ -250,13 +280,15 @@ public class BlockStorageMovementNeeded { @Override public void run() { - LOG.info("Starting FileInodeIdCollector!."); + LOG.info("Starting SPSPathIdProcessor!."); long lastStatusCleanTime = 0; + Long startINodeId = null; while (ctxt.isRunning()) { - LOG.info("Running FileInodeIdCollector!."); try { if (!ctxt.isInSafeMode()) { - Long startINodeId = ctxt.getNextSPSPathId(); + if (startINodeId == null) { + startINodeId = ctxt.getNextSPSPathId(); + } // else same id will be retried if (startINodeId == null) { // Waiting for SPS path Thread.sleep(3000); @@ -281,9 +313,18 @@ public class BlockStorageMovementNeeded { lastStatusCleanTime = Time.monotonicNow(); cleanSpsStatus(); } + startINodeId = null; // Current inode id successfully scanned. } } catch (Throwable t) { - LOG.warn("Exception while loading inodes to satisfy the policy", t); + String reClass = t.getClass().getName(); + if (InterruptedException.class.getName().equals(reClass)) { + LOG.info("SPSPathIdProcessor thread is interrupted. Stopping.."); + Thread.currentThread().interrupt(); + break; + } + LOG.warn("Exception while scanning file inodes to satisfy the policy", + t); + // TODO: may be we should retry the current inode id? } } } @@ -426,4 +467,11 @@ public class BlockStorageMovementNeeded { public static long getStatusClearanceElapsedTimeMs() { return statusClearanceElapsedTimeMs; } + + public void markScanCompletedForDir(Long inodeId) { + DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(inodeId); + if (pendingWork != null) { + pendingWork.markScanCompleted(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6980058e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java index b7053b9..f103dfe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java @@ -167,4 +167,12 @@ public interface Context { */ void removeAllSPSPathIds(); + /** + * Gets the file path for a given inode id. + * + * @param inodeId + * - path inode id. 
+ */ + String getFilePath(Long inodeId); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6980058e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java index 1da4af9..b27e8c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeBlockMoveTaskHandler.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.sps; import java.io.IOException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.namenode.Namesystem; @@ -29,6 +30,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockM * This class handles the internal SPS block movements. This will assign block * movement tasks to target datanode descriptors. */ +@InterfaceAudience.Private public class IntraSPSNameNodeBlockMoveTaskHandler implements BlockMoveTaskHandler { http://git-wip-us.apache.org/repos/asf/hadoop/blob/6980058e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java index cef26ed..aed684a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SAT import java.io.IOException; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.UnresolvedLinkException; @@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory; * are expecting to change its storages and assigning the block storage * movements to satisfy the storage policy. 
*/ +@InterfaceAudience.Private public class IntraSPSNameNodeContext implements Context { private static final Logger LOG = LoggerFactory .getLogger(IntraSPSNameNodeContext.class); @@ -195,4 +197,9 @@ public class IntraSPSNameNodeContext implements Context { public void removeAllSPSPathIds() { blockManager.removeAllSPSPathIds(); } + + @Override + public String getFilePath(Long inodeId) { + return namesystem.getFilePath(inodeId); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6980058e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java index c6834c1..f7cd754 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.server.namenode.FSDirectory; @@ -32,6 +33,7 @@ import org.apache.hadoop.hdfs.server.namenode.INode; * A specific implementation for scanning the directory with Namenode internal * Inode structure and collects the file ids under the given directory ID. */ +@InterfaceAudience.Private public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser implements FileIdCollector { private int maxQueueLimitToScan; @@ -131,12 +133,12 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser } else { readLock(); - // NOTE: this lock will not be held until full directory scanning. It is + // NOTE: this lock will not be held for full directory scanning. It is // basically a sliced locking. Once it collects a batch size( at max the // size of maxQueueLimitToScan (default 1000)) file ids, then it will // unlock and submits the current batch to SPSService. Once // service.processingQueueSize() shows empty slots, then lock will be - // resumed and scan also will be resumed. This logic was re-used from + // re-acquired and scan will be resumed. This logic was re-used from // EDEK feature. 
try { traverseDir(startInode.asDirectory(), startINodeId, http://git-wip-us.apache.org/repos/asf/hadoop/blob/6980058e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java index 6d85ea6..d74e391 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java @@ -80,7 +80,7 @@ public interface SPSService { * * @param itemInfo */ - void addFileIdToProcess(ItemInfo itemInfo); + void addFileIdToProcess(ItemInfo itemInfo, boolean scanCompleted); /** * Adds all the Item information(file id etc) to processing queue. @@ -104,4 +104,12 @@ public interface SPSService { * @return the configuration. */ Configuration getConf(); + + /** + * Marks the scanning of directory if finished. + * + * @param inodeId + * - directory inode id. + */ + void markScanCompletedForPath(Long inodeId); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6980058e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java index 28c1372..aafdc65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java @@ -563,7 +563,6 @@ public class StoragePolicySatisfier implements SPSService, Runnable { chosenTarget.storageType, blockMovingInfos); } expected.remove(chosenTarget.storageType); - // TODO: We can increment scheduled block count for this node? 
} } // To avoid choosing this excludeNodes as targets later @@ -924,7 +923,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable { } @Override - public void addFileIdToProcess(ItemInfo trackInfo) { + public void addFileIdToProcess(ItemInfo trackInfo, boolean scanCompleted) { storageMovementNeeded.add(trackInfo); } @@ -948,4 +947,9 @@ public class StoragePolicySatisfier implements SPSService, Runnable { public BlockStorageMovementNeeded getStorageMovementQueue() { return storageMovementNeeded; } + + @Override + public void markScanCompletedForPath(Long inodeId) { + getStorageMovementQueue().markScanCompletedForDir(inodeId); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6980058e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java new file mode 100644 index 0000000..597a7d3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java @@ -0,0 +1,156 @@ +package org.apache.hadoop.hdfs.server.sps; +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.namenode.sps.Context; +import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector; +import org.apache.hadoop.hdfs.server.namenode.sps.ItemInfo; +import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is to scan the paths recursively. If file is directory, then it + * will scan for files recursively. If the file is non directory, then it will + * just submit the same file to process. 
+ */ +@InterfaceAudience.Private +public class ExternalSPSFileIDCollector implements FileIdCollector { + public static final Logger LOG = + LoggerFactory.getLogger(ExternalSPSFileIDCollector.class); + private Context cxt; + private DistributedFileSystem dfs; + private SPSService service; + private int maxQueueLimitToScan; + + public ExternalSPSFileIDCollector(Context cxt, SPSService service, + int batchSize) { + this.cxt = cxt; + this.service = service; + this.maxQueueLimitToScan = service.getConf().getInt( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT); + try { + // TODO: probably we could get this dfs from external context? but this is + // too specific to external. + dfs = getFS(service.getConf()); + } catch (IOException e) { + LOG.error("Unable to get the filesystem. Make sure Namenode running and " + + "configured namenode address is correct.", e); + } + } + + private DistributedFileSystem getFS(Configuration conf) throws IOException { + return (DistributedFileSystem) FileSystem + .get(FileSystem.getDefaultUri(conf), conf); + } + + /** + * Recursively scan the given path and add the file info to SPS service for + * processing. + */ + private void processPath(long startID, String fullPath) { + for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) { + final DirectoryListing children; + try { + children = dfs.getClient().listPaths(fullPath, lastReturnedName, false); + } catch (IOException e) { + LOG.warn("Failed to list directory " + fullPath + + ". Ignore the directory and continue.", e); + return; + } + if (children == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("The scanning start dir/sub dir " + fullPath + + " does not have childrens."); + } + return; + } + + for (HdfsFileStatus child : children.getPartialListing()) { + if (child.isFile()) { + service.addFileIdToProcess(new ItemInfo(startID, child.getFileId()), + false); + checkProcessingQueuesFree(); + } else { + String fullPathStr = child.getFullName(fullPath); + if (child.isDirectory()) { + if (!fullPathStr.endsWith(Path.SEPARATOR)) { + fullPathStr = fullPathStr + Path.SEPARATOR; + } + processPath(startID, fullPathStr); + } + } + } + + if (children.hasMore()) { + lastReturnedName = children.getLastName(); + } else { + return; + } + } + } + + private void checkProcessingQueuesFree() { + int remainingCapacity = remainingCapacity(); + // wait for queue to be free + while (remainingCapacity <= 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Waiting for storageMovementNeeded queue to be free!"); + } + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + remainingCapacity = remainingCapacity(); + } + } + + /** + * Returns queue remaining capacity. 
+ */ + public int remainingCapacity() { + int size = service.processingQueueSize(); + if (size >= maxQueueLimitToScan) { + return 0; + } else { + return (maxQueueLimitToScan - size); + } + } + + @Override + public void scanAndCollectFileIds(Long inodeId) throws IOException { + if (dfs == null) { + dfs = getFS(service.getConf()); + } + processPath(inodeId, cxt.getFilePath(inodeId)); + service.markScanCompletedForPath(inodeId); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6980058e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/package-info.java new file mode 100644 index 0000000..f705df2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/package-info.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package provides a mechanism for satisfying the storage policy of a + * path. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.hdfs.server.sps; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6980058e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java index 9354044..e0bf410 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java @@ -71,6 +71,7 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils.LogCapturer; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,18 +94,41 @@ public class TestStoragePolicySatisfier { private static final String COLD = "COLD"; private static final Logger LOG = LoggerFactory.getLogger(TestStoragePolicySatisfier.class); - private final Configuration config = new HdfsConfiguration(); + private Configuration config = null; private StorageType[][] allDiskTypes = new StorageType[][]{{StorageType.DISK, StorageType.DISK}, {StorageType.DISK, StorageType.DISK}, {StorageType.DISK, StorageType.DISK}}; private MiniDFSCluster hdfsCluster = null; - final private int numOfDatanodes = 3; - final private int storagesPerDatanode = 2; - final private long capacity = 2 * 256 * 1024 * 1024; - final private String file = "/testMoveWhenStoragePolicyNotSatisfying"; private DistributedFileSystem dfs = null; - private static final int DEFAULT_BLOCK_SIZE = 1024; + public static final int NUM_OF_DATANODES = 3; + public static final int STORAGES_PER_DATANODE = 2; + public static final long CAPACITY = 2 * 256 * 1024 * 1024; + public static final String FILE = "/testMoveWhenStoragePolicyNotSatisfying"; + public static final int DEFAULT_BLOCK_SIZE = 1024; + + /** + * Sets hdfs cluster. + */ + public void setCluster(MiniDFSCluster cluster) { + this.hdfsCluster = cluster; + } + + /** + * @return conf. + */ + public Configuration getConf() { + return this.config; + } + + /** + * Gets distributed file system. 
+ * + * @throws IOException + */ + public void getFS() throws IOException { + this.dfs = hdfsCluster.getFileSystem(); + } @After public void shutdownCluster() { @@ -113,14 +137,19 @@ public class TestStoragePolicySatisfier { } } - private void createCluster() throws IOException { + public void createCluster() throws IOException { config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, true); - hdfsCluster = startCluster(config, allDiskTypes, numOfDatanodes, - storagesPerDatanode, capacity); - dfs = hdfsCluster.getFileSystem(); - writeContent(file); + hdfsCluster = startCluster(config, allDiskTypes, NUM_OF_DATANODES, + STORAGES_PER_DATANODE, CAPACITY); + getFS(); + writeContent(FILE); + } + + @Before + public void setUp() { + config = new HdfsConfiguration(); } @Test(timeout = 300000) @@ -137,19 +166,19 @@ public class TestStoragePolicySatisfier { private void doTestWhenStoragePolicySetToCOLD() throws Exception { // Change policy to COLD - dfs.setStoragePolicy(new Path(file), COLD); + dfs.setStoragePolicy(new Path(FILE), COLD); StorageType[][] newtypes = new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}, {StorageType.ARCHIVE, StorageType.ARCHIVE}, {StorageType.ARCHIVE, StorageType.ARCHIVE}}; - startAdditionalDNs(config, 3, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); hdfsCluster.triggerHeartbeats(); - dfs.satisfyStoragePolicy(new Path(file)); + dfs.satisfyStoragePolicy(new Path(FILE)); // Wait till namenode notified about the block location details - DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 35000, + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 3, 35000, dfs); } @@ -159,7 +188,7 @@ public class TestStoragePolicySatisfier { try { createCluster(); // Change policy to ALL_SSD - dfs.setStoragePolicy(new Path(file), "ALL_SSD"); + dfs.setStoragePolicy(new Path(FILE), "ALL_SSD"); StorageType[][] newtypes = new StorageType[][]{{StorageType.SSD, StorageType.DISK}, @@ -168,14 +197,13 @@ public class TestStoragePolicySatisfier { // Making sure SDD based nodes added to cluster. Adding SSD based // datanodes. - startAdditionalDNs(config, 3, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - dfs.satisfyStoragePolicy(new Path(file)); + startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); + dfs.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier Identified that block to move to SSD // areas - DFSTestUtil.waitExpectedStorageType( - file, StorageType.SSD, 3, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 3, 30000, dfs); } finally { shutdownCluster(); } @@ -187,23 +215,22 @@ public class TestStoragePolicySatisfier { try { createCluster(); // Change policy to ONE_SSD - dfs.setStoragePolicy(new Path(file), ONE_SSD); + dfs.setStoragePolicy(new Path(FILE), ONE_SSD); StorageType[][] newtypes = new StorageType[][]{{StorageType.SSD, StorageType.DISK}}; // Making sure SDD based nodes added to cluster. Adding SSD based // datanodes. 
- startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - dfs.satisfyStoragePolicy(new Path(file)); + startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); + dfs.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier Identified that block to move to SSD // areas - DFSTestUtil.waitExpectedStorageType( - file, StorageType.SSD, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 2, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000, + dfs); } finally { shutdownCluster(); } @@ -218,23 +245,22 @@ public class TestStoragePolicySatisfier { try { createCluster(); // Change policy to ONE_SSD - dfs.setStoragePolicy(new Path(file), ONE_SSD); + dfs.setStoragePolicy(new Path(FILE), ONE_SSD); StorageType[][] newtypes = new StorageType[][]{{StorageType.SSD, StorageType.DISK}}; // Making sure SDD based nodes added to cluster. Adding SSD based // datanodes. - startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); - dfs.satisfyStoragePolicy(new Path(file)); + startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); + dfs.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); // Wait till the block is moved to SSD areas - DFSTestUtil.waitExpectedStorageType( - file, StorageType.SSD, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 2, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000, + dfs); waitForBlocksMovementAttemptReport(1, 30000); } finally { @@ -251,7 +277,7 @@ public class TestStoragePolicySatisfier { try { createCluster(); List files = new ArrayList<>(); - files.add(file); + files.add(FILE); // Creates 4 more files. Send all of them for satisfying the storage // policy together. @@ -271,8 +297,8 @@ public class TestStoragePolicySatisfier { // Making sure SDD based nodes added to cluster. Adding SSD based // datanodes. 
- startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); hdfsCluster.triggerHeartbeats(); for (String fileName : files) { @@ -300,21 +326,21 @@ public class TestStoragePolicySatisfier { HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(config), config); // Change policy to COLD - dfs.setStoragePolicy(new Path(file), COLD); + dfs.setStoragePolicy(new Path(FILE), COLD); StorageType[][] newtypes = new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE}, {StorageType.DISK, StorageType.ARCHIVE}, {StorageType.DISK, StorageType.ARCHIVE}}; - startAdditionalDNs(config, 3, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); - hdfsAdmin.satisfyStoragePolicy(new Path(file)); + hdfsAdmin.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); // Wait till namenode notified about the block location details - DFSTestUtil.waitExpectedStorageType( - file, StorageType.ARCHIVE, 3, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 3, 30000, + dfs); } finally { shutdownCluster(); } @@ -344,8 +370,8 @@ public class TestStoragePolicySatisfier { StorageType[][] newtypes = new StorageType[][]{{StorageType.SSD, StorageType.DISK}}; - startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); hdfsAdmin.satisfyStoragePolicy(new Path(subDir)); @@ -384,11 +410,11 @@ public class TestStoragePolicySatisfier { new HdfsAdmin(FileSystem.getDefaultUri(config), config); try { - hdfsAdmin.satisfyStoragePolicy(new Path(file)); + hdfsAdmin.satisfyStoragePolicy(new Path(FILE)); Assert.fail(String.format( "Should failed to satisfy storage policy " + "for %s since %s is set to false.", - file, DFS_STORAGE_POLICY_ENABLED_KEY)); + FILE, DFS_STORAGE_POLICY_ENABLED_KEY)); } catch (IOException e) { Assert.assertTrue(e.getMessage().contains(String.format( "Failed to satisfy storage policy since %s is set to false.", @@ -409,17 +435,17 @@ public class TestStoragePolicySatisfier { } try { - hdfsAdmin.satisfyStoragePolicy(new Path(file)); - hdfsAdmin.satisfyStoragePolicy(new Path(file)); - Assert.fail(String.format( - "Should failed to satisfy storage policy " - + "for %s ,since it has been " - + "added to satisfy movement queue.", file)); + hdfsAdmin.satisfyStoragePolicy(new Path(FILE)); + hdfsAdmin.satisfyStoragePolicy(new Path(FILE)); + Assert.fail(String.format("Should failed to satisfy storage policy " + + "for %s ,since it has been " + "added to satisfy movement queue.", + FILE)); } catch (IOException e) { GenericTestUtils.assertExceptionContains( String.format("Cannot request to call satisfy storage policy " + "on path %s, as this file/dir was already called for " - + "satisfying storage policy.", file), e); + + "satisfying storage policy.", FILE), + e); } } finally { shutdownCluster(); @@ -446,23 +472,23 @@ public class TestStoragePolicySatisfier { try { createCluster(); // Change policy to COLD - dfs.setStoragePolicy(new Path(file), COLD); + dfs.setStoragePolicy(new Path(FILE), COLD); StorageType[][] newtypes = new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}}; // Adding ARCHIVE based datanodes. 
- startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); - dfs.satisfyStoragePolicy(new Path(file)); + dfs.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier identified that block to move to // ARCHIVE area. - DFSTestUtil.waitExpectedStorageType( - file, StorageType.ARCHIVE, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 2, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 1, 30000, + dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000, + dfs); waitForBlocksMovementAttemptReport(1, 30000); } finally { @@ -489,22 +515,22 @@ public class TestStoragePolicySatisfier { try { createCluster(); // Change policy to COLD - dfs.setStoragePolicy(new Path(file), COLD); + dfs.setStoragePolicy(new Path(FILE), COLD); StorageType[][] newtypes = new StorageType[][]{{StorageType.DISK, StorageType.DISK}}; // Adding DISK based datanodes - startAdditionalDNs(config, 1, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + startAdditionalDNs(config, 1, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); - dfs.satisfyStoragePolicy(new Path(file)); + dfs.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); // No block movement will be scheduled as there is no target node // available with the required storage type. waitForAttemptedItems(1, 30000); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 3, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000, + dfs); // Since there is no target node the item will get timed out and then // re-attempted. waitForAttemptedItems(1, 30000); @@ -628,8 +654,8 @@ public class TestStoragePolicySatisfier { {StorageType.ARCHIVE, StorageType.ARCHIVE}, {StorageType.ARCHIVE, StorageType.ARCHIVE}}; // Adding DISK based datanodes - startAdditionalDNs(config, 3, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + startAdditionalDNs(config, 3, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); dfs.satisfyStoragePolicy(new Path(file1)); hdfsCluster.triggerHeartbeats(); @@ -682,21 +708,21 @@ public class TestStoragePolicySatisfier { {StorageType.DISK, StorageType.DISK}, {StorageType.DISK, StorageType.ARCHIVE}}; hdfsCluster = startCluster(config, allDiskTypes, numOfDns, - storagesPerDatanode, capacity); + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); - writeContent(file, (short) 5); + writeContent(FILE, (short) 5); // Change policy to COLD - dfs.setStoragePolicy(new Path(file), COLD); + dfs.setStoragePolicy(new Path(FILE), COLD); - dfs.satisfyStoragePolicy(new Path(file)); + dfs.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier identified that block to move to // ARCHIVE area. 
- DFSTestUtil.waitExpectedStorageType( - file, StorageType.ARCHIVE, 2, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 3, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 2, 30000, + dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000, + dfs); waitForBlocksMovementAttemptReport(1, 30000); } finally { @@ -720,20 +746,19 @@ public class TestStoragePolicySatisfier { config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, true); try { - hdfsCluster = startCluster(config, diskTypes, numOfDatanodes, - storagesPerDatanode, capacity); + hdfsCluster = startCluster(config, diskTypes, NUM_OF_DATANODES, + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); - writeContent(file); + writeContent(FILE); // Change policy to ONE_SSD - dfs.setStoragePolicy(new Path(file), ONE_SSD); + dfs.setStoragePolicy(new Path(FILE), ONE_SSD); - dfs.satisfyStoragePolicy(new Path(file)); + dfs.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.SSD, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 2, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000, + dfs); } finally { shutdownCluster(); @@ -760,19 +785,19 @@ public class TestStoragePolicySatisfier { true); try { hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); - writeContent(file); + writeContent(FILE); // Change policy to WARM - dfs.setStoragePolicy(new Path(file), "WARM"); - dfs.satisfyStoragePolicy(new Path(file)); + dfs.setStoragePolicy(new Path(FILE), "WARM"); + dfs.satisfyStoragePolicy(new Path(FILE)); hdfsCluster.triggerHeartbeats(); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.DISK, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file, StorageType.ARCHIVE, 2, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 1, 30000, + dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 2, 30000, + dfs); } finally { shutdownCluster(); } @@ -794,31 +819,31 @@ public class TestStoragePolicySatisfier { config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, true); hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); // 1. Write two replica on disk - DFSTestUtil.createFile(dfs, new Path(file), DEFAULT_BLOCK_SIZE, + DFSTestUtil.createFile(dfs, new Path(FILE), DEFAULT_BLOCK_SIZE, (short) 2, 0); // 2. Change policy to COLD, so third replica will be written to ARCHIVE. - dfs.setStoragePolicy(new Path(file), "COLD"); + dfs.setStoragePolicy(new Path(FILE), "COLD"); // 3.Change replication factor to 3. - dfs.setReplication(new Path(file), (short) 3); + dfs.setReplication(new Path(FILE), (short) 3); - DFSTestUtil - .waitExpectedStorageType(file, StorageType.DISK, 2, 30000, dfs); - DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000, + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 2, 30000, + dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.ARCHIVE, 1, 30000, dfs); // 4. Change policy to HOT, so we can move the all block to DISK. 
- dfs.setStoragePolicy(new Path(file), "HOT"); + dfs.setStoragePolicy(new Path(FILE), "HOT"); // 4. Satisfy the policy. - dfs.satisfyStoragePolicy(new Path(file)); + dfs.satisfyStoragePolicy(new Path(FILE)); // 5. Block should move successfully . - DFSTestUtil - .waitExpectedStorageType(file, StorageType.DISK, 3, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000, + dfs); } finally { shutdownCluster(); } @@ -840,13 +865,13 @@ public class TestStoragePolicySatisfier { true); long dnCapacity = 1024 * DEFAULT_BLOCK_SIZE + (2 * DEFAULT_BLOCK_SIZE - 1); try { - hdfsCluster = startCluster(config, diskTypes, numOfDatanodes, - storagesPerDatanode, dnCapacity); + hdfsCluster = startCluster(config, diskTypes, NUM_OF_DATANODES, + STORAGES_PER_DATANODE, dnCapacity); dfs = hdfsCluster.getFileSystem(); - writeContent(file); + writeContent(FILE); // Change policy to ONE_SSD - dfs.setStoragePolicy(new Path(file), ONE_SSD); + dfs.setStoragePolicy(new Path(FILE), ONE_SSD); Path filePath = new Path("/testChooseInSameDatanode"); final FSDataOutputStream out = dfs.create(filePath, false, 100, (short) 1, 2 * DEFAULT_BLOCK_SIZE); @@ -869,7 +894,7 @@ public class TestStoragePolicySatisfier { for (DataNode dataNode : dataNodes) { DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true); } - dfs.satisfyStoragePolicy(new Path(file)); + dfs.satisfyStoragePolicy(new Path(FILE)); // Wait for items to be processed waitForAttemptedItems(1, 30000); @@ -887,9 +912,9 @@ public class TestStoragePolicySatisfier { } hdfsCluster.triggerHeartbeats(); - DFSTestUtil.waitExpectedStorageType(file, StorageType.DISK, 3, 30000, + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.DISK, 3, 30000, dfs); - DFSTestUtil.waitExpectedStorageType(file, StorageType.SSD, 0, 30000, dfs); + DFSTestUtil.waitExpectedStorageType(FILE, StorageType.SSD, 0, 30000, dfs); } finally { shutdownCluster(); } @@ -928,7 +953,7 @@ public class TestStoragePolicySatisfier { true); try { hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); dfs.enableErasureCodingPolicy( StripedFileTestUtil.getDefaultECPolicy().getName()); @@ -1029,8 +1054,7 @@ public class TestStoragePolicySatisfier { {StorageType.ARCHIVE, StorageType.DISK}, {StorageType.ARCHIVE, StorageType.DISK}, {StorageType.ARCHIVE, StorageType.DISK}}; - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) - .storageTypes(newtypes).build(); + cluster = startCluster(conf, newtypes, 3, 2, CAPACITY); cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); Path filePath = new Path("/zeroSizeFile"); @@ -1211,7 +1235,7 @@ public class TestStoragePolicySatisfier { config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, true); hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); createDirectoryTree(dfs); @@ -1245,7 +1269,7 @@ public class TestStoragePolicySatisfier { config.setInt(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 5); hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); createDirectoryTree(dfs); List files = getDFSListOfTree(); @@ -1284,7 +1308,7 @@ public class TestStoragePolicySatisfier { config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); 
config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10); hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); createDirectoryTree(dfs); @@ -1312,8 +1336,7 @@ public class TestStoragePolicySatisfier { } }; - FileIdCollector fileIDCollector = - new IntraSPSNameNodeFileIdCollector(fsDir, sps); + FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt); sps.init(ctxt, fileIDCollector, null); sps.getStorageMovementQueue().activate(); @@ -1323,31 +1346,20 @@ public class TestStoragePolicySatisfier { //Wait for thread to reach U. Thread.sleep(1000); - dfs.delete(new Path("/root/D/L"), true); - // Remove 10 element and make queue free, So other traversing will start. - for (int i = 0; i < 10; i++) { - String path = expectedTraverseOrder.remove(0); - long trackId = sps.getStorageMovementQueue().get().getFileId(); - INode inode = fsDir.getInode(trackId); - assertTrue("Failed to traverse tree, expected " + path + " but got " - + inode.getFullPathName(), path.equals(inode.getFullPathName())); - } - //Wait to finish tree traverse - Thread.sleep(5000); - // Check other element traversed in order and R,S should not be added in - // queue which we already removed from expected list - for (String path : expectedTraverseOrder) { - long trackId = sps.getStorageMovementQueue().get().getFileId(); - INode inode = fsDir.getInode(trackId); - assertTrue("Failed to traverse tree, expected " + path + " but got " - + inode.getFullPathName(), path.equals(inode.getFullPathName())); - } + assertTraversal(expectedTraverseOrder, fsDir, sps); dfs.delete(new Path("/root"), true); } + public FileIdCollector createFileIdCollector(StoragePolicySatisfier sps, + Context ctxt) { + FileIdCollector fileIDCollector = new IntraSPSNameNodeFileIdCollector( + hdfsCluster.getNamesystem().getFSDirectory(), sps); + return fileIDCollector; + } + /** * Test traverse when root parent got deleted. * 1. Delete L when traversing Q @@ -1362,7 +1374,7 @@ public class TestStoragePolicySatisfier { config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); config.setInt(DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 10); hdfsCluster = startCluster(config, diskTypes, diskTypes.length, - storagesPerDatanode, capacity); + STORAGES_PER_DATANODE, CAPACITY); dfs = hdfsCluster.getFileSystem(); createDirectoryTree(dfs); @@ -1378,7 +1390,6 @@ public class TestStoragePolicySatisfier { // Queue limit can control the traverse logic to wait for some free // entry in queue. After 10 files, traverse control will be on U. 
- // StoragePolicySatisfier sps = new StoragePolicySatisfier(config); StoragePolicySatisfier sps = new StoragePolicySatisfier(config); Context ctxt = new IntraSPSNameNodeContext(hdfsCluster.getNamesystem(), hdfsCluster.getNamesystem().getBlockManager(), sps) { @@ -1392,9 +1403,7 @@ public class TestStoragePolicySatisfier { return true; } }; - - FileIdCollector fileIDCollector = - new IntraSPSNameNodeFileIdCollector(fsDir, sps); + FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt); sps.init(ctxt, fileIDCollector, null); sps.getStorageMovementQueue().activate(); @@ -1407,6 +1416,13 @@ public class TestStoragePolicySatisfier { dfs.delete(new Path("/root/D/L"), true); + assertTraversal(expectedTraverseOrder, fsDir, sps); + dfs.delete(new Path("/root"), true); + } + + private void assertTraversal(List expectedTraverseOrder, + FSDirectory fsDir, StoragePolicySatisfier sps) + throws InterruptedException { // Remove 10 element and make queue free, So other traversing will start. for (int i = 0; i < 10; i++) { String path = expectedTraverseOrder.remove(0); @@ -1426,7 +1442,6 @@ public class TestStoragePolicySatisfier { assertTrue("Failed to traverse tree, expected " + path + " but got " + inode.getFullPathName(), path.equals(inode.getFullPathName())); } - dfs.delete(new Path("/root"), true); } /** @@ -1473,8 +1488,8 @@ public class TestStoragePolicySatisfier { StorageType[][] newtypes = new StorageType[][]{{StorageType.DISK, StorageType.SSD}, {StorageType.DISK, StorageType.SSD}}; - startAdditionalDNs(config, 2, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + startAdditionalDNs(config, 2, NUM_OF_DATANODES, newtypes, + STORAGES_PER_DATANODE, CAPACITY, hdfsCluster); // increase replication factor to 4 for the first 10 files and thus // initiate replica tasks @@ -1772,7 +1787,7 @@ public class TestStoragePolicySatisfier { }, 100, timeout); } - private void writeContent(final String fileName) throws IOException { + public void writeContent(final String fileName) throws IOException { writeContent(fileName, (short) 3); } @@ -1805,7 +1820,7 @@ public class TestStoragePolicySatisfier { cluster.triggerHeartbeats(); } - private MiniDFSCluster startCluster(final Configuration conf, + public MiniDFSCluster startCluster(final Configuration conf, StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn, long nodeCapacity) throws IOException { long[][] capacities = new long[numberOfDatanodes][storagesPerDn]; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6980058e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java new file mode 100644 index 0000000..3ced34e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.sps; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.namenode.sps.Context; +import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector; +import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler; +import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext; +import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; +import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; +import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier; +import org.junit.Ignore; + +/** + * Tests the external sps service plugins. + */ +public class TestExternalStoragePolicySatisfier + extends TestStoragePolicySatisfier { + private StorageType[][] allDiskTypes = + new StorageType[][]{{StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}}; + + @Override + public void createCluster() throws IOException { + getConf().setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + getConf().setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, + true); + setCluster(startCluster(getConf(), allDiskTypes, NUM_OF_DATANODES, + STORAGES_PER_DATANODE, CAPACITY)); + getFS(); + writeContent(FILE); + } + + @Override + public MiniDFSCluster startCluster(final Configuration conf, + StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn, + long nodeCapacity) throws IOException { + long[][] capacities = new long[numberOfDatanodes][storagesPerDn]; + for (int i = 0; i < numberOfDatanodes; i++) { + for (int j = 0; j < storagesPerDn; j++) { + capacities[i][j] = nodeCapacity; + } + } + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn) + .storageTypes(storageTypes).storageCapacities(capacities).build(); + cluster.waitActive(); + if (conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, + false)) { + SPSService spsService = cluster.getNameNode().getNamesystem() + .getBlockManager().getSPSService(); + spsService.stopGracefully(); + + IntraSPSNameNodeContext context = new IntraSPSNameNodeContext( + cluster.getNameNode().getNamesystem(), + cluster.getNameNode().getNamesystem().getBlockManager(), cluster + .getNameNode().getNamesystem().getBlockManager().getSPSService()); + + spsService.init(context, + new ExternalSPSFileIDCollector(context, + cluster.getNameNode().getNamesystem().getBlockManager() + .getSPSService(), + 5), + new IntraSPSNameNodeBlockMoveTaskHandler( + cluster.getNameNode().getNamesystem().getBlockManager(), + cluster.getNameNode().getNamesystem())); + spsService.start(true); + } + return cluster; + } + + @Override + public FileIdCollector 
createFileIdCollector(StoragePolicySatisfier sps,
+      Context ctxt) {
+    return new ExternalSPSFileIDCollector(ctxt, sps, 5);
+  }
+
+  /**
+   * This test need not run as external scan is not a batch based scanning right
+   * now.
+   */
+  @Ignore("ExternalFileIdCollector is not batch based right now." + " So, ignoring it.")
+  public void testBatchProcessingForSPSDirectory() throws Exception {
+  }
+}
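For orientation, a hedged sketch of how the external collector gets wired into the SPS service, modeled on TestExternalStoragePolicySatisfier#startCluster above. The ExternalCollectorWiring class and rewire method are hypothetical names, and a genuinely external deployment would substitute its own Context implementation for the NameNode-internal one reused here:

import java.io.IOException;

import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.sps.ExternalSPSFileIDCollector;

/**
 * Illustrative wiring of the external file id collector into the SPS service,
 * mirroring the test setup in this patch.
 */
public final class ExternalCollectorWiring {

  private ExternalCollectorWiring() {
  }

  public static void rewire(FSNamesystem namesystem) throws IOException {
    SPSService spsService = namesystem.getBlockManager().getSPSService();
    // Stop the internal SPS before swapping in the external collector.
    spsService.stopGracefully();

    // The test reuses the NameNode-internal context; a real external service
    // would supply its own Context implementation here.
    Context context = new IntraSPSNameNodeContext(namesystem,
        namesystem.getBlockManager(), spsService);

    // Batch size 5 mirrors the test; a deployment would tune this value.
    FileIdCollector collector =
        new ExternalSPSFileIDCollector(context, spsService, 5);

    spsService.init(context, collector,
        new IntraSPSNameNodeBlockMoveTaskHandler(
            namesystem.getBlockManager(), namesystem));
    spsService.start(true);
  }
}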