From common-commits-return-85599-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Mon Jul 16 11:23:41 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id D763F18076D for ; Mon, 16 Jul 2018 11:23:39 +0200 (CEST) Received: (qmail 69811 invoked by uid 500); 16 Jul 2018 09:23:35 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 69274 invoked by uid 99); 16 Jul 2018 09:23:35 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 16 Jul 2018 09:23:35 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 24D8CE10F5; Mon, 16 Jul 2018 09:23:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rakeshr@apache.org To: common-commits@hadoop.apache.org Date: Mon, 16 Jul 2018 09:23:49 -0000 Message-Id: <8b9c441be64c44c8b02a2429a8c1fcfa@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [16/50] [abbrv] hadoop git commit: HDFS-12225: [SPS]: Optimize extended attributes for tracking SPS movements. Contributed by Surendra Singh Lilhore. HDFS-12225: [SPS]: Optimize extended attributes for tracking SPS movements. Contributed by Surendra Singh Lilhore. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/71849149 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/71849149 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/71849149 Branch: refs/heads/HDFS-10285 Commit: 718491497aac108d4102b3ea8d7bd634e285d0f2 Parents: 135febf Author: Uma Maheswara Rao G Authored: Wed Aug 23 15:37:03 2017 -0700 Committer: Rakesh Radhakrishnan Committed: Sun Jul 15 20:19:52 2018 +0530 ---------------------------------------------------------------------- .../server/blockmanagement/BlockManager.java | 21 +- .../server/blockmanagement/DatanodeManager.java | 14 +- .../hdfs/server/datanode/BPOfferService.java | 1 + .../BlockStorageMovementAttemptedItems.java | 95 +++++--- .../namenode/BlockStorageMovementNeeded.java | 233 ++++++++++++++++++- .../namenode/FSDirSatisfyStoragePolicyOp.java | 91 +++----- .../hdfs/server/namenode/FSDirXAttrOp.java | 11 +- .../hdfs/server/namenode/FSDirectory.java | 2 +- .../hdfs/server/namenode/FSNamesystem.java | 2 +- .../server/namenode/StoragePolicySatisfier.java | 108 ++++++--- .../TestStoragePolicySatisfyWorker.java | 5 +- .../TestBlockStorageMovementAttemptedItems.java | 34 +-- .../TestPersistentStoragePolicySatisfier.java | 104 +++++++++ .../namenode/TestStoragePolicySatisfier.java | 127 +++++----- 14 files changed, 589 insertions(+), 259 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/71849149/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index a442a92..0ee558a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -89,7 +89,6 @@ import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; import org.apache.hadoop.hdfs.server.namenode.INodesInPath; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; -import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementNeeded; import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; @@ -431,9 +430,6 @@ public class BlockManager implements BlockStatsMXBean { private final StoragePolicySatisfier sps; private final boolean storagePolicyEnabled; private boolean spsEnabled; - private final BlockStorageMovementNeeded storageMovementNeeded = - new BlockStorageMovementNeeded(); - /** Minimum live replicas needed for the datanode to be transitioned * from ENTERING_MAINTENANCE to IN_MAINTENANCE. */ @@ -480,8 +476,7 @@ public class BlockManager implements BlockStatsMXBean { conf.getBoolean( DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT); - sps = new StoragePolicySatisfier(namesystem, storageMovementNeeded, this, - conf); + sps = new StoragePolicySatisfier(namesystem, this, conf); blockTokenSecretManager = createBlockTokenSecretManager(conf); providedStorageMap = new ProvidedStorageMap(namesystem, this, conf); @@ -5009,20 +5004,6 @@ public class BlockManager implements BlockStatsMXBean { } /** - * Set file block collection for which storage movement needed for its blocks. - * - * @param id - * - file block collection id. - */ - public void satisfyStoragePolicy(long id) { - storageMovementNeeded.add(id); - if (LOG.isDebugEnabled()) { - LOG.debug("Added block collection id {} to block " - + "storageMovementNeeded queue", id); - } - } - - /** * Gets the storage policy satisfier instance. * * @return sps http://git-wip-us.apache.org/repos/asf/hadoop/blob/71849149/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 2d7c80e..c8d31fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1751,6 +1751,13 @@ public class DatanodeManager { } } + if (nodeinfo.shouldDropSPSWork()) { + cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND); + // Set back to false to indicate that the new value has been sent to the + // datanode. + nodeinfo.setDropSPSWork(false); + } + // check pending block storage movement tasks BlockStorageMovementInfosBatch blkStorageMovementInfosBatch = nodeinfo .getBlocksToMoveStorages(); @@ -1762,13 +1769,6 @@ public class DatanodeManager { blkStorageMovementInfosBatch.getBlockMovingInfo())); } - if (nodeinfo.shouldDropSPSWork()) { - cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND); - // Set back to false to indicate that the new value has been sent to the - // datanode. - nodeinfo.setDropSPSWork(false); - } - if (!cmds.isEmpty()) { return cmds.toArray(new DatanodeCommand[cmds.size()]); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/71849149/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 79109b7..9308471 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -837,6 +837,7 @@ class BPOfferService { case DatanodeProtocol.DNA_UNCACHE: case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION: case DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT: + case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND: LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction()); break; default: http://git-wip-us.apache.org/repos/asf/hadoop/blob/71849149/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java index 37833e2..278b62b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo; import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status; import org.apache.hadoop.util.Daemon; @@ -54,7 +55,7 @@ public class BlockStorageMovementAttemptedItems { * A map holds the items which are already taken for blocks movements * processing and sent to DNs. */ - private final Map storageMovementAttemptedItems; + private final Map storageMovementAttemptedItems; private final List storageMovementAttemptedResults; private volatile boolean monitorRunning = true; private Daemon timerThread = null; @@ -91,18 +92,19 @@ public class BlockStorageMovementAttemptedItems { * Add item to block storage movement attempted items map which holds the * tracking/blockCollection id versus time stamp. * - * @param blockCollectionID - * - tracking id / block collection id + * @param itemInfo + * - tracking info * @param allBlockLocsAttemptedToSatisfy - * - failed to find matching target nodes to satisfy storage type for - * all the block locations of the given blockCollectionID + * - failed to find matching target nodes to satisfy storage type + * for all the block locations of the given blockCollectionID */ - public void add(Long blockCollectionID, - boolean allBlockLocsAttemptedToSatisfy) { + public void add(ItemInfo itemInfo, boolean allBlockLocsAttemptedToSatisfy) { synchronized (storageMovementAttemptedItems) { - ItemInfo itemInfo = new ItemInfo(monotonicNow(), + AttemptedItemInfo attemptedItemInfo = new AttemptedItemInfo( + itemInfo.getRootId(), itemInfo.getTrackId(), monotonicNow(), allBlockLocsAttemptedToSatisfy); - storageMovementAttemptedItems.put(blockCollectionID, itemInfo); + storageMovementAttemptedItems.put(itemInfo.getTrackId(), + attemptedItemInfo); } } @@ -167,21 +169,27 @@ public class BlockStorageMovementAttemptedItems { * satisfy storage policy. This is used by * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}. */ - private final static class ItemInfo { + private final static class AttemptedItemInfo extends ItemInfo { private long lastAttemptedOrReportedTime; private final boolean allBlockLocsAttemptedToSatisfy; /** - * ItemInfo constructor. + * AttemptedItemInfo constructor. * + * @param rootId + * rootId for trackId + * @param trackId + * trackId for file. * @param lastAttemptedOrReportedTime * last attempted or reported time * @param allBlockLocsAttemptedToSatisfy * whether all the blocks in the trackID were attempted and blocks * movement has been scheduled to satisfy storage policy */ - private ItemInfo(long lastAttemptedOrReportedTime, + private AttemptedItemInfo(long rootId, long trackId, + long lastAttemptedOrReportedTime, boolean allBlockLocsAttemptedToSatisfy) { + super(rootId, trackId); this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime; this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy; } @@ -211,6 +219,7 @@ public class BlockStorageMovementAttemptedItems { private void touchLastReportedTimeStamp() { this.lastAttemptedOrReportedTime = monotonicNow(); } + } /** @@ -239,18 +248,20 @@ public class BlockStorageMovementAttemptedItems { @VisibleForTesting void blocksStorageMovementUnReportedItemsCheck() { synchronized (storageMovementAttemptedItems) { - Iterator> iter = storageMovementAttemptedItems - .entrySet().iterator(); + Iterator> iter = + storageMovementAttemptedItems.entrySet().iterator(); long now = monotonicNow(); while (iter.hasNext()) { - Entry entry = iter.next(); - ItemInfo itemInfo = entry.getValue(); + Entry entry = iter.next(); + AttemptedItemInfo itemInfo = entry.getValue(); if (now > itemInfo.getLastAttemptedOrReportedTime() + selfRetryTimeout) { Long blockCollectionID = entry.getKey(); synchronized (storageMovementAttemptedResults) { if (!isExistInResult(blockCollectionID)) { - blockStorageMovementNeeded.add(blockCollectionID); + ItemInfo candidate = new ItemInfo( + itemInfo.getRootId(), blockCollectionID); + blockStorageMovementNeeded.add(candidate); iter.remove(); LOG.info("TrackID: {} becomes timed out and moved to needed " + "retries queue for next iteration.", blockCollectionID); @@ -297,17 +308,30 @@ public class BlockStorageMovementAttemptedItems { synchronized (storageMovementAttemptedItems) { Status status = storageMovementAttemptedResult.getStatus(); long trackId = storageMovementAttemptedResult.getTrackId(); - ItemInfo itemInfo; + AttemptedItemInfo attemptedItemInfo = storageMovementAttemptedItems + .get(trackId); + // itemInfo is null means no root for trackId, using trackId only as + // root and handling it in + // blockStorageMovementNeeded#removeIteamTrackInfo() for cleaning + // the xAttr + ItemInfo itemInfo = new ItemInfo((attemptedItemInfo != null) + ? attemptedItemInfo.getRootId() : trackId, trackId); switch (status) { case FAILURE: - blockStorageMovementNeeded.add(trackId); - LOG.warn("Blocks storage movement results for the tracking id: {}" - + " is reported from co-ordinating datanode, but result" - + " status is FAILURE. So, added for retry", trackId); + if (attemptedItemInfo != null) { + blockStorageMovementNeeded.add(itemInfo); + LOG.warn("Blocks storage movement results for the tracking id:" + + "{} is reported from co-ordinating datanode, but result" + + " status is FAILURE. So, added for retry", trackId); + } else { + LOG.info("Blocks storage movement is FAILURE for the track" + + " id {}. But the trackID doesn't exists in" + + " storageMovementAttemptedItems list.", trackId); + blockStorageMovementNeeded + .removeItemTrackInfo(itemInfo); + } break; case SUCCESS: - itemInfo = storageMovementAttemptedItems.get(trackId); - // ItemInfo could be null. One case is, before the blocks movements // result arrives the attempted trackID became timed out and then // removed the trackID from the storageMovementAttemptedItems list. @@ -318,33 +342,32 @@ public class BlockStorageMovementAttemptedItems { // storage policy in previous SPS iteration. String msg = "Blocks storage movement is SUCCESS for the track id: " + trackId + " reported from co-ordinating datanode."; - if (itemInfo != null) { - if (!itemInfo.isAllBlockLocsAttemptedToSatisfy()) { - blockStorageMovementNeeded.add(trackId); + if (attemptedItemInfo != null) { + if (!attemptedItemInfo.isAllBlockLocsAttemptedToSatisfy()) { + blockStorageMovementNeeded + .add(new ItemInfo(attemptedItemInfo.getRootId(), trackId)); LOG.warn("{} But adding trackID back to retry queue as some of" + " the blocks couldn't find matching target nodes in" + " previous SPS iteration.", msg); } else { LOG.info(msg); - // Remove xattr for the track id. - this.sps.postBlkStorageMovementCleanup( - storageMovementAttemptedResult.getTrackId()); + blockStorageMovementNeeded + .removeItemTrackInfo(itemInfo); } } else { LOG.info("{} But the trackID doesn't exists in " + "storageMovementAttemptedItems list", msg); - // Remove xattr for the track id. - this.sps.postBlkStorageMovementCleanup( - storageMovementAttemptedResult.getTrackId()); + blockStorageMovementNeeded + .removeItemTrackInfo(itemInfo); } break; case IN_PROGRESS: isInprogress = true; - itemInfo = storageMovementAttemptedItems + attemptedItemInfo = storageMovementAttemptedItems .get(storageMovementAttemptedResult.getTrackId()); - if(itemInfo != null){ + if(attemptedItemInfo != null){ // update the attempted expiration time to next cycle. - itemInfo.touchLastReportedTimeStamp(); + attemptedItemInfo.touchLastReportedTimeStamp(); } break; default: http://git-wip-us.apache.org/repos/asf/hadoop/blob/71849149/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java index 3241e6d..41a3a6c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java @@ -17,28 +17,86 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.Queue; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo; +import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; +import org.apache.hadoop.util.Daemon; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * A Class to track the block collection IDs for which physical storage movement - * needed as per the Namespace and StorageReports from DN. + * A Class to track the block collection IDs (Inode's ID) for which physical + * storage movement needed as per the Namespace and StorageReports from DN. + * It scan the pending directories for which storage movement is required and + * schedule the block collection IDs for movement. It track the info of + * scheduled items and remove the SPS xAttr from the file/Directory once + * movement is success. */ @InterfaceAudience.Private public class BlockStorageMovementNeeded { - private final Queue storageMovementNeeded = new LinkedList(); + + public static final Logger LOG = + LoggerFactory.getLogger(BlockStorageMovementNeeded.class); + + private final Queue storageMovementNeeded = + new LinkedList(); /** - * Add the block collection id to tracking list for which storage movement + * Map of rootId and number of child's. Number of child's indicate the number + * of files pending to satisfy the policy. + */ + private final Map pendingWorkForDirectory = + new HashMap(); + + private final Namesystem namesystem; + + // List of pending dir to satisfy the policy + private final Queue spsDirsToBeTraveresed = new LinkedList(); + + private final StoragePolicySatisfier sps; + + private Daemon fileInodeIdCollector; + + public BlockStorageMovementNeeded(Namesystem namesystem, + StoragePolicySatisfier sps) { + this.namesystem = namesystem; + this.sps = sps; + } + + /** + * Add the candidate to tracking list for which storage movement * expected if necessary. * - * @param blockCollectionID - * - block collection id, which is nothing but inode id. + * @param trackInfo + * - track info for satisfy the policy */ - public synchronized void add(Long blockCollectionID) { - storageMovementNeeded.add(blockCollectionID); + public synchronized void add(ItemInfo trackInfo) { + storageMovementNeeded.add(trackInfo); + } + + /** + * Add the itemInfo to tracking list for which storage movement + * expected if necessary. + * @param rootId + * - root inode id + * @param itemInfoList + * - List of child in the directory + */ + private synchronized void addAll(Long rootId, + List itemInfoList) { + storageMovementNeeded.addAll(itemInfoList); + pendingWorkForDirectory.put(rootId, itemInfoList.size()); } /** @@ -47,11 +105,168 @@ public class BlockStorageMovementNeeded { * * @return block collection ID */ - public synchronized Long get() { + public synchronized ItemInfo get() { return storageMovementNeeded.poll(); } + public synchronized void addToPendingDirQueue(long id) { + spsDirsToBeTraveresed.add(id); + // Notify waiting FileInodeIdCollector thread about the newly + // added SPS path. + synchronized (spsDirsToBeTraveresed) { + spsDirsToBeTraveresed.notify(); + } + } + public synchronized void clearAll() { + spsDirsToBeTraveresed.clear(); storageMovementNeeded.clear(); + pendingWorkForDirectory.clear(); + } + + /** + * Decrease the pending child count for directory once one file blocks moved + * successfully. Remove the SPS xAttr if pending child count is zero. + */ + public synchronized void removeItemTrackInfo(ItemInfo trackInfo) + throws IOException { + if (trackInfo.isDir()) { + // If track is part of some root then reduce the pending directory work + // count. + long rootId = trackInfo.getRootId(); + INode inode = namesystem.getFSDirectory().getInode(rootId); + if (inode == null) { + // directory deleted just remove it. + this.pendingWorkForDirectory.remove(rootId); + } else { + if (pendingWorkForDirectory.get(rootId) != null) { + Integer pendingWork = pendingWorkForDirectory.get(rootId) - 1; + pendingWorkForDirectory.put(rootId, pendingWork); + if (pendingWork <= 0) { + namesystem.removeXattr(rootId, XATTR_SATISFY_STORAGE_POLICY); + pendingWorkForDirectory.remove(rootId); + } + } + } + } else { + // Remove xAttr if trackID doesn't exist in + // storageMovementAttemptedItems or file policy satisfied. + namesystem.removeXattr(trackInfo.getTrackId(), + XATTR_SATISFY_STORAGE_POLICY); + } + } + + public synchronized void clearQueue(long trackId) { + spsDirsToBeTraveresed.remove(trackId); + Iterator iterator = storageMovementNeeded.iterator(); + while (iterator.hasNext()) { + ItemInfo next = iterator.next(); + if (next.getRootId() == trackId) { + iterator.remove(); + } + } + pendingWorkForDirectory.remove(trackId); + } + + /** + * Clean all the movements in spsDirsToBeTraveresed/storageMovementNeeded + * and notify to clean up required resources. + * @throws IOException + */ + public synchronized void clearQueuesWithNotification() { + // Remove xAttr from directories + Long trackId; + while ((trackId = spsDirsToBeTraveresed.poll()) != null) { + try { + // Remove xAttr for file + namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY); + } catch (IOException ie) { + LOG.warn("Failed to remove SPS xattr for track id " + trackId, ie); + } + } + + // File's directly added to storageMovementNeeded, So try to remove + // xAttr for file + ItemInfo itemInfo; + while ((itemInfo = storageMovementNeeded.poll()) != null) { + try { + // Remove xAttr for file + if (!itemInfo.isDir()) { + namesystem.removeXattr(itemInfo.getTrackId(), + XATTR_SATISFY_STORAGE_POLICY); + } + } catch (IOException ie) { + LOG.warn( + "Failed to remove SPS xattr for track id " + + itemInfo.getTrackId(), ie); + } + } + this.clearAll(); + } + + /** + * Take dir tack ID from the spsDirsToBeTraveresed queue and collect child + * ID's to process for satisfy the policy. + */ + private class FileInodeIdCollector implements Runnable { + @Override + public void run() { + LOG.info("Starting FileInodeIdCollector!."); + while (namesystem.isRunning() && sps.isRunning()) { + try { + if (!namesystem.isInSafeMode()) { + FSDirectory fsd = namesystem.getFSDirectory(); + Long rootINodeId = spsDirsToBeTraveresed.poll(); + if (rootINodeId == null) { + // Waiting for SPS path + synchronized (spsDirsToBeTraveresed) { + spsDirsToBeTraveresed.wait(5000); + } + } else { + INode rootInode = fsd.getInode(rootINodeId); + if (rootInode != null) { + // TODO : HDFS-12291 + // 1. Implement an efficient recursive directory iteration + // mechanism and satisfies storage policy for all the files + // under the given directory. + // 2. Process files in batches,so datanodes workload can be + // handled. + List itemInfoList = + new ArrayList<>(); + for (INode childInode : rootInode.asDirectory() + .getChildrenList(Snapshot.CURRENT_STATE_ID)) { + if (childInode.isFile() + && childInode.asFile().numBlocks() != 0) { + itemInfoList.add( + new ItemInfo(rootINodeId, childInode.getId())); + } + } + if (itemInfoList.isEmpty()) { + // satisfy track info is empty, so remove the xAttr from the + // directory + namesystem.removeXattr(rootINodeId, + XATTR_SATISFY_STORAGE_POLICY); + } + addAll(rootINodeId, itemInfoList); + } + } + } + } catch (Throwable t) { + LOG.warn("Exception while loading inodes to satisfy the policy", t); + } + } + } + } + + public void start() { + fileInodeIdCollector = new Daemon(new FileInodeIdCollector()); + fileInodeIdCollector.setName("FileInodeIdCollector"); + fileInodeIdCollector.start(); + } + + public void stop() { + if (fileInodeIdCollector != null) { + fileInodeIdCollector.interrupt(); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/71849149/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java index bd4e5ed..fb6eec9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; import java.io.IOException; -import java.util.ArrayList; import java.util.EnumSet; import java.util.List; @@ -31,6 +30,7 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hdfs.XAttrHelper; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp; +import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import com.google.common.collect.Lists; @@ -60,10 +60,24 @@ final class FSDirSatisfyStoragePolicyOp { if (fsd.isPermissionEnabled()) { fsd.checkPathAccess(pc, iip, FsAction.WRITE); } - XAttr satisfyXAttr = unprotectedSatisfyStoragePolicy(iip, bm, fsd); - if (satisfyXAttr != null) { + INode inode = FSDirectory.resolveLastINode(iip); + if (inodeHasSatisfyXAttr(inode)) { + throw new IOException( + "Cannot request to call satisfy storage policy on path " + + inode.getFullPathName() + + ", as this file/dir was already called for satisfying " + + "storage policy."); + } + if (unprotectedSatisfyStoragePolicy(inode, fsd)) { + XAttr satisfyXAttr = XAttrHelper + .buildXAttr(XATTR_SATISFY_STORAGE_POLICY); List xAttrs = Lists.newArrayListWithCapacity(1); xAttrs.add(satisfyXAttr); + List existingXAttrs = XAttrStorage.readINodeXAttrs(inode); + List newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsd, existingXAttrs, + xAttrs, EnumSet.of(XAttrSetFlag.CREATE)); + XAttrStorage.updateINodeXAttrs(inode, newXAttrs, + iip.getLatestSnapshotId()); fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache); } } finally { @@ -72,62 +86,29 @@ final class FSDirSatisfyStoragePolicyOp { return fsd.getAuditFileInfo(iip); } - static XAttr unprotectedSatisfyStoragePolicy(INodesInPath iip, - BlockManager bm, FSDirectory fsd) throws IOException { - - final INode inode = FSDirectory.resolveLastINode(iip); - final int snapshotId = iip.getLatestSnapshotId(); - final List candidateNodes = new ArrayList<>(); - - // TODO: think about optimization here, label the dir instead - // of the sub-files of the dir. + static boolean unprotectedSatisfyStoragePolicy(INode inode, FSDirectory fsd) { if (inode.isFile() && inode.asFile().numBlocks() != 0) { - candidateNodes.add(inode); - } else if (inode.isDirectory()) { - for (INode node : inode.asDirectory().getChildrenList(snapshotId)) { - if (node.isFile() && node.asFile().numBlocks() != 0) { - candidateNodes.add(node); - } - } - } - - if (candidateNodes.isEmpty()) { - return null; + // Adding directly in the storageMovementNeeded queue, So it can + // get more priority compare to directory. + fsd.getBlockManager().getStoragePolicySatisfier() + .satisfyStoragePolicy(inode.getId()); + return true; + } else if (inode.isDirectory() + && inode.asDirectory().getChildrenNum(Snapshot.CURRENT_STATE_ID) > 0) { + // Adding directory in the pending queue, so FileInodeIdCollector process + // directory child in batch and recursively + fsd.getBlockManager().getStoragePolicySatisfier() + .addInodeToPendingDirQueue(inode.getId()); + return true; } - // If node has satisfy xattr, then stop adding it - // to satisfy movement queue. - if (inodeHasSatisfyXAttr(candidateNodes)) { - throw new IOException( - "Cannot request to call satisfy storage policy on path " - + iip.getPath() - + ", as this file/dir was already called for satisfying " - + "storage policy."); - } - - final List xattrs = Lists.newArrayListWithCapacity(1); - final XAttr satisfyXAttr = XAttrHelper - .buildXAttr(XATTR_SATISFY_STORAGE_POLICY); - xattrs.add(satisfyXAttr); - - for (INode node : candidateNodes) { - bm.satisfyStoragePolicy(node.getId()); - List existingXAttrs = XAttrStorage.readINodeXAttrs(node); - List newXAttrs = FSDirXAttrOp.setINodeXAttrs(fsd, existingXAttrs, - xattrs, EnumSet.of(XAttrSetFlag.CREATE)); - XAttrStorage.updateINodeXAttrs(node, newXAttrs, snapshotId); - } - return satisfyXAttr; + return false; } - private static boolean inodeHasSatisfyXAttr(List candidateNodes) { - // If the node is a directory and one of the child files - // has satisfy xattr, then return true for this directory. - for (INode inode : candidateNodes) { - final XAttrFeature f = inode.getXAttrFeature(); - if (inode.isFile() && f != null - && f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null) { - return true; - } + private static boolean inodeHasSatisfyXAttr(INode inode) { + final XAttrFeature f = inode.getXAttrFeature(); + if (inode.isFile() && f != null + && f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null) { + return true; } return false; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/71849149/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java index 3c6f837..459e697 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java @@ -206,6 +206,14 @@ class FSDirXAttrOp { List newXAttrs = filterINodeXAttrs(existingXAttrs, toRemove, removedXAttrs); if (existingXAttrs.size() != newXAttrs.size()) { + for (XAttr xattr : toRemove) { + if (XATTR_SATISFY_STORAGE_POLICY + .equals(XAttrHelper.getPrefixedName(xattr))) { + fsd.getBlockManager().getStoragePolicySatisfier() + .clearQueue(inode.getId()); + break; + } + } XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId); return removedXAttrs; } @@ -297,8 +305,7 @@ class FSDirXAttrOp { // Add inode id to movement queue if xattrs contain satisfy xattr. if (XATTR_SATISFY_STORAGE_POLICY.equals(xaName)) { - FSDirSatisfyStoragePolicyOp.unprotectedSatisfyStoragePolicy(iip, - fsd.getBlockManager(), fsd); + FSDirSatisfyStoragePolicyOp.unprotectedSatisfyStoragePolicy(inode, fsd); continue; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/71849149/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 1a06105..35341d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -1415,7 +1415,7 @@ public class FSDirectory implements Closeable { if (xattr == null) { return; } - getBlockManager().satisfyStoragePolicy(inode.getId()); + FSDirSatisfyStoragePolicyOp.unprotectedSatisfyStoragePolicy(inode, this); } private void addEncryptionZone(INodeWithAdditionalFields inode, http://git-wip-us.apache.org/repos/asf/hadoop/blob/71849149/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 9e434c3..558d09b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1318,7 +1318,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, writeLock(); try { if (blockManager != null) { - blockManager.stopSPS(true); + blockManager.stopSPS(false); } stopSecretManager(); leaseManager.stopMonitor(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/71849149/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java index 3165813..48d0598 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; - -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -106,10 +103,10 @@ public class StoragePolicySatisfier implements Runnable { } public StoragePolicySatisfier(final Namesystem namesystem, - final BlockStorageMovementNeeded storageMovementNeeded, final BlockManager blkManager, Configuration conf) { this.namesystem = namesystem; - this.storageMovementNeeded = storageMovementNeeded; + this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem, + this); this.blockManager = blkManager; this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems( conf.getLong( @@ -146,7 +143,7 @@ public class StoragePolicySatisfier implements Runnable { // Ensure that all the previously submitted block movements(if any) have to // be stopped in all datanodes. addDropSPSWorkCommandsToAllDNs(); - + storageMovementNeeded.start(); storagePolicySatisfierThread = new Daemon(this); storagePolicySatisfierThread.setName("StoragePolicySatisfier"); storagePolicySatisfierThread.start(); @@ -162,14 +159,17 @@ public class StoragePolicySatisfier implements Runnable { */ public synchronized void disable(boolean forceStop) { isRunning = false; + if (storagePolicySatisfierThread == null) { return; } + storageMovementNeeded.stop(); + storagePolicySatisfierThread.interrupt(); this.storageMovementsMonitor.stop(); if (forceStop) { - this.clearQueuesWithNotification(); + storageMovementNeeded.clearQueuesWithNotification(); addDropSPSWorkCommandsToAllDNs(); } else { LOG.info("Stopping StoragePolicySatisfier."); @@ -184,6 +184,7 @@ public class StoragePolicySatisfier implements Runnable { disable(true); } this.storageMovementsMonitor.stopGracefully(); + if (storagePolicySatisfierThread == null) { return; } @@ -220,10 +221,11 @@ public class StoragePolicySatisfier implements Runnable { while (namesystem.isRunning() && isRunning) { try { if (!namesystem.isInSafeMode()) { - Long blockCollectionID = storageMovementNeeded.get(); - if (blockCollectionID != null) { + ItemInfo itemInfo = storageMovementNeeded.get(); + if (itemInfo != null) { + long trackId = itemInfo.getTrackId(); BlockCollection blockCollection = - namesystem.getBlockCollection(blockCollectionID); + namesystem.getBlockCollection(trackId); // Check blockCollectionId existence. if (blockCollection != null) { BlocksMovingAnalysisStatus status = @@ -234,21 +236,21 @@ public class StoragePolicySatisfier implements Runnable { // Just add to monitor, so it will be tracked for result and // be removed on successful storage movement result. case ALL_BLOCKS_TARGETS_PAIRED: - this.storageMovementsMonitor.add(blockCollectionID, true); + this.storageMovementsMonitor.add(itemInfo, true); break; // Add to monitor with allBlcoksAttemptedToSatisfy flag false, so // that it will be tracked and still it will be consider for retry // as analysis was not found targets for storage movement blocks. case FEW_BLOCKS_TARGETS_PAIRED: - this.storageMovementsMonitor.add(blockCollectionID, false); + this.storageMovementsMonitor.add(itemInfo, false); break; case FEW_LOW_REDUNDANCY_BLOCKS: if (LOG.isDebugEnabled()) { - LOG.debug("Adding trackID " + blockCollectionID + LOG.debug("Adding trackID " + trackId + " back to retry queue as some of the blocks" + " are low redundant."); } - this.storageMovementNeeded.add(blockCollectionID); + this.storageMovementNeeded.add(itemInfo); break; // Just clean Xattrs case BLOCKS_TARGET_PAIRING_SKIPPED: @@ -256,9 +258,13 @@ public class StoragePolicySatisfier implements Runnable { default: LOG.info("Block analysis skipped or blocks already satisfied" + " with storages. So, Cleaning up the Xattrs."); - postBlkStorageMovementCleanup(blockCollectionID); + storageMovementNeeded.removeItemTrackInfo(itemInfo); break; } + } else { + // File doesn't exists (maybe got deleted), remove trackId from + // the queue + storageMovementNeeded.removeItemTrackInfo(itemInfo); } } } @@ -828,31 +834,63 @@ public class StoragePolicySatisfier implements Runnable { } /** - * Clean all the movements in storageMovementNeeded and notify - * to clean up required resources. - * @throws IOException + * Set file inode in queue for which storage movement needed for its blocks. + * + * @param inodeId + * - file inode/blockcollection id. */ - private void clearQueuesWithNotification() { - Long id; - while ((id = storageMovementNeeded.get()) != null) { - try { - postBlkStorageMovementCleanup(id); - } catch (IOException ie) { - LOG.warn("Failed to remove SPS " - + "xattr for collection id " + id, ie); - } + public void satisfyStoragePolicy(Long inodeId) { + //For file rootId and trackId is same + storageMovementNeeded.add(new ItemInfo(inodeId, inodeId)); + if (LOG.isDebugEnabled()) { + LOG.debug("Added track info for inode {} to block " + + "storageMovementNeeded queue", inodeId); } } + public void addInodeToPendingDirQueue(long id) { + storageMovementNeeded.addToPendingDirQueue(id); + } + + /** + * Clear queues for given track id. + */ + public void clearQueue(long trackId) { + storageMovementNeeded.clearQueue(trackId); + } + /** - * When block movement has been finished successfully, some additional - * operations should be notified, for example, SPS xattr should be - * removed. - * @param trackId track id i.e., block collection id. - * @throws IOException + * ItemInfo is a file info object for which need to satisfy the + * policy. */ - public void postBlkStorageMovementCleanup(long trackId) - throws IOException { - this.namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY); + public static class ItemInfo { + private long rootId; + private long trackId; + + public ItemInfo(long rootId, long trackId) { + this.rootId = rootId; + this.trackId = trackId; + } + + /** + * Return the root of the current track Id. + */ + public long getRootId() { + return rootId; + } + + /** + * Return the File inode Id for which needs to satisfy the policy. + */ + public long getTrackId() { + return trackId; + } + + /** + * Returns true if the tracking path is a directory, false otherwise. + */ + public boolean isDir() { + return (rootId != trackId); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/71849149/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java index 402d4d1..b84b1d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; @@ -115,9 +114,7 @@ public class TestStoragePolicySatisfyWorker { // move to ARCHIVE dfs.setStoragePolicy(new Path(file), "COLD"); - FSNamesystem namesystem = cluster.getNamesystem(); - INode inode = namesystem.getFSDirectory().getINode(file); - namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + dfs.satisfyStoragePolicy(new Path(file)); cluster.triggerHeartbeats(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/71849149/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java index 04a63ac..55ebf9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.apache.hadoop.util.Time.monotonicNow; import static org.junit.Assert.*; +import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo; import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.junit.After; import org.junit.Before; @@ -38,7 +39,9 @@ public class TestBlockStorageMovementAttemptedItems { @Before public void setup() throws Exception { - unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(); + unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded( + Mockito.mock(Namesystem.class), + Mockito.mock(StoragePolicySatisfier.class)); StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class); bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100, selfRetryTimeout, unsatisfiedStorageMovementFiles, sps); @@ -57,9 +60,9 @@ public class TestBlockStorageMovementAttemptedItems { long stopTime = monotonicNow() + (retryTimeout * 2); boolean isItemFound = false; while (monotonicNow() < (stopTime)) { - Long ele = null; + ItemInfo ele = null; while ((ele = unsatisfiedStorageMovementFiles.get()) != null) { - if (item.longValue() == ele.longValue()) { + if (item == ele.getTrackId()) { isItemFound = true; break; } @@ -77,7 +80,7 @@ public class TestBlockStorageMovementAttemptedItems { public void testAddResultWithFailureResult() throws Exception { bsmAttemptedItems.start(); // start block movement result monitor thread Long item = new Long(1234); - bsmAttemptedItems.add(item, true); + bsmAttemptedItems.add(new ItemInfo(0L, item), true); bsmAttemptedItems.addResults( new BlocksStorageMovementResult[]{new BlocksStorageMovementResult( item.longValue(), BlocksStorageMovementResult.Status.FAILURE)}); @@ -88,7 +91,7 @@ public class TestBlockStorageMovementAttemptedItems { public void testAddResultWithSucessResult() throws Exception { bsmAttemptedItems.start(); // start block movement result monitor thread Long item = new Long(1234); - bsmAttemptedItems.add(item, true); + bsmAttemptedItems.add(new ItemInfo(0L, item), true); bsmAttemptedItems.addResults( new BlocksStorageMovementResult[]{new BlocksStorageMovementResult( item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)}); @@ -99,7 +102,7 @@ public class TestBlockStorageMovementAttemptedItems { public void testNoResultAdded() throws Exception { bsmAttemptedItems.start(); // start block movement result monitor thread Long item = new Long(1234); - bsmAttemptedItems.add(item, true); + bsmAttemptedItems.add(new ItemInfo(0L, item), true); // After self retry timeout, it should be added back for retry assertTrue("Failed to add to the retry list", checkItemMovedForRetry(item, 600)); @@ -115,7 +118,7 @@ public class TestBlockStorageMovementAttemptedItems { @Test(timeout = 30000) public void testPartialBlockMovementShouldBeRetried1() throws Exception { Long item = new Long(1234); - bsmAttemptedItems.add(item, false); + bsmAttemptedItems.add(new ItemInfo(0L, item), false); bsmAttemptedItems.addResults( new BlocksStorageMovementResult[]{new BlocksStorageMovementResult( item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)}); @@ -136,7 +139,7 @@ public class TestBlockStorageMovementAttemptedItems { @Test(timeout = 30000) public void testPartialBlockMovementShouldBeRetried2() throws Exception { Long item = new Long(1234); - bsmAttemptedItems.add(item, false); + bsmAttemptedItems.add(new ItemInfo(0L, item), false); bsmAttemptedItems.addResults( new BlocksStorageMovementResult[]{new BlocksStorageMovementResult( item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)}); @@ -153,17 +156,20 @@ public class TestBlockStorageMovementAttemptedItems { } /** - * Partial block movement with only BlocksStorageMovementResult#FAILURE result - * and storageMovementAttemptedItems list is empty. + * Partial block movement with only BlocksStorageMovementResult#FAILURE + * result and storageMovementAttemptedItems list is empty. */ @Test(timeout = 30000) - public void testPartialBlockMovementShouldBeRetried3() throws Exception { + public void testPartialBlockMovementWithEmptyAttemptedQueue() + throws Exception { Long item = new Long(1234); bsmAttemptedItems.addResults( new BlocksStorageMovementResult[]{new BlocksStorageMovementResult( - item.longValue(), BlocksStorageMovementResult.Status.FAILURE)}); + item, BlocksStorageMovementResult.Status.FAILURE)}); bsmAttemptedItems.blockStorageMovementResultCheck(); - assertTrue("Failed to add to the retry list", + assertFalse( + "Should not add in queue again if it is not there in" + + " storageMovementAttemptedItems", checkItemMovedForRetry(item, 5000)); assertEquals("Failed to remove from the attempted list", 0, bsmAttemptedItems.getAttemptedItemsCount()); @@ -176,7 +182,7 @@ public class TestBlockStorageMovementAttemptedItems { @Test(timeout = 30000) public void testPartialBlockMovementShouldBeRetried4() throws Exception { Long item = new Long(1234); - bsmAttemptedItems.add(item, false); + bsmAttemptedItems.add(new ItemInfo(0L, item), false); bsmAttemptedItems.addResults( new BlocksStorageMovementResult[]{new BlocksStorageMovementResult( item.longValue(), BlocksStorageMovementResult.Status.FAILURE)}); http://git-wip-us.apache.org/repos/asf/hadoop/blob/71849149/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java index 8516ea0..e7b9148 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java @@ -20,16 +20,22 @@ package org.apache.hadoop.hdfs.server.namenode; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.Test; +import com.google.common.base.Supplier; + import java.io.IOException; +import java.util.List; import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; import static org.junit.Assert.*; @@ -482,6 +488,104 @@ public class TestPersistentStoragePolicySatisfier { } /** + * Test SPS xAttr on directory. xAttr should be removed from the directory + * once all the files blocks moved to specific storage. + */ + @Test(timeout = 300000) + public void testSPSxAttrWhenSpsCalledForDir() throws Exception { + try { + clusterSetUp(); + Path parent = new Path("/parent"); + // create parent dir + fs.mkdirs(parent); + + // create 10 child files + for (int i = 0; i < 5; i++) { + DFSTestUtil.createFile(fs, new Path(parent, "f" + i), 1024, (short) 3, + 0); + } + + // Set storage policy for parent directory + fs.setStoragePolicy(parent, "COLD"); + + // Stop one DN so we can check the SPS xAttr for directory. + DataNodeProperties stopDataNode = cluster.stopDataNode(0); + + fs.satisfyStoragePolicy(parent); + + // Check xAttr for parent directory + FSNamesystem namesystem = cluster.getNamesystem(); + INode inode = namesystem.getFSDirectory().getINode("/parent"); + XAttrFeature f = inode.getXAttrFeature(); + assertTrue("SPS xAttr should be exist", + f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null); + + // check for the child, SPS xAttr should not be there + for (int i = 0; i < 5; i++) { + inode = namesystem.getFSDirectory().getINode("/parent/f" + i); + f = inode.getXAttrFeature(); + assertTrue(f == null); + } + + cluster.restartDataNode(stopDataNode, false); + + // wait and check all the file block moved in ARCHIVE + for (int i = 0; i < 5; i++) { + DFSTestUtil.waitExpectedStorageType("/parent/f" + i, + StorageType.ARCHIVE, 3, 30000, cluster.getFileSystem()); + } + DFSTestUtil.waitForXattrRemoved("/parent", XATTR_SATISFY_STORAGE_POLICY, + namesystem, 10000); + } finally { + clusterShutdown(); + } + + } + + /** + * Test SPS xAttr on file. xAttr should be removed from the file + * once all the blocks moved to specific storage. + */ + @Test(timeout = 300000) + public void testSPSxAttrWhenSpsCalledForFile() throws Exception { + try { + clusterSetUp(); + Path file = new Path("/file"); + DFSTestUtil.createFile(fs, file, 1024, (short) 3, 0); + + // Set storage policy for file + fs.setStoragePolicy(file, "COLD"); + + // Stop one DN so we can check the SPS xAttr for file. + DataNodeProperties stopDataNode = cluster.stopDataNode(0); + + fs.satisfyStoragePolicy(file); + + // Check xAttr for parent directory + FSNamesystem namesystem = cluster.getNamesystem(); + INode inode = namesystem.getFSDirectory().getINode("/file"); + XAttrFeature f = inode.getXAttrFeature(); + assertTrue("SPS xAttr should be exist", + f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null); + + cluster.restartDataNode(stopDataNode, false); + + // wait and check all the file block moved in ARCHIVE + DFSTestUtil.waitExpectedStorageType("/file", StorageType.ARCHIVE, 3, + 30000, cluster.getFileSystem()); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + List existingXAttrs = XAttrStorage.readINodeXAttrs(inode); + return !existingXAttrs.contains(XATTR_SATISFY_STORAGE_POLICY); + } + }, 100, 10000); + } finally { + clusterShutdown(); + } + } + + /** * Restart the hole env and trigger the DataNode's heart beats. * @throws Exception */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/71849149/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java index 2536834..3375590 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java @@ -119,8 +119,6 @@ public class TestStoragePolicySatisfier { private void doTestWhenStoragePolicySetToCOLD() throws Exception { // Change policy to COLD dfs.setStoragePolicy(new Path(file), COLD); - FSNamesystem namesystem = hdfsCluster.getNamesystem(); - INode inode = namesystem.getFSDirectory().getINode(file); StorageType[][] newtypes = new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}, @@ -129,7 +127,7 @@ public class TestStoragePolicySatisfier { startAdditionalDNs(config, 3, numOfDatanodes, newtypes, storagesPerDatanode, capacity, hdfsCluster); - namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + dfs.satisfyStoragePolicy(new Path(file)); hdfsCluster.triggerHeartbeats(); // Wait till namenode notified about the block location details @@ -144,8 +142,6 @@ public class TestStoragePolicySatisfier { createCluster(); // Change policy to ALL_SSD dfs.setStoragePolicy(new Path(file), "ALL_SSD"); - FSNamesystem namesystem = hdfsCluster.getNamesystem(); - INode inode = namesystem.getFSDirectory().getINode(file); StorageType[][] newtypes = new StorageType[][]{{StorageType.SSD, StorageType.DISK}, @@ -156,7 +152,7 @@ public class TestStoragePolicySatisfier { // datanodes. startAdditionalDNs(config, 3, numOfDatanodes, newtypes, storagesPerDatanode, capacity, hdfsCluster); - namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + dfs.satisfyStoragePolicy(new Path(file)); hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier Identified that block to move to SSD // areas @@ -174,8 +170,6 @@ public class TestStoragePolicySatisfier { createCluster(); // Change policy to ONE_SSD dfs.setStoragePolicy(new Path(file), ONE_SSD); - FSNamesystem namesystem = hdfsCluster.getNamesystem(); - INode inode = namesystem.getFSDirectory().getINode(file); StorageType[][] newtypes = new StorageType[][]{{StorageType.SSD, StorageType.DISK}}; @@ -184,7 +178,7 @@ public class TestStoragePolicySatisfier { // datanodes. startAdditionalDNs(config, 1, numOfDatanodes, newtypes, storagesPerDatanode, capacity, hdfsCluster); - namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + dfs.satisfyStoragePolicy(new Path(file)); hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier Identified that block to move to SSD // areas @@ -207,8 +201,6 @@ public class TestStoragePolicySatisfier { createCluster(); // Change policy to ONE_SSD dfs.setStoragePolicy(new Path(file), ONE_SSD); - FSNamesystem namesystem = hdfsCluster.getNamesystem(); - INode inode = namesystem.getFSDirectory().getINode(file); StorageType[][] newtypes = new StorageType[][]{{StorageType.SSD, StorageType.DISK}}; @@ -217,7 +209,7 @@ public class TestStoragePolicySatisfier { // datanodes. startAdditionalDNs(config, 1, numOfDatanodes, newtypes, storagesPerDatanode, capacity, hdfsCluster); - namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + dfs.satisfyStoragePolicy(new Path(file)); hdfsCluster.triggerHeartbeats(); // Wait till the block is moved to SSD areas @@ -250,13 +242,10 @@ public class TestStoragePolicySatisfier { files.add(file1); writeContent(file1); } - FSNamesystem namesystem = hdfsCluster.getNamesystem(); - List blockCollectionIds = new ArrayList<>(); // Change policy to ONE_SSD for (String fileName : files) { dfs.setStoragePolicy(new Path(fileName), ONE_SSD); - INode inode = namesystem.getFSDirectory().getINode(fileName); - blockCollectionIds.add(inode.getId()); + dfs.satisfyStoragePolicy(new Path(fileName)); } StorageType[][] newtypes = @@ -266,9 +255,6 @@ public class TestStoragePolicySatisfier { // datanodes. startAdditionalDNs(config, 1, numOfDatanodes, newtypes, storagesPerDatanode, capacity, hdfsCluster); - for (long inodeId : blockCollectionIds) { - namesystem.getBlockManager().satisfyStoragePolicy(inodeId); - } hdfsCluster.triggerHeartbeats(); for (String fileName : files) { @@ -279,7 +265,7 @@ public class TestStoragePolicySatisfier { fileName, StorageType.DISK, 2, 30000, dfs); } - waitForBlocksMovementResult(blockCollectionIds.size(), 30000); + waitForBlocksMovementResult(files.size(), 30000); } finally { shutdownCluster(); } @@ -441,8 +427,6 @@ public class TestStoragePolicySatisfier { createCluster(); // Change policy to COLD dfs.setStoragePolicy(new Path(file), COLD); - FSNamesystem namesystem = hdfsCluster.getNamesystem(); - INode inode = namesystem.getFSDirectory().getINode(file); StorageType[][] newtypes = new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}}; @@ -451,7 +435,7 @@ public class TestStoragePolicySatisfier { startAdditionalDNs(config, 1, numOfDatanodes, newtypes, storagesPerDatanode, capacity, hdfsCluster); - namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + dfs.satisfyStoragePolicy(new Path(file)); hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier identified that block to move to // ARCHIVE area. @@ -486,8 +470,6 @@ public class TestStoragePolicySatisfier { createCluster(); // Change policy to COLD dfs.setStoragePolicy(new Path(file), COLD); - FSNamesystem namesystem = hdfsCluster.getNamesystem(); - INode inode = namesystem.getFSDirectory().getINode(file); StorageType[][] newtypes = new StorageType[][]{{StorageType.DISK, StorageType.DISK}}; @@ -495,7 +477,7 @@ public class TestStoragePolicySatisfier { startAdditionalDNs(config, 1, numOfDatanodes, newtypes, storagesPerDatanode, capacity, hdfsCluster); - namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + dfs.satisfyStoragePolicy(new Path(file)); hdfsCluster.triggerHeartbeats(); // No block movement will be scheduled as there is no target node @@ -600,47 +582,51 @@ public class TestStoragePolicySatisfier { */ @Test(timeout = 120000) public void testMoveWithBlockPinning() throws Exception { - config.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true); - config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, - true); - hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(3) - .storageTypes( - new StorageType[][] {{StorageType.DISK, StorageType.DISK}, - {StorageType.DISK, StorageType.DISK}, - {StorageType.DISK, StorageType.DISK}}) - .build(); - - hdfsCluster.waitActive(); - dfs = hdfsCluster.getFileSystem(); + try{ + config.setBoolean(DFSConfigKeys.DFS_DATANODE_BLOCK_PINNING_ENABLED, true); + config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, + true); + hdfsCluster = new MiniDFSCluster.Builder(config).numDataNodes(3) + .storageTypes( + new StorageType[][] {{StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}}) + .build(); - // create a file with replication factor 3 and mark 2 pinned block - // locations. - final String file1 = createFileAndSimulateFavoredNodes(2); + hdfsCluster.waitActive(); + dfs = hdfsCluster.getFileSystem(); - // Change policy to COLD - dfs.setStoragePolicy(new Path(file1), COLD); - FSNamesystem namesystem = hdfsCluster.getNamesystem(); - INode inode = namesystem.getFSDirectory().getINode(file1); + // create a file with replication factor 3 and mark 2 pinned block + // locations. + final String file1 = createFileAndSimulateFavoredNodes(2); - StorageType[][] newtypes = - new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}, - {StorageType.ARCHIVE, StorageType.ARCHIVE}, - {StorageType.ARCHIVE, StorageType.ARCHIVE}}; - // Adding DISK based datanodes - startAdditionalDNs(config, 3, numOfDatanodes, newtypes, - storagesPerDatanode, capacity, hdfsCluster); + // Change policy to COLD + dfs.setStoragePolicy(new Path(file1), COLD); - namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); - hdfsCluster.triggerHeartbeats(); + StorageType[][] newtypes = + new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}}; + // Adding DISK based datanodes + startAdditionalDNs(config, 3, numOfDatanodes, newtypes, + storagesPerDatanode, capacity, hdfsCluster); - // No block movement will be scheduled as there is no target node available - // with the required storage type. - waitForAttemptedItems(1, 30000); - waitForBlocksMovementResult(1, 30000); - DFSTestUtil.waitExpectedStorageType( - file1, StorageType.ARCHIVE, 1, 30000, dfs); - DFSTestUtil.waitExpectedStorageType( - file1, StorageType.DISK, 2, 30000, dfs); + dfs.satisfyStoragePolicy(new Path(file1)); + hdfsCluster.triggerHeartbeats(); + + // No block movement will be scheduled as there is no target node + // available with the required storage type. + waitForAttemptedItems(1, 30000); + waitForBlocksMovementResult(1, 30000); + DFSTestUtil.waitExpectedStorageType( + file1, StorageType.ARCHIVE, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType( + file1, StorageType.DISK, 2, 30000, dfs); + } finally { + if (hdfsCluster != null) { + hdfsCluster.shutdown(); + } + } } /** @@ -682,10 +668,8 @@ public class TestStoragePolicySatisfier { // Change policy to COLD dfs.setStoragePolicy(new Path(file), COLD); - FSNamesystem namesystem = hdfsCluster.getNamesystem(); - INode inode = namesystem.getFSDirectory().getINode(file); - namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + dfs.satisfyStoragePolicy(new Path(file)); hdfsCluster.triggerHeartbeats(); // Wait till StorgePolicySatisfier identified that block to move to // ARCHIVE area. @@ -723,10 +707,8 @@ public class TestStoragePolicySatisfier { // Change policy to ONE_SSD dfs.setStoragePolicy(new Path(file), ONE_SSD); - FSNamesystem namesystem = hdfsCluster.getNamesystem(); - INode inode = namesystem.getFSDirectory().getINode(file); - namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + dfs.satisfyStoragePolicy(new Path(file)); hdfsCluster.triggerHeartbeats(); DFSTestUtil.waitExpectedStorageType( file, StorageType.SSD, 1, 30000, dfs); @@ -764,10 +746,7 @@ public class TestStoragePolicySatisfier { // Change policy to WARM dfs.setStoragePolicy(new Path(file), "WARM"); - FSNamesystem namesystem = hdfsCluster.getNamesystem(); - INode inode = namesystem.getFSDirectory().getINode(file); - - namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + dfs.satisfyStoragePolicy(new Path(file)); hdfsCluster.triggerHeartbeats(); DFSTestUtil.waitExpectedStorageType( @@ -848,8 +827,6 @@ public class TestStoragePolicySatisfier { // Change policy to ONE_SSD dfs.setStoragePolicy(new Path(file), ONE_SSD); - FSNamesystem namesystem = hdfsCluster.getNamesystem(); - INode inode = namesystem.getFSDirectory().getINode(file); Path filePath = new Path("/testChooseInSameDatanode"); final FSDataOutputStream out = dfs.create(filePath, false, 100, (short) 1, 2 * DEFAULT_BLOCK_SIZE); @@ -872,7 +849,7 @@ public class TestStoragePolicySatisfier { for (DataNode dataNode : dataNodes) { DataNodeTestUtils.setHeartbeatsDisabledForTests(dataNode, true); } - namesystem.getBlockManager().satisfyStoragePolicy(inode.getId()); + dfs.satisfyStoragePolicy(new Path(file)); // Wait for items to be processed waitForAttemptedItems(1, 30000); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org