From: umamahesh@apache.org
To: common-commits@hadoop.apache.org
Date: Fri, 22 Dec 2017 07:37:48 -0000
Subject: [3/3] hadoop git commit: HDFS-12955: [SPS]: Move SPS classes to a separate package. Contributed by Rakesh R.

HDFS-12955: [SPS]: Move SPS classes to a separate package. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/34588002
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/34588002
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/34588002

Branch: refs/heads/HDFS-10285
Commit: 34588002e0b980181f04be70efc26bd405afc810
Parents: 73f7db6
Author: Uma Maheswara Rao G
Authored: Thu Dec 21 23:35:56 2017 -0800
Committer: Uma Maheswara Rao G
Committed: Thu Dec 21 23:35:56 2017 -0800

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    |    6 +-
 .../BlockStorageMovementAttemptedItems.java     |  241 ---
 .../namenode/BlockStorageMovementNeeded.java    |  574 ------
 .../hdfs/server/namenode/FSNamesystem.java      |    1 +
 .../hdfs/server/namenode/IntraNNSPSContext.java |   41 +
 .../server/namenode/StoragePolicySatisfier.java |  973 ----------
 .../TestBlockStorageMovementAttemptedItems.java |  196 --
 .../namenode/TestStoragePolicySatisfier.java    | 1775 ------------------
 ...stStoragePolicySatisfierWithStripedFile.java |  580 ------
 9 files changed, 46 insertions(+), 4341 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34588002/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index a503c4b..79f9f7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -89,11 +89,12 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
+import org.apache.hadoop.hdfs.server.namenode.IntraNNSPSContext;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -479,7 +480,8 @@ public class BlockManager implements BlockStatsMXBean {
     conf.getBoolean(
         DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
         DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT);
-    sps = new StoragePolicySatisfier(namesystem, this, conf);
+    StoragePolicySatisfier.Context spsctxt = new IntraNNSPSContext(namesystem);
+    sps = new StoragePolicySatisfier(namesystem, this, conf, spsctxt);
     blockTokenSecretManager = createBlockTokenSecretManager(conf);
     this.maxCorruptFilesReturned = conf.getInt(
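The functional change in the hunk above is the new indirection: BlockManager now hands the satisfier a StoragePolicySatisfier.Context instead of letting it reach into NameNode internals directly. Below is a minimal, self-contained sketch of that pattern, assuming only the single getNumLiveDataNodes() API this commit introduces; the local Context copy, StubContext, and shouldPause names are hypothetical stand-ins for illustration, not code from this commit (the real interface lives in the sps package and, per the TODO in IntraNNSPSContext, is slated for refinement in HDFS-12911).

// SpsContextSketch.java -- illustrative only, not part of the commit.
interface Context {
  // The one API exposed by this commit's StoragePolicySatisfier.Context.
  int getNumLiveDataNodes();
}

// Stand-in for IntraNNSPSContext: a thin delegate over cluster state.
class StubContext implements Context {
  private final int liveNodes; // stands in for Namesystem-derived state

  StubContext(int liveNodes) {
    this.liveNodes = liveNodes;
  }

  @Override
  public int getNumLiveDataNodes() {
    return liveNodes;
  }
}

public class SpsContextSketch {
  // The satisfier can throttle scheduling against the live-node count
  // without a compile-time dependency on NameNode internals.
  static boolean shouldPause(Context ctx, long scheduledBlocks, int multiplier) {
    return scheduledBlocks > (long) ctx.getNumLiveDataNodes() * multiplier;
  }

  public static void main(String[] args) {
    Context ctx = new StubContext(10);
    System.out.println(shouldPause(ctx, 50, 4)); // true: 50 > 10 * 4
  }
}

The shouldPause check mirrors the throttle in StoragePolicySatisfier.run() later in this diff, which sleeps once the scheduled block count exceeds numLiveDn * spsWorkMultiplier; routing such reads through a narrow Context is what lets the SPS classes move out of the namenode package.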
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34588002/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
deleted file mode 100644
index 643255f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import static org.apache.hadoop.util.Time.monotonicNow;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.util.Daemon;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * A monitor class for checking whether block storage movements attempt
- * completed or not. If this receives block storage movement attempt
- * status(either success or failure) from DN then it will just remove the
- * entries from tracking. If there is no DN reports about movement attempt
- * finished for a longer time period, then such items will retries automatically
- * after timeout. The default timeout would be 5 minutes.
- */
-public class BlockStorageMovementAttemptedItems {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
-
-  /**
-   * A map holds the items which are already taken for blocks movements
-   * processing and sent to DNs.
-   */
-  private final List<AttemptedItemInfo> storageMovementAttemptedItems;
-  private final List<Block> movementFinishedBlocks;
-  private volatile boolean monitorRunning = true;
-  private Daemon timerThread = null;
-  //
-  // It might take anywhere between 5 to 10 minutes before
-  // a request is timed out.
-  //
-  private long selfRetryTimeout = 5 * 60 * 1000;
-
-  //
-  // It might take anywhere between 1 to 2 minutes before
-  // a request is timed out.
-  //
-  private long minCheckTimeout = 1 * 60 * 1000; // minimum value
-  private BlockStorageMovementNeeded blockStorageMovementNeeded;
-
-  public BlockStorageMovementAttemptedItems(long recheckTimeout,
-      long selfRetryTimeout,
-      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
-    if (recheckTimeout > 0) {
-      this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
-    }
-
-    this.selfRetryTimeout = selfRetryTimeout;
-    this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
-    storageMovementAttemptedItems = new ArrayList<>();
-    movementFinishedBlocks = new ArrayList<>();
-  }
-
-  /**
-   * Add item to block storage movement attempted items map which holds the
-   * tracking/blockCollection id versus time stamp.
-   *
-   * @param itemInfo
-   *          - tracking info
-   */
-  public void add(AttemptedItemInfo itemInfo) {
-    synchronized (storageMovementAttemptedItems) {
-      storageMovementAttemptedItems.add(itemInfo);
-    }
-  }
-
-  /**
-   * Add the storage movement attempt finished blocks to
-   * storageMovementFinishedBlocks.
-   *
-   * @param moveAttemptFinishedBlks
-   *          storage movement attempt finished blocks
-   */
-  public void addReportedMovedBlocks(Block[] moveAttemptFinishedBlks) {
-    if (moveAttemptFinishedBlks.length == 0) {
-      return;
-    }
-    synchronized (movementFinishedBlocks) {
-      movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks));
-    }
-  }
-
-  /**
-   * Starts the monitor thread.
-   */
-  public synchronized void start() {
-    monitorRunning = true;
-    timerThread = new Daemon(new BlocksStorageMovementAttemptMonitor());
-    timerThread.setName("BlocksStorageMovementAttemptMonitor");
-    timerThread.start();
-  }
-
-  /**
-   * Sets running flag to false. Also, this will interrupt monitor thread and
-   * clear all the queued up tasks.
-   */
-  public synchronized void stop() {
-    monitorRunning = false;
-    if (timerThread != null) {
-      timerThread.interrupt();
-    }
-    this.clearQueues();
-  }
-
-  /**
-   * Timed wait to stop monitor thread.
-   */
-  synchronized void stopGracefully() {
-    if (timerThread == null) {
-      return;
-    }
-    if (monitorRunning) {
-      stop();
-    }
-    try {
-      timerThread.join(3000);
-    } catch (InterruptedException ie) {
-    }
-  }
-
-  /**
-   * A monitor class for checking block storage movement attempt status and long
-   * waiting items periodically.
-   */
-  private class BlocksStorageMovementAttemptMonitor implements Runnable {
-    @Override
-    public void run() {
-      while (monitorRunning) {
-        try {
-          blockStorageMovementReportedItemsCheck();
-          blocksStorageMovementUnReportedItemsCheck();
-          Thread.sleep(minCheckTimeout);
-        } catch (InterruptedException ie) {
-          LOG.info("BlocksStorageMovementAttemptMonitor thread "
-              + "is interrupted.", ie);
-        } catch (IOException ie) {
-          LOG.warn("BlocksStorageMovementAttemptMonitor thread "
-              + "received exception and exiting.", ie);
-        }
-      }
-    }
-  }
-
-  @VisibleForTesting
-  void blocksStorageMovementUnReportedItemsCheck() {
-    synchronized (storageMovementAttemptedItems) {
-      Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems
-          .iterator();
-      long now = monotonicNow();
-      while (iter.hasNext()) {
-        AttemptedItemInfo itemInfo = iter.next();
-        if (now > itemInfo.getLastAttemptedOrReportedTime()
-            + selfRetryTimeout) {
-          Long blockCollectionID = itemInfo.getTrackId();
-          synchronized (movementFinishedBlocks) {
-            ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
-                blockCollectionID, itemInfo.getRetryCount() + 1);
-            blockStorageMovementNeeded.add(candidate);
-            iter.remove();
-            LOG.info("TrackID: {} becomes timed out and moved to needed "
-                + "retries queue for next iteration.", blockCollectionID);
-          }
-        }
-      }
-
-    }
-  }
-
-  @VisibleForTesting
-  void blockStorageMovementReportedItemsCheck() throws IOException {
-    synchronized (movementFinishedBlocks) {
-      Iterator<Block> finishedBlksIter = movementFinishedBlocks.iterator();
-      while (finishedBlksIter.hasNext()) {
-        Block blk = finishedBlksIter.next();
-        synchronized (storageMovementAttemptedItems) {
-          Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems
-              .iterator();
-          while (iterator.hasNext()) {
-            AttemptedItemInfo attemptedItemInfo = iterator.next();
-            attemptedItemInfo.getBlocks().remove(blk);
-            if (attemptedItemInfo.getBlocks().isEmpty()) {
-              // TODO: try add this at front of the Queue, so that this element
-              // gets the chance first and can be cleaned from queue quickly as
-              // all movements already done.
-              blockStorageMovementNeeded.add(new ItemInfo(attemptedItemInfo
-                  .getStartId(), attemptedItemInfo.getTrackId(),
-                  attemptedItemInfo.getRetryCount() + 1));
-              iterator.remove();
-            }
-          }
-        }
-        // Remove attempted blocks from movementFinishedBlocks list.
-        finishedBlksIter.remove();
-      }
-    }
-  }
-
-  @VisibleForTesting
-  public int getMovementFinishedBlocksCount() {
-    return movementFinishedBlocks.size();
-  }
-
-  @VisibleForTesting
-  public int getAttemptedItemsCount() {
-    return storageMovementAttemptedItems.size();
-  }
-
-  public void clearQueues() {
-    movementFinishedBlocks.clear();
-    storageMovementAttemptedItems.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34588002/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
deleted file mode 100644
index 89bcbff..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
+++ /dev/null
@@ -1,574 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.namenode; - -import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo; -import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo; -import org.apache.hadoop.util.Daemon; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; - -/** - * A Class to track the block collection IDs (Inode's ID) for which physical - * storage movement needed as per the Namespace and StorageReports from DN. - * It scan the pending directories for which storage movement is required and - * schedule the block collection IDs for movement. It track the info of - * scheduled items and remove the SPS xAttr from the file/Directory once - * movement is success. - */ -@InterfaceAudience.Private -public class BlockStorageMovementNeeded { - - public static final Logger LOG = - LoggerFactory.getLogger(BlockStorageMovementNeeded.class); - - private final Queue storageMovementNeeded = - new LinkedList(); - - /** - * Map of startId and number of child's. Number of child's indicate the - * number of files pending to satisfy the policy. - */ - private final Map pendingWorkForDirectory = - new HashMap(); - - private final Map spsStatus = - new ConcurrentHashMap<>(); - - private final Namesystem namesystem; - - // List of pending dir to satisfy the policy - private final Queue spsDirsToBeTraveresed = new LinkedList(); - - private final StoragePolicySatisfier sps; - - private Daemon inodeIdCollector; - - private final int maxQueuedItem; - - // Amount of time to cache the SUCCESS status of path before turning it to - // NOT_AVAILABLE. - private static long statusClearanceElapsedTimeMs = 300000; - - public BlockStorageMovementNeeded(Namesystem namesystem, - StoragePolicySatisfier sps, int queueLimit) { - this.namesystem = namesystem; - this.sps = sps; - this.maxQueuedItem = queueLimit; - } - - /** - * Add the candidate to tracking list for which storage movement - * expected if necessary. 
- * - * @param trackInfo - * - track info for satisfy the policy - */ - public synchronized void add(ItemInfo trackInfo) { - spsStatus.put(trackInfo.getStartId(), - new StoragePolicySatisfyPathStatusInfo( - StoragePolicySatisfyPathStatus.IN_PROGRESS)); - storageMovementNeeded.add(trackInfo); - } - - /** - * Add the itemInfo to tracking list for which storage movement - * expected if necessary. - * @param startId - * - start id - * @param itemInfoList - * - List of child in the directory - */ - @VisibleForTesting - public synchronized void addAll(long startId, - List itemInfoList, boolean scanCompleted) { - storageMovementNeeded.addAll(itemInfoList); - DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId); - if (pendingWork == null) { - pendingWork = new DirPendingWorkInfo(); - pendingWorkForDirectory.put(startId, pendingWork); - } - pendingWork.addPendingWorkCount(itemInfoList.size()); - if (scanCompleted) { - pendingWork.markScanCompleted(); - } - } - - /** - * Gets the block collection id for which storage movements check necessary - * and make the movement if required. - * - * @return block collection ID - */ - public synchronized ItemInfo get() { - return storageMovementNeeded.poll(); - } - - public synchronized void addToPendingDirQueue(long id) { - spsStatus.put(id, new StoragePolicySatisfyPathStatusInfo( - StoragePolicySatisfyPathStatus.PENDING)); - spsDirsToBeTraveresed.add(id); - // Notify waiting FileInodeIdCollector thread about the newly - // added SPS path. - synchronized (spsDirsToBeTraveresed) { - spsDirsToBeTraveresed.notify(); - } - } - - /** - * Returns queue remaining capacity. - */ - public synchronized int remainingCapacity() { - int size = storageMovementNeeded.size(); - if (size >= maxQueuedItem) { - return 0; - } else { - return (maxQueuedItem - size); - } - } - - /** - * Returns queue size. - */ - public synchronized int size() { - return storageMovementNeeded.size(); - } - - public synchronized void clearAll() { - spsDirsToBeTraveresed.clear(); - storageMovementNeeded.clear(); - pendingWorkForDirectory.clear(); - } - - /** - * Decrease the pending child count for directory once one file blocks moved - * successfully. Remove the SPS xAttr if pending child count is zero. - */ - public synchronized void removeItemTrackInfo(ItemInfo trackInfo, - boolean isSuccess) throws IOException { - if (trackInfo.isDir()) { - // If track is part of some start inode then reduce the pending - // directory work count. - long startId = trackInfo.getStartId(); - INode inode = namesystem.getFSDirectory().getInode(startId); - if (inode == null) { - // directory deleted just remove it. - this.pendingWorkForDirectory.remove(startId); - updateStatus(startId, isSuccess); - } else { - DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId); - if (pendingWork != null) { - pendingWork.decrementPendingWorkCount(); - if (pendingWork.isDirWorkDone()) { - namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY); - pendingWorkForDirectory.remove(startId); - pendingWork.setFailure(!isSuccess); - updateStatus(startId, pendingWork.isPolicySatisfied()); - } - pendingWork.setFailure(isSuccess); - } - } - } else { - // Remove xAttr if trackID doesn't exist in - // storageMovementAttemptedItems or file policy satisfied. 
- namesystem.removeXattr(trackInfo.getTrackId(), - XATTR_SATISFY_STORAGE_POLICY); - updateStatus(trackInfo.getStartId(), isSuccess); - } - } - - public synchronized void clearQueue(long trackId) { - spsDirsToBeTraveresed.remove(trackId); - Iterator iterator = storageMovementNeeded.iterator(); - while (iterator.hasNext()) { - ItemInfo next = iterator.next(); - if (next.getStartId() == trackId) { - iterator.remove(); - } - } - pendingWorkForDirectory.remove(trackId); - } - - /** - * Mark inode status as SUCCESS in map. - */ - private void updateStatus(long startId, boolean isSuccess){ - StoragePolicySatisfyPathStatusInfo spsStatusInfo = - spsStatus.get(startId); - if (spsStatusInfo == null) { - spsStatusInfo = new StoragePolicySatisfyPathStatusInfo(); - spsStatus.put(startId, spsStatusInfo); - } - - if (isSuccess) { - spsStatusInfo.setSuccess(); - } else { - spsStatusInfo.setFailure(); - } - } - - /** - * Clean all the movements in spsDirsToBeTraveresed/storageMovementNeeded - * and notify to clean up required resources. - * @throws IOException - */ - public synchronized void clearQueuesWithNotification() { - // Remove xAttr from directories - Long trackId; - while ((trackId = spsDirsToBeTraveresed.poll()) != null) { - try { - // Remove xAttr for file - namesystem.removeXattr(trackId, XATTR_SATISFY_STORAGE_POLICY); - } catch (IOException ie) { - LOG.warn("Failed to remove SPS xattr for track id " + trackId, ie); - } - } - - // File's directly added to storageMovementNeeded, So try to remove - // xAttr for file - ItemInfo itemInfo; - while ((itemInfo = storageMovementNeeded.poll()) != null) { - try { - // Remove xAttr for file - if (!itemInfo.isDir()) { - namesystem.removeXattr(itemInfo.getTrackId(), - XATTR_SATISFY_STORAGE_POLICY); - } - } catch (IOException ie) { - LOG.warn( - "Failed to remove SPS xattr for track id " - + itemInfo.getTrackId(), ie); - } - } - this.clearAll(); - } - - /** - * Take dir tack ID from the spsDirsToBeTraveresed queue and collect child - * ID's to process for satisfy the policy. 
- */ - private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser - implements Runnable { - - private int remainingCapacity = 0; - - private List currentBatch = new ArrayList<>(maxQueuedItem); - - StorageMovementPendingInodeIdCollector(FSDirectory dir) { - super(dir); - } - - @Override - public void run() { - LOG.info("Starting FileInodeIdCollector!."); - long lastStatusCleanTime = 0; - while (namesystem.isRunning() && sps.isRunning()) { - try { - if (!namesystem.isInSafeMode()) { - FSDirectory fsd = namesystem.getFSDirectory(); - Long startINodeId = spsDirsToBeTraveresed.poll(); - if (startINodeId == null) { - // Waiting for SPS path - synchronized (spsDirsToBeTraveresed) { - spsDirsToBeTraveresed.wait(5000); - } - } else { - INode startInode = fsd.getInode(startINodeId); - if (startInode != null) { - try { - remainingCapacity = remainingCapacity(); - spsStatus.put(startINodeId, - new StoragePolicySatisfyPathStatusInfo( - StoragePolicySatisfyPathStatus.IN_PROGRESS)); - readLock(); - traverseDir(startInode.asDirectory(), startINodeId, - HdfsFileStatus.EMPTY_NAME, - new SPSTraverseInfo(startINodeId)); - } finally { - readUnlock(); - } - // Mark startInode traverse is done - addAll(startInode.getId(), currentBatch, true); - currentBatch.clear(); - - // check if directory was empty and no child added to queue - DirPendingWorkInfo dirPendingWorkInfo = - pendingWorkForDirectory.get(startInode.getId()); - if (dirPendingWorkInfo.isDirWorkDone()) { - namesystem.removeXattr(startInode.getId(), - XATTR_SATISFY_STORAGE_POLICY); - pendingWorkForDirectory.remove(startInode.getId()); - updateStatus(startInode.getId(), true); - } - } - } - //Clear the SPS status if status is in SUCCESS more than 5 min. - if (Time.monotonicNow() - - lastStatusCleanTime > statusClearanceElapsedTimeMs) { - lastStatusCleanTime = Time.monotonicNow(); - cleanSpsStatus(); - } - } - } catch (Throwable t) { - LOG.warn("Exception while loading inodes to satisfy the policy", t); - } - } - } - - private synchronized void cleanSpsStatus() { - for (Iterator> it = - spsStatus.entrySet().iterator(); it.hasNext();) { - Entry entry = it.next(); - if (entry.getValue().canRemove()) { - it.remove(); - } - } - } - - @Override - protected void checkPauseForTesting() throws InterruptedException { - // TODO implement if needed - } - - @Override - protected boolean processFileInode(INode inode, TraverseInfo traverseInfo) - throws IOException, InterruptedException { - assert getFSDirectory().hasReadLock(); - if (LOG.isTraceEnabled()) { - LOG.trace("Processing {} for statisy the policy", - inode.getFullPathName()); - } - if (!inode.isFile()) { - return false; - } - if (inode.isFile() && inode.asFile().numBlocks() != 0) { - currentBatch.add(new ItemInfo( - ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId())); - remainingCapacity--; - } - return true; - } - - @Override - protected boolean canSubmitCurrentBatch() { - return remainingCapacity <= 0; - } - - @Override - protected void checkINodeReady(long startId) throws IOException { - FSNamesystem fsn = ((FSNamesystem) namesystem); - fsn.checkNameNodeSafeMode("NN is in safe mode," - + "cannot satisfy the policy."); - // SPS work should be cancelled when NN goes to standby. Just - // double checking for sanity. 
- fsn.checkOperation(NameNode.OperationCategory.WRITE); - } - - @Override - protected void submitCurrentBatch(long startId) - throws IOException, InterruptedException { - // Add current child's to queue - addAll(startId, currentBatch, false); - currentBatch.clear(); - } - - @Override - protected void throttle() throws InterruptedException { - assert !getFSDirectory().hasReadLock(); - assert !namesystem.hasReadLock(); - if (LOG.isDebugEnabled()) { - LOG.debug("StorageMovementNeeded queue remaining capacity is zero," - + " waiting for some free slots."); - } - remainingCapacity = remainingCapacity(); - // wait for queue to be free - while (remainingCapacity <= 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Waiting for storageMovementNeeded queue to be free!"); - } - Thread.sleep(5000); - remainingCapacity = remainingCapacity(); - } - } - - @Override - protected boolean canTraverseDir(INode inode) throws IOException { - return true; - } - } - - /** - * Info for directory recursive scan. - */ - public static class DirPendingWorkInfo { - - private int pendingWorkCount = 0; - private boolean fullyScanned = false; - private boolean success = true; - - /** - * Increment the pending work count for directory. - */ - public synchronized void addPendingWorkCount(int count) { - this.pendingWorkCount = this.pendingWorkCount + count; - } - - /** - * Decrement the pending work count for directory one track info is - * completed. - */ - public synchronized void decrementPendingWorkCount() { - this.pendingWorkCount--; - } - - /** - * Return true if all the pending work is done and directory fully - * scanned, otherwise false. - */ - public synchronized boolean isDirWorkDone() { - return (pendingWorkCount <= 0 && fullyScanned); - } - - /** - * Mark directory scan is completed. - */ - public synchronized void markScanCompleted() { - this.fullyScanned = true; - } - - /** - * Return true if all the files block movement is success, otherwise false. - */ - public boolean isPolicySatisfied() { - return success; - } - - /** - * Set directory SPS status failed. - */ - public void setFailure(boolean failure) { - this.success = this.success || failure; - } - } - - public void init() { - inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector( - namesystem.getFSDirectory())); - inodeIdCollector.setName("FileInodeIdCollector"); - inodeIdCollector.start(); - } - - public void close() { - if (inodeIdCollector != null) { - inodeIdCollector.interrupt(); - } - } - - class SPSTraverseInfo extends TraverseInfo { - private long startId; - - SPSTraverseInfo(long startId) { - this.startId = startId; - } - - public long getStartId() { - return startId; - } - } - - /** - * Represent the file/directory block movement status. 
- */ - static class StoragePolicySatisfyPathStatusInfo { - private StoragePolicySatisfyPathStatus status = - StoragePolicySatisfyPathStatus.NOT_AVAILABLE; - private long lastStatusUpdateTime; - - StoragePolicySatisfyPathStatusInfo() { - this.lastStatusUpdateTime = 0; - } - - StoragePolicySatisfyPathStatusInfo(StoragePolicySatisfyPathStatus status) { - this.status = status; - this.lastStatusUpdateTime = 0; - } - - private void setSuccess() { - this.status = StoragePolicySatisfyPathStatus.SUCCESS; - this.lastStatusUpdateTime = Time.monotonicNow(); - } - - private void setFailure() { - this.status = StoragePolicySatisfyPathStatus.FAILURE; - this.lastStatusUpdateTime = Time.monotonicNow(); - } - - private StoragePolicySatisfyPathStatus getStatus() { - return status; - } - - /** - * Return true if SUCCESS status cached more then 5 min. - */ - private boolean canRemove() { - return (StoragePolicySatisfyPathStatus.SUCCESS == status - || StoragePolicySatisfyPathStatus.FAILURE == status) - && (Time.monotonicNow() - - lastStatusUpdateTime) > statusClearanceElapsedTimeMs; - } - } - - public StoragePolicySatisfyPathStatus getStatus(long id) { - StoragePolicySatisfyPathStatusInfo spsStatusInfo = spsStatus.get(id); - if(spsStatusInfo == null){ - return StoragePolicySatisfyPathStatus.NOT_AVAILABLE; - } - return spsStatusInfo.getStatus(); - } - - @VisibleForTesting - public static void setStatusClearanceElapsedTimeMs( - long statusClearanceElapsedTimeMs) { - BlockStorageMovementNeeded.statusClearanceElapsedTimeMs = - statusClearanceElapsedTimeMs; - } - - @VisibleForTesting - public static long getStatusClearanceElapsedTimeMs() { - return statusClearanceElapsedTimeMs; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/34588002/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 897e61e..b19d2c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -257,6 +257,7 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager; +import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier; import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; http://git-wip-us.apache.org/repos/asf/hadoop/blob/34588002/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java new file mode 100644 index 0000000..111cabb --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/IntraNNSPSContext.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+
+/**
+ * This class is the Namenode implementation for analyzing the file blocks which
+ * are expecting to change its storages and assigning the block storage
+ * movements to satisfy the storage policy.
+ */
+// TODO: Now, added one API which is required for sps package. Will refine
+// this interface via HDFS-12911.
+public class IntraNNSPSContext implements StoragePolicySatisfier.Context {
+  private final Namesystem namesystem;
+
+  public IntraNNSPSContext(Namesystem namesystem) {
+    this.namesystem = namesystem;
+  }
+
+  @Override
+  public int getNumLiveDataNodes() {
+    return namesystem.getFSDirectory().getBlockManager().getDatanodeManager()
+        .getNumLiveDataNodes();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/34588002/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
deleted file mode 100644
index 972e744..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ /dev/null
@@ -1,973 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.hadoop.hdfs.server.namenode; - -import static org.apache.hadoop.util.Time.monotonicNow; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.EnumMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfyPathStatus; -import org.apache.hadoop.hdfs.server.balancer.Matcher; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped.StorageAndBlockIndex; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; -import org.apache.hadoop.hdfs.server.protocol.StorageReport; -import org.apache.hadoop.hdfs.util.StripedBlockUtil; -import org.apache.hadoop.util.Daemon; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; - -/** - * Setting storagePolicy on a file after the file write will only update the new - * storage policy type in Namespace, but physical block storage movement will - * not happen until user runs "Mover Tool" explicitly for such files. The - * StoragePolicySatisfier Daemon thread implemented for addressing the case - * where users may want to physically move the blocks by HDFS itself instead of - * running mover tool explicitly. Just calling client API to - * satisfyStoragePolicy on a file/dir will automatically trigger to move its - * physical storage locations as expected in asynchronous manner. Here Namenode - * will pick the file blocks which are expecting to change its storages, then it - * will build the mapping of source block location and expected storage type and - * location to move. After that this class will also prepare commands to send to - * Datanode for processing the physical block movements. 
- */ -@InterfaceAudience.Private -public class StoragePolicySatisfier implements Runnable { - public static final Logger LOG = - LoggerFactory.getLogger(StoragePolicySatisfier.class); - private Daemon storagePolicySatisfierThread; - private final Namesystem namesystem; - private final BlockManager blockManager; - private final BlockStorageMovementNeeded storageMovementNeeded; - private final BlockStorageMovementAttemptedItems storageMovementsMonitor; - private volatile boolean isRunning = false; - private int spsWorkMultiplier; - private long blockCount = 0L; - private int blockMovementMaxRetry; - /** - * Represents the collective analysis status for all blocks. - */ - private static class BlocksMovingAnalysis { - - enum Status { - // Represents that, the analysis skipped due to some conditions. A such - // condition is if block collection is in incomplete state. - ANALYSIS_SKIPPED_FOR_RETRY, - // Represents that few or all blocks found respective target to do - // the storage movement. - BLOCKS_TARGETS_PAIRED, - // Represents that none of the blocks found respective target to do - // the storage movement. - NO_BLOCKS_TARGETS_PAIRED, - // Represents that, none of the blocks found for block storage movements. - BLOCKS_ALREADY_SATISFIED, - // Represents that, the analysis skipped due to some conditions. - // Example conditions are if no blocks really exists in block collection - // or - // if analysis is not required on ec files with unsuitable storage - // policies - BLOCKS_TARGET_PAIRING_SKIPPED, - // Represents that, All the reported blocks are satisfied the policy but - // some of the blocks are low redundant. - FEW_LOW_REDUNDANCY_BLOCKS - } - - private Status status = null; - private List assignedBlocks = null; - - BlocksMovingAnalysis(Status status, List blockMovingInfo) { - this.status = status; - this.assignedBlocks = blockMovingInfo; - } - } - - public StoragePolicySatisfier(final Namesystem namesystem, - final BlockManager blkManager, Configuration conf) { - this.namesystem = namesystem; - this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem, - this, conf.getInt( - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT)); - this.blockManager = blkManager; - this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems( - conf.getLong( - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT), - conf.getLong( - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY, - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT), - storageMovementNeeded); - this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf); - this.blockMovementMaxRetry = conf.getInt( - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_KEY, - DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT); - } - - /** - * Start storage policy satisfier demon thread. Also start block storage - * movements monitor for retry the attempts if needed. - */ - public synchronized void start(boolean reconfigStart) { - isRunning = true; - if (checkIfMoverRunning()) { - isRunning = false; - LOG.error( - "Stopping StoragePolicySatisfier thread " + "as Mover ID file " - + HdfsServerConstants.MOVER_ID_PATH.toString() - + " been opened. 
Maybe a Mover instance is running!"); - return; - } - if (reconfigStart) { - LOG.info("Starting StoragePolicySatisfier, as admin requested to " - + "start it."); - } else { - LOG.info("Starting StoragePolicySatisfier."); - } - - // Ensure that all the previously submitted block movements(if any) have to - // be stopped in all datanodes. - addDropSPSWorkCommandsToAllDNs(); - storageMovementNeeded.init(); - storagePolicySatisfierThread = new Daemon(this); - storagePolicySatisfierThread.setName("StoragePolicySatisfier"); - storagePolicySatisfierThread.start(); - this.storageMovementsMonitor.start(); - } - - /** - * Disables storage policy satisfier by stopping its services. - * - * @param forceStop - * true represents that it should stop SPS service by clearing all - * pending SPS work - */ - public synchronized void disable(boolean forceStop) { - isRunning = false; - - if (storagePolicySatisfierThread == null) { - return; - } - - storageMovementNeeded.close(); - - storagePolicySatisfierThread.interrupt(); - this.storageMovementsMonitor.stop(); - if (forceStop) { - storageMovementNeeded.clearQueuesWithNotification(); - addDropSPSWorkCommandsToAllDNs(); - } else { - LOG.info("Stopping StoragePolicySatisfier."); - } - } - - /** - * Timed wait to stop storage policy satisfier daemon threads. - */ - public synchronized void stopGracefully() { - if (isRunning) { - disable(true); - } - this.storageMovementsMonitor.stopGracefully(); - - if (storagePolicySatisfierThread == null) { - return; - } - try { - storagePolicySatisfierThread.join(3000); - } catch (InterruptedException ie) { - } - } - - /** - * Check whether StoragePolicySatisfier is running. - * @return true if running - */ - public boolean isRunning() { - return isRunning; - } - - // Return true if a Mover instance is running - private boolean checkIfMoverRunning() { - String moverId = HdfsServerConstants.MOVER_ID_PATH.toString(); - return namesystem.isFileOpenedForWrite(moverId); - } - - /** - * Adding drop commands to all datanodes to stop performing the satisfier - * block movements, if any. - */ - private void addDropSPSWorkCommandsToAllDNs() { - this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs(); - } - - @Override - public void run() { - while (namesystem.isRunning() && isRunning) { - try { - if (!namesystem.isInSafeMode()) { - ItemInfo itemInfo = storageMovementNeeded.get(); - if (itemInfo != null) { - if(itemInfo.getRetryCount() >= blockMovementMaxRetry){ - LOG.info("Failed to satisfy the policy after " - + blockMovementMaxRetry + " retries. Removing inode " - + itemInfo.getTrackId() + " from the queue"); - storageMovementNeeded.removeItemTrackInfo(itemInfo, false); - continue; - } - long trackId = itemInfo.getTrackId(); - BlockCollection blockCollection; - BlocksMovingAnalysis status = null; - try { - namesystem.readLock(); - blockCollection = namesystem.getBlockCollection(trackId); - // Check blockCollectionId existence. - if (blockCollection == null) { - // File doesn't exists (maybe got deleted), remove trackId from - // the queue - storageMovementNeeded.removeItemTrackInfo(itemInfo, true); - } else { - status = - analyseBlocksStorageMovementsAndAssignToDN( - blockCollection); - } - } finally { - namesystem.readUnlock(); - } - if (blockCollection != null) { - switch (status.status) { - // Just add to monitor, so it will be retried after timeout - case ANALYSIS_SKIPPED_FOR_RETRY: - // Just add to monitor, so it will be tracked for report and - // be removed on storage movement attempt finished report. 
- case BLOCKS_TARGETS_PAIRED: - this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo - .getStartId(), itemInfo.getTrackId(), monotonicNow(), - status.assignedBlocks, itemInfo.getRetryCount())); - break; - case NO_BLOCKS_TARGETS_PAIRED: - if (LOG.isDebugEnabled()) { - LOG.debug("Adding trackID " + trackId - + " back to retry queue as none of the blocks" - + " found its eligible targets."); - } - itemInfo.retryCount++; - this.storageMovementNeeded.add(itemInfo); - break; - case FEW_LOW_REDUNDANCY_BLOCKS: - if (LOG.isDebugEnabled()) { - LOG.debug("Adding trackID " + trackId - + " back to retry queue as some of the blocks" - + " are low redundant."); - } - this.storageMovementNeeded.add(itemInfo); - break; - // Just clean Xattrs - case BLOCKS_TARGET_PAIRING_SKIPPED: - case BLOCKS_ALREADY_SATISFIED: - default: - LOG.info("Block analysis skipped or blocks already satisfied" - + " with storages. So, Cleaning up the Xattrs."); - storageMovementNeeded.removeItemTrackInfo(itemInfo, true); - break; - } - } - } - } - int numLiveDn = namesystem.getFSDirectory().getBlockManager() - .getDatanodeManager().getNumLiveDataNodes(); - if (storageMovementNeeded.size() == 0 - || blockCount > (numLiveDn * spsWorkMultiplier)) { - Thread.sleep(3000); - blockCount = 0L; - } - } catch (Throwable t) { - handleException(t); - } - } - } - - private void handleException(Throwable t) { - // double check to avoid entering into synchronized block. - if (isRunning) { - synchronized (this) { - if (isRunning) { - isRunning = false; - // Stopping monitor thread and clearing queues as well - this.clearQueues(); - this.storageMovementsMonitor.stopGracefully(); - if (!namesystem.isRunning()) { - LOG.info("Stopping StoragePolicySatisfier."); - if (!(t instanceof InterruptedException)) { - LOG.info("StoragePolicySatisfier received an exception" - + " while shutting down.", t); - } - return; - } - } - } - } - LOG.error("StoragePolicySatisfier thread received runtime exception. " - + "Stopping Storage policy satisfier work", t); - return; - } - - private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN( - BlockCollection blockCollection) { - BlocksMovingAnalysis.Status status = - BlocksMovingAnalysis.Status.BLOCKS_ALREADY_SATISFIED; - byte existingStoragePolicyID = blockCollection.getStoragePolicyID(); - BlockStoragePolicy existingStoragePolicy = - blockManager.getStoragePolicy(existingStoragePolicyID); - if (!blockCollection.getLastBlock().isComplete()) { - // Postpone, currently file is under construction - // So, should we add back? or leave it to user - LOG.info("BlockCollectionID: {} file is under construction. So, postpone" - + " this to the next retry iteration", blockCollection.getId()); - return new BlocksMovingAnalysis( - BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY, - new ArrayList<>()); - } - - BlockInfo[] blocks = blockCollection.getBlocks(); - if (blocks.length == 0) { - LOG.info("BlockCollectionID: {} file is not having any blocks." 
- + " So, skipping the analysis.", blockCollection.getId()); - return new BlocksMovingAnalysis( - BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED, - new ArrayList<>()); - } - List blockMovingInfos = new ArrayList(); - - for (int i = 0; i < blocks.length; i++) { - BlockInfo blockInfo = blocks[i]; - List expectedStorageTypes; - if (blockInfo.isStriped()) { - if (ErasureCodingPolicyManager - .checkStoragePolicySuitableForECStripedMode( - existingStoragePolicyID)) { - expectedStorageTypes = existingStoragePolicy - .chooseStorageTypes((short) blockInfo.getCapacity()); - } else { - // Currently we support only limited policies (HOT, COLD, ALLSSD) - // for EC striped mode files. SPS will ignore to move the blocks if - // the storage policy is not in EC Striped mode supported policies - LOG.warn("The storage policy " + existingStoragePolicy.getName() - + " is not suitable for Striped EC files. " - + "So, ignoring to move the blocks"); - return new BlocksMovingAnalysis( - BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED, - new ArrayList<>()); - } - } else { - expectedStorageTypes = existingStoragePolicy - .chooseStorageTypes(blockInfo.getReplication()); - } - - DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo); - StorageType[] storageTypes = new StorageType[storages.length]; - for (int j = 0; j < storages.length; j++) { - DatanodeStorageInfo datanodeStorageInfo = storages[j]; - StorageType storageType = datanodeStorageInfo.getStorageType(); - storageTypes[j] = storageType; - } - List existing = - new LinkedList(Arrays.asList(storageTypes)); - if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes, - existing, true)) { - boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos, - blockInfo, expectedStorageTypes, existing, storages); - if (blocksPaired) { - status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED; - } else { - // none of the blocks found its eligible targets for satisfying the - // storage policy. - status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED; - } - } else { - if (blockManager.hasLowRedundancyBlocks(blockCollection)) { - status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS; - } - } - } - - List assignedBlockIds = new ArrayList(); - for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { - // Check for at least one block storage movement has been chosen - if (blkMovingInfo.getTarget() != null) { - // assign block storage movement task to the target node - ((DatanodeDescriptor) blkMovingInfo.getTarget()) - .addBlocksToMoveStorage(blkMovingInfo); - LOG.debug("BlockMovingInfo: {}", blkMovingInfo); - assignedBlockIds.add(blkMovingInfo.getBlock()); - blockCount++; - } - } - return new BlocksMovingAnalysis(status, assignedBlockIds); - } - - /** - * Compute the list of block moving information corresponding to the given - * blockId. This will check that each block location of the given block is - * satisfying the expected storage policy. If block location is not satisfied - * the policy then find out the target node with the expected storage type to - * satisfy the storage policy. 
- * - * @param blockMovingInfos - * - list of block source and target node pair - * @param blockInfo - * - block details - * @param expectedStorageTypes - * - list of expected storage type to satisfy the storage policy - * @param existing - * - list to get existing storage types - * @param storages - * - available storages - * @return false if some of the block locations failed to find target node to - * satisfy the storage policy, true otherwise - */ - private boolean computeBlockMovingInfos( - List blockMovingInfos, BlockInfo blockInfo, - List expectedStorageTypes, List existing, - DatanodeStorageInfo[] storages) { - boolean foundMatchingTargetNodesForBlock = true; - if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes, - existing, true)) { - List sourceWithStorageMap = - new ArrayList(); - List existingBlockStorages = - new ArrayList(Arrays.asList(storages)); - // if expected type exists in source node already, local movement would be - // possible, so lets find such sources first. - Iterator iterator = existingBlockStorages.iterator(); - while (iterator.hasNext()) { - DatanodeStorageInfo datanodeStorageInfo = iterator.next(); - if (checkSourceAndTargetTypeExists( - datanodeStorageInfo.getDatanodeDescriptor(), existing, - expectedStorageTypes)) { - sourceWithStorageMap - .add(new StorageTypeNodePair(datanodeStorageInfo.getStorageType(), - datanodeStorageInfo.getDatanodeDescriptor())); - iterator.remove(); - existing.remove(datanodeStorageInfo.getStorageType()); - } - } - - // Let's find sources for existing types left. - for (StorageType existingType : existing) { - iterator = existingBlockStorages.iterator(); - while (iterator.hasNext()) { - DatanodeStorageInfo datanodeStorageInfo = iterator.next(); - StorageType storageType = datanodeStorageInfo.getStorageType(); - if (storageType == existingType) { - iterator.remove(); - sourceWithStorageMap.add(new StorageTypeNodePair(storageType, - datanodeStorageInfo.getDatanodeDescriptor())); - break; - } - } - } - - StorageTypeNodeMap locsForExpectedStorageTypes = - findTargetsForExpectedStorageTypes(expectedStorageTypes); - - foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove( - blockMovingInfos, blockInfo, sourceWithStorageMap, - expectedStorageTypes, locsForExpectedStorageTypes); - } - return foundMatchingTargetNodesForBlock; - } - - /** - * Find the good target node for each source node for which block storages was - * misplaced. - * - * @param blockMovingInfos - * - list of block source and target node pair - * @param blockInfo - * - Block - * @param sourceWithStorageList - * - Source Datanode with storages list - * @param expected - * - Expecting storages to move - * @param locsForExpectedStorageTypes - * - Available DNs for expected storage types - * @return false if some of the block locations failed to find target node to - * satisfy the storage policy - */ - private boolean findSourceAndTargetToMove( - List blockMovingInfos, BlockInfo blockInfo, - List sourceWithStorageList, - List expected, - StorageTypeNodeMap locsForExpectedStorageTypes) { - boolean foundMatchingTargetNodesForBlock = true; - List excludeNodes = new ArrayList<>(); - - // Looping over all the source node locations and choose the target - // storage within same node if possible. This is done separately to - // avoid choosing a target which already has this block. 
- for (int i = 0; i < sourceWithStorageList.size(); i++) { - StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i); - - // Check whether the block replica is already placed in the expected - // storage type in this source datanode. - if (!expected.contains(existingTypeNodePair.storageType)) { - StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode( - blockInfo, existingTypeNodePair.dn, expected); - if (chosenTarget != null) { - if (blockInfo.isStriped()) { - buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn, - existingTypeNodePair.storageType, chosenTarget.dn, - chosenTarget.storageType, blockMovingInfos); - } else { - buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn, - existingTypeNodePair.storageType, chosenTarget.dn, - chosenTarget.storageType, blockMovingInfos); - } - expected.remove(chosenTarget.storageType); - // TODO: We can increment scheduled block count for this node? - } - } - // To avoid choosing this excludeNodes as targets later - excludeNodes.add(existingTypeNodePair.dn); - } - - // Looping over all the source node locations. Choose a remote target - // storage node if it was not found out within same node. - for (int i = 0; i < sourceWithStorageList.size(); i++) { - StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i); - StorageTypeNodePair chosenTarget = null; - // Chosen the target storage within same datanode. So just skipping this - // source node. - if (checkIfAlreadyChosen(blockMovingInfos, existingTypeNodePair.dn)) { - continue; - } - if (chosenTarget == null && blockManager.getDatanodeManager() - .getNetworkTopology().isNodeGroupAware()) { - chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn, - expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes, - excludeNodes); - } - - // Then, match nodes on the same rack - if (chosenTarget == null) { - chosenTarget = - chooseTarget(blockInfo, existingTypeNodePair.dn, expected, - Matcher.SAME_RACK, locsForExpectedStorageTypes, excludeNodes); - } - - if (chosenTarget == null) { - chosenTarget = - chooseTarget(blockInfo, existingTypeNodePair.dn, expected, - Matcher.ANY_OTHER, locsForExpectedStorageTypes, excludeNodes); - } - if (null != chosenTarget) { - if (blockInfo.isStriped()) { - buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn, - existingTypeNodePair.storageType, chosenTarget.dn, - chosenTarget.storageType, blockMovingInfos); - } else { - buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn, - existingTypeNodePair.storageType, chosenTarget.dn, - chosenTarget.storageType, blockMovingInfos); - } - - expected.remove(chosenTarget.storageType); - excludeNodes.add(chosenTarget.dn); - // TODO: We can increment scheduled block count for this node? 
-  private boolean checkIfAlreadyChosen(List<BlockMovingInfo> blockMovingInfos,
-      DatanodeDescriptor dn) {
-    for (BlockMovingInfo blockMovingInfo : blockMovingInfos) {
-      if (blockMovingInfo.getSource().equals(dn)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void buildContinuousBlockMovingInfos(BlockInfo blockInfo,
-      DatanodeInfo sourceNode, StorageType sourceStorageType,
-      DatanodeInfo targetNode, StorageType targetStorageType,
-      List<BlockMovingInfo> blkMovingInfos) {
-    Block blk = new Block(blockInfo.getBlockId(), blockInfo.getNumBytes(),
-        blockInfo.getGenerationStamp());
-    BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
-        targetNode, sourceStorageType, targetStorageType);
-    blkMovingInfos.add(blkMovingInfo);
-  }
-
-  private void buildStripedBlockMovingInfos(BlockInfo blockInfo,
-      DatanodeInfo sourceNode, StorageType sourceStorageType,
-      DatanodeInfo targetNode, StorageType targetStorageType,
-      List<BlockMovingInfo> blkMovingInfos) {
-    // For a striped block, an internal block must be constructed at the
-    // given index of the block group. Iterate over all the block indices
-    // and construct the internal blocks which can then be considered for
-    // block movement.
-    BlockInfoStriped sBlockInfo = (BlockInfoStriped) blockInfo;
-    for (StorageAndBlockIndex si : sBlockInfo.getStorageAndIndexInfos()) {
-      if (si.getBlockIndex() >= 0) {
-        DatanodeDescriptor dn = si.getStorage().getDatanodeDescriptor();
-        if (sourceNode.equals(dn)) {
-          // construct internal block
-          long blockId = blockInfo.getBlockId() + si.getBlockIndex();
-          long numBytes = StripedBlockUtil.getInternalBlockLength(
-              sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(),
-              sBlockInfo.getDataBlockNum(), si.getBlockIndex());
-          Block blk = new Block(blockId, numBytes,
-              blockInfo.getGenerationStamp());
-          BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
-              targetNode, sourceStorageType, targetStorageType);
-          blkMovingInfos.add(blkMovingInfo);
-        }
-      }
-    }
-  }
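For striped blocks, each queued move targets one internal block: its id is the block-group id plus the block index, and its length depends on how the group's bytes stripe round-robin across the data columns. A simplified worked sketch of that length computation (a stand-in for illustration, not the StripedBlockUtil implementation, and covering data indices only):

    public class StripedLengthSketch {
      /**
       * Simplified internal-block length: the group's bytes fill cells
       * round-robin across numDataBlocks columns; column idx gets one cell
       * per full stripe plus its share of the final partial stripe.
       */
      static long internalBlockLength(long groupBytes, int cellSize,
          int numDataBlocks, int idx) {
        long stripeBytes = (long) cellSize * numDataBlocks;
        long fullStripes = groupBytes / stripeBytes;
        long lastStripe = groupBytes % stripeBytes;
        long len = fullStripes * cellSize;
        long lastForThisColumn = lastStripe - (long) idx * cellSize;
        if (lastForThisColumn >= cellSize) {
          len += cellSize;          // a full extra cell in the last stripe
        } else if (lastForThisColumn > 0) {
          len += lastForThisColumn; // a partial trailing cell
        }
        return len;
      }

      public static void main(String[] args) {
        int cell = 1 << 20; // 1 MiB cells, 6 data blocks
        // 6.5 MiB of data: column 0 holds 1.5 MiB, column 1 holds 1 MiB.
        System.out.println(internalBlockLength(6L * cell + cell / 2, cell, 6, 0));
        System.out.println(internalBlockLength(6L * cell + cell / 2, cell, 6, 1));
      }
    }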
-  /**
-   * Choose a target storage within the same datanode if possible.
-   *
-   * @param block
-   *          - block info
-   * @param source
-   *          - source datanode
-   * @param targetTypes
-   *          - list of target storage types
-   */
-  private StorageTypeNodePair chooseTargetTypeInSameNode(Block block,
-      DatanodeDescriptor source, List<StorageType> targetTypes) {
-    for (StorageType t : targetTypes) {
-      DatanodeStorageInfo chooseStorage4Block =
-          source.chooseStorage4Block(t, block.getNumBytes());
-      if (chooseStorage4Block != null) {
-        return new StorageTypeNodePair(t, source);
-      }
-    }
-    return null;
-  }
-
-  private StorageTypeNodePair chooseTarget(Block block,
-      DatanodeDescriptor source, List<StorageType> targetTypes, Matcher matcher,
-      StorageTypeNodeMap locsForExpectedStorageTypes,
-      List<DatanodeDescriptor> excludeNodes) {
-    for (StorageType t : targetTypes) {
-      List<DatanodeDescriptor> nodesWithStorages =
-          locsForExpectedStorageTypes.getNodesWithStorages(t);
-      if (nodesWithStorages == null || nodesWithStorages.isEmpty()) {
-        continue; // no target nodes with the required storage type.
-      }
-      Collections.shuffle(nodesWithStorages);
-      for (DatanodeDescriptor target : nodesWithStorages) {
-        if (!excludeNodes.contains(target) && matcher.match(
-            blockManager.getDatanodeManager().getNetworkTopology(), source,
-            target)) {
-          if (null != target.chooseStorage4Block(t, block.getNumBytes())) {
-            return new StorageTypeNodePair(t, target);
-          }
-        }
-      }
-    }
-    return null;
-  }
-
-  private static class StorageTypeNodePair {
-    private StorageType storageType = null;
-    private DatanodeDescriptor dn = null;
-
-    StorageTypeNodePair(StorageType storageType, DatanodeDescriptor dn) {
-      this.storageType = storageType;
-      this.dn = dn;
-    }
-  }
-
-  private StorageTypeNodeMap findTargetsForExpectedStorageTypes(
-      List<StorageType> expected) {
-    StorageTypeNodeMap targetMap = new StorageTypeNodeMap();
-    List<DatanodeDescriptor> reports = blockManager.getDatanodeManager()
-        .getDatanodeListForReport(DatanodeReportType.LIVE);
-    for (DatanodeDescriptor dn : reports) {
-      StorageReport[] storageReports = dn.getStorageReports();
-      for (StorageReport storageReport : storageReports) {
-        StorageType t = storageReport.getStorage().getStorageType();
-        if (expected.contains(t)) {
-          final long maxRemaining = getMaxRemaining(dn.getStorageReports(), t);
-          if (maxRemaining > 0L) {
-            targetMap.add(t, dn);
-          }
-        }
-      }
-    }
-    return targetMap;
-  }
-
-  private static long getMaxRemaining(StorageReport[] storageReports,
-      StorageType t) {
-    long max = 0L;
-    for (StorageReport r : storageReports) {
-      if (r.getStorage().getStorageType() == t) {
-        if (r.getRemaining() > max) {
-          max = r.getRemaining();
-        }
-      }
-    }
-    return max;
-  }
-
-  private boolean checkSourceAndTargetTypeExists(DatanodeDescriptor dn,
-      List<StorageType> existing, List<StorageType> expectedStorageTypes) {
-    DatanodeStorageInfo[] allDNStorageInfos = dn.getStorageInfos();
-    boolean isExpectedTypeAvailable = false;
-    boolean isExistingTypeAvailable = false;
-    for (DatanodeStorageInfo dnInfo : allDNStorageInfos) {
-      StorageType storageType = dnInfo.getStorageType();
-      if (existing.contains(storageType)) {
-        isExistingTypeAvailable = true;
-      }
-      if (expectedStorageTypes.contains(storageType)) {
-        isExpectedTypeAvailable = true;
-      }
-    }
-    return isExistingTypeAvailable && isExpectedTypeAvailable;
-  }
-
-  private static class StorageTypeNodeMap {
-    private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap =
-        new EnumMap<StorageType, List<DatanodeDescriptor>>(StorageType.class);
-
-    private void add(StorageType t, DatanodeDescriptor dn) {
-      List<DatanodeDescriptor> nodesWithStorages = getNodesWithStorages(t);
-      LinkedList<DatanodeDescriptor> value = null;
-      if (nodesWithStorages == null) {
-        value = new LinkedList<DatanodeDescriptor>();
-        value.add(dn);
-        typeNodeMap.put(t, value);
-      } else {
-        nodesWithStorages.add(dn);
-      }
-    }
-
-    /**
-     * @param type
-     *          - Storage type
-     * @return datanodes which have the given storage type
-     */
-    private List<DatanodeDescriptor> getNodesWithStorages(StorageType type) {
-      return typeNodeMap.get(type);
-    }
-  }
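StorageTypeNodeMap above is a small enum-keyed multimap: one bucket of datanodes per storage type, with EnumMap giving array-indexed lookups. The same pattern in isolation (a sketch with a hypothetical Type enum in place of StorageType):

    import java.util.ArrayList;
    import java.util.EnumMap;
    import java.util.List;

    public class EnumMultimapSketch {
      enum Type { DISK, SSD, ARCHIVE }

      // One bucket of values per enum key, backed by a plain array internally.
      private final EnumMap<Type, List<String>> map = new EnumMap<>(Type.class);

      void add(Type t, String node) {
        map.computeIfAbsent(t, k -> new ArrayList<>()).add(node);
      }

      List<String> get(Type t) {
        return map.get(t); // may be null if no node reported this type
      }

      public static void main(String[] args) {
        EnumMultimapSketch m = new EnumMultimapSketch();
        m.add(Type.SSD, "dn1");
        m.add(Type.SSD, "dn2");
        System.out.println(m.get(Type.SSD));     // [dn1, dn2]
        System.out.println(m.get(Type.ARCHIVE)); // null
      }
    }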
-  /**
-   * Receives the report of blocks whose storage movement attempt has
-   * finished.
-   *
-   * @param moveAttemptFinishedBlks
-   *          set of storage movement attempt finished blocks.
-   */
-  void handleStorageMovementAttemptFinishedBlks(
-      BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) {
-    if (moveAttemptFinishedBlks.getBlocks().length <= 0) {
-      return;
-    }
-    storageMovementsMonitor
-        .addReportedMovedBlocks(moveAttemptFinishedBlks.getBlocks());
-  }
-
-  @VisibleForTesting
-  BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
-    return storageMovementsMonitor;
-  }
-
-  /**
-   * Clear the queues of the storage-movement-needed lists and the items
-   * tracked in the storage movement monitor.
-   */
-  public void clearQueues() {
-    LOG.warn("Clearing all the queues from StoragePolicySatisfier. So, "
-        + "user requests on satisfying block storages would be discarded.");
-    storageMovementNeeded.clearAll();
-  }
-
-  /**
-   * Add a file inode to the queue whose blocks need storage movement.
-   *
-   * @param inodeId
-   *          - file inode/blockcollection id.
-   */
-  public void satisfyStoragePolicy(Long inodeId) {
-    // For a file, startId and trackId are the same.
-    storageMovementNeeded.add(new ItemInfo(inodeId, inodeId));
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Added track info for inode {} to block "
-          + "storageMovementNeeded queue", inodeId);
-    }
-  }
-
-  public void addInodeToPendingDirQueue(long id) {
-    storageMovementNeeded.addToPendingDirQueue(id);
-  }
-
-  /**
-   * Clear queues for the given track id.
-   */
-  public void clearQueue(long trackId) {
-    storageMovementNeeded.clearQueue(trackId);
-  }
-
-  /**
-   * ItemInfo is the file info object whose storage policy needs to be
-   * satisfied.
-   */
-  public static class ItemInfo {
-    private long startId;
-    private long trackId;
-    private int retryCount;
-
-    public ItemInfo(long startId, long trackId) {
-      this.startId = startId;
-      this.trackId = trackId;
-      // Set to 0 when the item is added to the queue for the first time.
-      this.retryCount = 0;
-    }
-
-    public ItemInfo(long startId, long trackId, int retryCount) {
-      this.startId = startId;
-      this.trackId = trackId;
-      this.retryCount = retryCount;
-    }
-
-    /**
-     * Return the start inode id of the current track id.
-     */
-    public long getStartId() {
-      return startId;
-    }
-
-    /**
-     * Return the file inode id whose storage policy needs to be satisfied.
-     */
-    public long getTrackId() {
-      return trackId;
-    }
-
-    /**
-     * Returns true if the tracking path is a directory, false otherwise.
-     */
-    public boolean isDir() {
-      return (startId != trackId);
-    }
-
-    /**
-     * Get the attempted retry count of the block for satisfying the policy.
-     */
-    public int getRetryCount() {
-      return retryCount;
-    }
-  }
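The startId/trackId pair in ItemInfo encodes directory tracking without an extra flag: a file queued directly carries its own inode id twice, while a file discovered under a satisfier-marked directory carries that directory's id as startId. A tiny sketch of the convention (hypothetical inode ids):

    public class ItemInfoSketch {
      static class Item {
        final long startId; // inode id where tracking started (file or dir)
        final long trackId; // inode id of the file whose blocks move

        Item(long startId, long trackId) {
          this.startId = startId;
          this.trackId = trackId;
        }

        boolean isDir() {
          return startId != trackId;
        }
      }

      public static void main(String[] args) {
        Item file = new Item(1001L, 1001L);    // file queued directly
        Item underDir = new Item(500L, 1002L); // file found under directory 500
        System.out.println(file.isDir());      // false
        System.out.println(underDir.isDir());  // true
      }
    }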
-  /**
-   * This class holds information about attempted blocks and their last
-   * attempted or reported time stamp. This is used by
-   * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
-   */
-  final static class AttemptedItemInfo extends ItemInfo {
-    private long lastAttemptedOrReportedTime;
-    private final List<Block> blocks;
-
-    /**
-     * AttemptedItemInfo constructor.
-     *
-     * @param rootId
-     *          rootId for trackId
-     * @param trackId
-     *          trackId for file.
-     * @param lastAttemptedOrReportedTime
-     *          last attempted or reported time
-     */
-    AttemptedItemInfo(long rootId, long trackId,
-        long lastAttemptedOrReportedTime,
-        List<Block> blocks, int retryCount) {
-      super(rootId, trackId, retryCount);
-      this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
-      this.blocks = blocks;
-    }
-
-    /**
-     * @return last attempted or reported time stamp.
-     */
-    long getLastAttemptedOrReportedTime() {
-      return lastAttemptedOrReportedTime;
-    }
-
-    /**
-     * Update lastAttemptedOrReportedTime, so that the expiration time will be
-     * postponed into the future.
-     */
-    void touchLastReportedTimeStamp() {
-      this.lastAttemptedOrReportedTime = monotonicNow();
-    }
-
-    List<Block> getBlocks() {
-      return this.blocks;
-    }
-
-  }
-
-  public StoragePolicySatisfyPathStatus checkStoragePolicySatisfyPathStatus(
-      String path) throws IOException {
-    INode inode = namesystem.getFSDirectory().getINode(path);
-    return storageMovementNeeded.getStatus(inode.getId());
-  }
-}
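The touchLastReportedTimeStamp call above is what keeps a partially moved item alive: if no datanode report refreshes the timestamp within the self-retry timeout, the item is treated as expired and re-queued. The expiry check in miniature (a sketch using System.nanoTime; HDFS uses its own monotonicNow utility):

    import java.util.concurrent.TimeUnit;

    public class ExpirySketch {
      static long monotonicNowMs() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
      }

      static class Attempted {
        private long lastReportedMs = monotonicNowMs();

        void touch() {                  // called when a DN report arrives
          lastReportedMs = monotonicNowMs();
        }

        boolean expired(long timeoutMs) { // re-queue the item if true
          return monotonicNowMs() - lastReportedMs > timeoutMs;
        }
      }

      public static void main(String[] args) throws InterruptedException {
        Attempted item = new Attempted();
        Thread.sleep(20);
        System.out.println(item.expired(10)); // true: no report within 10 ms
        item.touch();
        System.out.println(item.expired(10)); // false: just refreshed
      }
    }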
http://git-wip-us.apache.org/repos/asf/hadoop/blob/34588002/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
deleted file mode 100644
index d4ccb3e..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import static org.apache.hadoop.util.Time.monotonicNow;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;
-
-import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-/**
- * Tests that block storage movement attempt failures reported by the DN are
- * processed correctly.
- */
-public class TestBlockStorageMovementAttemptedItems {
-
-  private BlockStorageMovementAttemptedItems bsmAttemptedItems = null;
-  private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null;
-  private final int selfRetryTimeout = 500;
-
-  @Before
-  public void setup() throws Exception {
-    unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
-        Mockito.mock(Namesystem.class),
-        Mockito.mock(StoragePolicySatisfier.class), 100);
-    bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
-        selfRetryTimeout, unsatisfiedStorageMovementFiles);
-  }
-
-  @After
-  public void teardown() {
-    if (bsmAttemptedItems != null) {
-      bsmAttemptedItems.stop();
-      bsmAttemptedItems.stopGracefully();
-    }
-  }
-
-  private boolean checkItemMovedForRetry(Long item, long retryTimeout)
-      throws InterruptedException {
-    long stopTime = monotonicNow() + (retryTimeout * 2);
-    boolean isItemFound = false;
-    while (monotonicNow() < (stopTime)) {
-      ItemInfo ele = null;
-      while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
-        if (item == ele.getTrackId()) {
-          isItemFound = true;
-          break;
-        }
-      }
-      if (!isItemFound) {
-        Thread.sleep(100);
-      } else {
-        break;
-      }
-    }
-    return isItemFound;
-  }
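checkItemMovedForRetry polls with a deadline instead of asserting immediately, because the monitor thread re-queues items asynchronously. The same poll-until-timeout pattern as a reusable sketch (the awaitTrue helper is hypothetical, not a Hadoop test utility):

    import java.util.function.BooleanSupplier;

    public class AwaitSketch {
      /** Polls the condition every intervalMs until it holds or the deadline passes. */
      static boolean awaitTrue(BooleanSupplier condition, long timeoutMs,
          long intervalMs) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (System.nanoTime() < deadline) {
          if (condition.getAsBoolean()) {
            return true;
          }
          Thread.sleep(intervalMs);
        }
        return condition.getAsBoolean(); // one last check at the deadline
      }

      public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        // Becomes true after ~50 ms; observed well within the 1 s deadline.
        boolean ok = awaitTrue(
            () -> System.nanoTime() - start > 50_000_000L, 1000, 10);
        System.out.println(ok); // true
      }
    }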
-  /**
-   * Verify that reporting moved blocks queues up the block info.
-   */
-  @Test(timeout = 30000)
-  public void testAddReportedMoveAttemptFinishedBlocks() throws Exception {
-    bsmAttemptedItems.start(); // start block movement result monitor thread
-    Long item = new Long(1234);
-    List<Block> blocks = new ArrayList<>();
-    blocks.add(new Block(item));
-    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
-    Block[] blockArray = new Block[blocks.size()];
-    blocks.toArray(blockArray);
-    bsmAttemptedItems.addReportedMovedBlocks(blockArray);
-    assertEquals("Failed to receive result!", 1,
-        bsmAttemptedItems.getMovementFinishedBlocksCount());
-  }
-
-  /**
-   * Verify an empty moved-blocks reporting queue.
-   */
-  @Test(timeout = 30000)
-  public void testNoBlockMovementAttemptFinishedReportAdded() throws Exception {
-    bsmAttemptedItems.start(); // start block movement report monitor thread
-    Long item = new Long(1234);
-    List<Block> blocks = new ArrayList<>();
-    blocks.add(new Block(item));
-    bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
-    assertEquals("Shouldn't receive result", 0,
-        bsmAttemptedItems.getMovementFinishedBlocksCount());
-    assertEquals("Item doesn't exist in the attempted list", 1,
-        bsmAttemptedItems.getAttemptedItemsCount());
-  }
-
-  /**
-   * Partial block movement with
-   * BlockMovementStatus#DN_BLK_STORAGE_MOVEMENT_SUCCESS. Here, the first call
-   * is #blockStorageMovementReportedItemsCheck() and then
-   * #blocksStorageMovementUnReportedItemsCheck().
-   */
-  @Test(timeout = 30000)
-  public void testPartialBlockMovementShouldBeRetried1() throws Exception {
-    Long item = new Long(1234);
-    List<Block> blocks = new ArrayList<>();
-    blocks.add(new Block(item));
-    blocks.add(new Block(5678L));
-    Long trackID = 0L;
-    bsmAttemptedItems
-        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
-    Block[] blksMovementReport = new Block[1];
-    blksMovementReport[0] = new Block(item);
-    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
-
-    // start block movement report monitor thread
-    bsmAttemptedItems.start();
-    assertTrue("Failed to add to the retry list",
-        checkItemMovedForRetry(trackID, 5000));
-    assertEquals("Failed to remove from the attempted list", 0,
-        bsmAttemptedItems.getAttemptedItemsCount());
-  }
-
-  /**
-   * Partial block movement. Here, the first call is
-   * #blocksStorageMovementUnReportedItemsCheck() and then
-   * #blockStorageMovementReportedItemsCheck().
-   */
-  @Test(timeout = 30000)
-  public void testPartialBlockMovementShouldBeRetried2() throws Exception {
-    Long item = new Long(1234);
-    Long trackID = 0L;
-    List<Block> blocks = new ArrayList<>();
-    blocks.add(new Block(item));
-    bsmAttemptedItems
-        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
-    Block[] blksMovementReport = new Block[1];
-    blksMovementReport[0] = new Block(item);
-    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
-
-    Thread.sleep(selfRetryTimeout * 2); // wait for the timeout to elapse
-
-    bsmAttemptedItems.blocksStorageMovementUnReportedItemsCheck();
-    bsmAttemptedItems.blockStorageMovementReportedItemsCheck();
-
-    assertTrue("Failed to add to the retry list",
-        checkItemMovedForRetry(trackID, 5000));
-    assertEquals("Failed to remove from the attempted list", 0,
-        bsmAttemptedItems.getAttemptedItemsCount());
-  }
-
-  /**
-   * Partial block movement with only a BlocksStorageMoveAttemptFinished
-   * report while the storageMovementAttemptedItems list is empty.
-   */
-  @Test(timeout = 30000)
-  public void testPartialBlockMovementWithEmptyAttemptedQueue()
-      throws Exception {
-    Long item = new Long(1234);
-    Long trackID = 0L;
-    List<Block> blocks = new ArrayList<>();
-    blocks.add(new Block(item));
-    bsmAttemptedItems
-        .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
-    Block[] blksMovementReport = new Block[1];
-    blksMovementReport[0] = new Block(item);
-    bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
-    assertFalse(
-        "Should not add in queue again if it is not there in"
-            + " storageMovementAttemptedItems",
-        checkItemMovedForRetry(trackID, 5000));
-    assertEquals("Failed to remove from the attempted list", 1,
-        bsmAttemptedItems.getAttemptedItemsCount());
-  }
-}

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org