From: rakeshr@apache.org
To: common-commits@hadoop.apache.org
Date: Sun, 27 Aug 2017 07:15:51 -0000
Subject: [24/50] [abbrv] hadoop git commit: HDFS-11248: [SPS]: Handle partial block location movements. Contributed by Rakesh R

HDFS-11248: [SPS]: Handle partial block location movements.
Contributed by Rakesh R

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a09c7d43
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a09c7d43
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a09c7d43

Branch: refs/heads/HDFS-10285
Commit: a09c7d43d93a7cc43513104ef6c548fa669586d0
Parents: f18deae
Author: Uma Maheswara Rao G
Authored: Wed Dec 28 23:21:07 2016 -0800
Committer: Rakesh Radhakrishnan
Committed: Sun Aug 27 11:54:36 2017 +0530

----------------------------------------------------------------------
 .../datanode/BlockStorageMovementTracker.java   |  15 --
 .../datanode/StoragePolicySatisfyWorker.java    |  15 +-
 .../BlockStorageMovementAttemptedItems.java     | 206 +++++++++++++-----
 .../server/namenode/StoragePolicySatisfier.java | 215 +++++++++++++------
 .../TestBlockStorageMovementAttemptedItems.java | 101 ++++++++-
 .../namenode/TestStoragePolicySatisfier.java    |  63 +++++-
 6 files changed, 454 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a09c7d43/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
index 2de88fc..bd35b09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
@@ -28,7 +28,6 @@ import java.util.concurrent.Future;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus;
 import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -109,20 +108,6 @@ public class BlockStorageMovementTracker implements Runnable {
     }
   }
 
-  /**
-   * Mark as block movement failure for the given trackId and blockId.
-   *
-   * @param trackId tracking id
-   * @param blockId block id
-   */
-  void markBlockMovementFailure(long trackId, long blockId) {
-    LOG.debug("Mark as block movement failure for the given "
-        + "trackId:{} and blockId:{}", trackId, blockId);
-    BlockMovementResult result = new BlockMovementResult(trackId, blockId, null,
-        BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE);
-    addMovementResultToTrackIdList(result);
-  }
-
   private List<Future<BlockMovementResult>> addMovementResultToTrackIdList(
       BlockMovementResult result) {
     long trackId = result.getTrackId();


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a09c7d43/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 19f3fe2..10adbfd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -154,18 +154,9 @@ public class StoragePolicySatisfyWorker {
       Collection<BlockMovingInfo> blockMovingInfos) {
     LOG.debug("Received BlockMovingTasks {}", blockMovingInfos);
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
-      // Iterating backwards. This is to ensure that all the block src location
-      // which doesn't have a target node will be marked as failure before
-      // scheduling the block movement to valid target nodes.
-      for (int i = blkMovingInfo.getSources().length - 1; i >= 0; i--) {
-        if (i >= blkMovingInfo.getTargets().length) {
-          // Since there is no target selected for scheduling the block,
-          // just mark this block storage movement as failure. Later, namenode
-          // can take action on this.
-          movementTracker.markBlockMovementFailure(trackID,
-              blkMovingInfo.getBlock().getBlockId());
-          continue;
-        }
+      assert blkMovingInfo.getSources().length == blkMovingInfo
+          .getTargets().length;
+      for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
         DatanodeInfo target = blkMovingInfo.getTargets()[i];
         BlockMovingTask blockMovingTask = new BlockMovingTask(
             trackID, blockPoolID, blkMovingInfo.getBlock(),
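With the datanode-side failure marking removed, the contract changes: a
BlockMovingInfo handed to the worker now carries only fully matched
source-target pairs, which is exactly what the new assert guards. Unmatched
sources are instead reported through the namenode-side flag introduced below,
rather than via synthesized DN failure results. A minimal sketch of that
pairing invariant, using hypothetical simplified types (PairingSketch,
MovePair) rather than the real Hadoop classes:

import java.util.ArrayList;
import java.util.List;

/** Toy model of the 1:1 source-target pairing the patched worker relies on. */
final class PairingSketch {

  /** Hypothetical stand-in for one scheduled block move. */
  static final class MovePair {
    final String source;
    final String target;

    MovePair(String source, String target) {
      this.source = source;
      this.target = target;
    }
  }

  /**
   * Builds the move list the way the patched code expects it: a pair is
   * emitted only when a target was actually found, so sources and targets
   * always line up 1:1 and the worker can walk them with a single index.
   */
  static List<MovePair> pair(List<String> sources, List<String> targets) {
    // Invariant from the patch: the namenode ships only matched pairs.
    assert sources.size() == targets.size();
    List<MovePair> moves = new ArrayList<>();
    for (int i = 0; i < sources.size(); i++) {
      moves.add(new MovePair(sources.get(i), targets.get(i)));
    }
    return moves;
  }
}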
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a09c7d43/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index bb26082..ce97075 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -43,11 +43,14 @@ import com.google.common.annotations.VisibleForTesting;
  * automatically after timeout. The default timeout would be 30mins.
  */
 public class BlockStorageMovementAttemptedItems {
-  public static final Logger LOG =
+  private static final Logger LOG =
       LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
-  // A map holds the items which are already taken for blocks movements
-  // processing and sent to DNs.
-  private final Map<Long, Long> storageMovementAttemptedItems;
+
+  /**
+   * A map holds the items which are already taken for blocks movements
+   * processing and sent to DNs.
+   */
+  private final Map<Long, ItemInfo> storageMovementAttemptedItems;
   private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
   private volatile boolean monitorRunning = true;
   private Daemon timerThread = null;
@@ -83,10 +86,16 @@ public class BlockStorageMovementAttemptedItems {
    *
    * @param blockCollectionID
    *          - tracking id / block collection id
+   * @param allBlockLocsAttemptedToSatisfy
+   *          - whether matching target nodes were found to satisfy the
+   *            storage type for all the block locations of the given
+   *            blockCollectionID
    */
-  public void add(Long blockCollectionID) {
+  public void add(Long blockCollectionID,
+      boolean allBlockLocsAttemptedToSatisfy) {
     synchronized (storageMovementAttemptedItems) {
-      storageMovementAttemptedItems.put(blockCollectionID, monotonicNow());
+      ItemInfo itemInfo = new ItemInfo(monotonicNow(),
+          allBlockLocsAttemptedToSatisfy);
+      storageMovementAttemptedItems.put(blockCollectionID, itemInfo);
     }
   }
@@ -121,15 +130,62 @@ public class BlockStorageMovementAttemptedItems {
    */
   public synchronized void stop() {
     monitorRunning = false;
-    timerThread.interrupt();
-    try {
-      timerThread.join(3000);
-    } catch (InterruptedException ie) {
+    if (timerThread != null) {
+      timerThread.interrupt();
+      try {
+        timerThread.join(3000);
+      } catch (InterruptedException ie) {
+      }
     }
     this.clearQueues();
   }
 
   /**
+   * This class contains information of an attempted trackID, such as (a) the
+   * last attempted time stamp and (b) whether all the blocks in the trackID
+   * were attempted and blocks movement has been scheduled to satisfy the
+   * storage policy. This is used by
+   * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
+   */
+  private final static class ItemInfo {
+    private final long lastAttemptedTimeStamp;
+    private final boolean allBlockLocsAttemptedToSatisfy;
+
+    /**
+     * ItemInfo constructor.
+     *
+     * @param lastAttemptedTimeStamp
+     *          last attempted time stamp
+     * @param allBlockLocsAttemptedToSatisfy
+     *          whether all the blocks in the trackID were attempted and
+     *          blocks movement has been scheduled to satisfy the storage
+     *          policy
+     */
+    private ItemInfo(long lastAttemptedTimeStamp,
+        boolean allBlockLocsAttemptedToSatisfy) {
+      this.lastAttemptedTimeStamp = lastAttemptedTimeStamp;
+      this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy;
+    }
+
+    /**
+     * @return last attempted time stamp.
+     */
+    private long getLastAttemptedTimeStamp() {
+      return lastAttemptedTimeStamp;
+    }
+
+    /**
+     * @return true if all the block locations under the trackID have found
+     *         matching target nodes to satisfy the storage policy; false if
+     *         the trackID needs retries to satisfy the storage policy for
+     *         some of the block locations.
+     */
+    private boolean isAllBlockLocsAttemptedToSatisfy() {
+      return allBlockLocsAttemptedToSatisfy;
+    }
+  }
+
+  /**
    * A monitor class for checking block storage movement result and long waiting
    * items periodically.
    */
@@ -147,76 +203,108 @@ public class BlockStorageMovementAttemptedItems {
         }
       }
     }
+  }
 
-    private void blocksStorageMovementUnReportedItemsCheck() {
-      synchronized (storageMovementAttemptedItems) {
-        Iterator<Entry<Long, Long>> iter =
-            storageMovementAttemptedItems.entrySet().iterator();
-        long now = monotonicNow();
-        while (iter.hasNext()) {
-          Entry<Long, Long> entry = iter.next();
-          if (now > entry.getValue() + selfRetryTimeout) {
-            Long blockCollectionID = entry.getKey();
-            synchronized (storageMovementAttemptedResults) {
-              boolean exist = isExistInResult(blockCollectionID);
-              if (!exist) {
-                blockStorageMovementNeeded.add(blockCollectionID);
-              } else {
-                LOG.info("Blocks storage movement results for the"
-                    + " tracking id : " + blockCollectionID
-                    + " is reported from one of the co-ordinating datanode."
-                    + " So, the result will be processed soon.");
-              }
-              iter.remove();
-            }
-          }
-        }
-      }
-    }
+  @VisibleForTesting
+  void blocksStorageMovementUnReportedItemsCheck() {
+    synchronized (storageMovementAttemptedItems) {
+      Iterator<Entry<Long, ItemInfo>> iter = storageMovementAttemptedItems
+          .entrySet().iterator();
+      long now = monotonicNow();
+      while (iter.hasNext()) {
+        Entry<Long, ItemInfo> entry = iter.next();
+        ItemInfo itemInfo = entry.getValue();
+        if (now > itemInfo.getLastAttemptedTimeStamp() + selfRetryTimeout) {
+          Long blockCollectionID = entry.getKey();
+          synchronized (storageMovementAttemptedResults) {
+            if (!isExistInResult(blockCollectionID)) {
+              blockStorageMovementNeeded.add(blockCollectionID);
+              iter.remove();
+              LOG.info("TrackID: {} timed out and moved to the needed "
+                  + "retries queue for the next iteration.",
+                  blockCollectionID);
+            } else {
+              LOG.info("Blocks storage movement results for the"
+                  + " tracking id : " + blockCollectionID
+                  + " is reported from one of the co-ordinating datanodes."
+                  + " So, the result will be processed soon.");
+            }
+          }
+        }
+      }
+    }
+  }
 
-    private boolean isExistInResult(Long blockCollectionID) {
-      Iterator<BlocksStorageMovementResult> iter =
-          storageMovementAttemptedResults.iterator();
-      while (iter.hasNext()) {
-        BlocksStorageMovementResult storageMovementAttemptedResult =
-            iter.next();
-        if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) {
-          return true;
-        }
-      }
-      return false;
-    }
+  private boolean isExistInResult(Long blockCollectionID) {
+    Iterator<BlocksStorageMovementResult> iter = storageMovementAttemptedResults
+        .iterator();
+    while (iter.hasNext()) {
+      BlocksStorageMovementResult storageMovementAttemptedResult = iter.next();
+      if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) {
+        return true;
+      }
+    }
+    return false;
+  }
 
-    private void blockStorageMovementResultCheck() {
-      synchronized (storageMovementAttemptedResults) {
-        Iterator<BlocksStorageMovementResult> iter =
-            storageMovementAttemptedResults.iterator();
-        while (iter.hasNext()) {
-          BlocksStorageMovementResult storageMovementAttemptedResult =
-              iter.next();
+  @VisibleForTesting
+  void blockStorageMovementResultCheck() {
+    synchronized (storageMovementAttemptedResults) {
+      Iterator<BlocksStorageMovementResult> resultsIter =
+          storageMovementAttemptedResults.iterator();
+      while (resultsIter.hasNext()) {
+        // TrackID needs to be retried in the following cases:
+        // 1) All or a few of the scheduled block movements have failed.
+        // 2) All the scheduled block movements succeeded but there are
+        //    unscheduled block movements in this trackID. Say, some of the
+        //    blocks in the trackID couldn't find any matching target node
+        //    for scheduling block movement in the previous SPS iteration.
+        BlocksStorageMovementResult storageMovementAttemptedResult = resultsIter
+            .next();
+        synchronized (storageMovementAttemptedItems) {
           if (storageMovementAttemptedResult
              .getStatus() == BlocksStorageMovementResult.Status.FAILURE) {
            blockStorageMovementNeeded
                .add(storageMovementAttemptedResult.getTrackId());
-            LOG.warn("Blocks storage movement results for the tracking id : "
-                + storageMovementAttemptedResult.getTrackId()
+            LOG.warn("Blocks storage movement results for the tracking id: {}"
                + " is reported from co-ordinating datanode, but result"
-                + " status is FAILURE. So, added for retry");
+                + " status is FAILURE. So, added for retry",
+                storageMovementAttemptedResult.getTrackId());
          } else {
-            synchronized (storageMovementAttemptedItems) {
-              storageMovementAttemptedItems
-                  .remove(storageMovementAttemptedResult.getTrackId());
+            ItemInfo itemInfo = storageMovementAttemptedItems
+                .get(storageMovementAttemptedResult.getTrackId());
+
+            // ItemInfo could be null. One case is, before the blocks movement
+            // result arrives, the attempted trackID timed out and the trackID
+            // was removed from the storageMovementAttemptedItems list.
+            // TODO: Need to ensure that the trackID is added to the
+            // 'blockStorageMovementNeeded' queue for retries to handle the
+            // following condition: all the block locations under the trackID
+            // were attempted but failed to find matching target nodes to
+            // satisfy the storage policy in the previous SPS iteration.
+            if (itemInfo != null
+                && !itemInfo.isAllBlockLocsAttemptedToSatisfy()) {
+              blockStorageMovementNeeded
+                  .add(storageMovementAttemptedResult.getTrackId());
+              LOG.warn("Blocks storage movement is SUCCESS for the track id: {}"
+                  + " reported from co-ordinating datanode. But adding trackID"
+                  + " back to retry queue as some of the blocks couldn't find"
+                  + " matching target nodes in previous SPS iteration.",
+                  storageMovementAttemptedResult.getTrackId());
+            } else {
+              LOG.info("Blocks storage movement is SUCCESS for the track id: {}"
+                  + " reported from co-ordinating datanode. But the trackID "
+                  + "doesn't exist in storageMovementAttemptedItems list",
+                  storageMovementAttemptedResult.getTrackId());
            }
-            LOG.info("Blocks storage movement results for the tracking id : "
-                + storageMovementAttemptedResult.getTrackId()
-                + " is reported from co-ordinating datanode. "
-                + "The result status is SUCCESS.");
          }
-          iter.remove(); // remove from results as processed above
+          // Remove trackID from the attempted list, if any.
+          storageMovementAttemptedItems
+              .remove(storageMovementAttemptedResult.getTrackId());
        }
+        // Remove trackID from results as processed above.
+        resultsIter.remove();
      }
    }
  }
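Taken together, the changes above mean the namenode now remembers, per
trackID, whether every block location was actually scheduled, and consults
that flag when a SUCCESS result arrives. A minimal sketch of that retry
decision, assuming simplified stand-in types (RetryDecisionSketch, Status)
rather than the real BlocksStorageMovementResult plumbing:

import java.util.HashMap;
import java.util.Map;

/** Toy model of the namenode-side retry decision added in this patch. */
final class RetryDecisionSketch {
  enum Status { SUCCESS, FAILURE }

  /** Simplified stand-in for the patch's ItemInfo. */
  static final class ItemInfo {
    final long lastAttemptedTimeStamp;
    final boolean allBlockLocsAttemptedToSatisfy;

    ItemInfo(long lastAttemptedTimeStamp,
        boolean allBlockLocsAttemptedToSatisfy) {
      this.lastAttemptedTimeStamp = lastAttemptedTimeStamp;
      this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy;
    }
  }

  private final Map<Long, ItemInfo> attempted = new HashMap<>();

  void add(long trackId, boolean allBlockLocsAttemptedToSatisfy) {
    attempted.put(trackId,
        new ItemInfo(System.currentTimeMillis(), allBlockLocsAttemptedToSatisfy));
  }

  /** Returns true when the trackID must go back to the retry queue. */
  boolean needsRetry(long trackId, Status reported) {
    // Mirrors the patched result check: the trackID leaves the attempted
    // list once its result is processed, whatever the outcome.
    ItemInfo info = attempted.remove(trackId);
    if (reported == Status.FAILURE) {
      return true; // some scheduled block movements failed
    }
    // SUCCESS, but only part of the block locations could be scheduled:
    // the remaining locations still violate the policy, so retry.
    return info != null && !info.allBlockLocsAttemptedToSatisfy;
  }
}

The key design point is that a SUCCESS report alone is no longer enough to
retire a trackID; the allBlockLocsAttemptedToSatisfy flag recorded at
scheduling time decides whether it goes back to the queue.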
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a09c7d43/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 56a531f..26e0775 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -162,8 +162,15 @@ public class StoragePolicySatisfier implements Runnable {
       try {
         Long blockCollectionID = storageMovementNeeded.get();
         if (blockCollectionID != null) {
-          computeAndAssignStorageMismatchedBlocksToDNs(blockCollectionID);
-          this.storageMovementsMonitor.add(blockCollectionID);
+          BlockCollection blockCollection =
+              namesystem.getBlockCollection(blockCollectionID);
+          // Check blockCollectionId existence.
+          if (blockCollection != null) {
+            boolean allBlockLocsAttemptedToSatisfy =
+                computeAndAssignStorageMismatchedBlocksToDNs(blockCollection);
+            this.storageMovementsMonitor.add(blockCollectionID,
+                allBlockLocsAttemptedToSatisfy);
+          }
         }
         // TODO: We can think to make this as configurable later, how frequently
         // we want to check block movements.
@@ -192,20 +199,17 @@ public class StoragePolicySatisfier implements Runnable {
     }
   }
 
-  private void computeAndAssignStorageMismatchedBlocksToDNs(
-      long blockCollectionID) {
-    BlockCollection blockCollection =
-        namesystem.getBlockCollection(blockCollectionID);
-    if (blockCollection == null) {
-      return;
-    }
+  private boolean computeAndAssignStorageMismatchedBlocksToDNs(
+      BlockCollection blockCollection) {
     byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
     BlockStoragePolicy existingStoragePolicy =
         blockManager.getStoragePolicy(existingStoragePolicyID);
     if (!blockCollection.getLastBlock().isComplete()) {
       // Postpone, currently file is under construction
       // So, should we add back? or leave it to user
-      return;
+      LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
+          + " this to the next retry iteration", blockCollection.getId());
+      return true;
     }
 
     // First datanode will be chosen as the co-ordinator node for storage
@@ -213,61 +217,87 @@ public class StoragePolicySatisfier implements Runnable {
     DatanodeDescriptor coordinatorNode = null;
     BlockInfo[] blocks = blockCollection.getBlocks();
     List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
+
+    // True value represents that SPS is able to find matching target nodes
+    // to satisfy the storage type for all the block locations of the given
+    // blockCollection. A false value represents that the blockCollection
+    // needs retries to satisfy the storage policy for some of the block
+    // locations.
+    boolean foundMatchingTargetNodesForAllBlocks = true;
+
     for (int i = 0; i < blocks.length; i++) {
       BlockInfo blockInfo = blocks[i];
-      List<StorageType> expectedStorageTypes =
-          existingStoragePolicy.chooseStorageTypes(blockInfo.getReplication());
-      DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
-      StorageType[] storageTypes = new StorageType[storages.length];
-      for (int j = 0; j < storages.length; j++) {
-        DatanodeStorageInfo datanodeStorageInfo = storages[j];
-        StorageType storageType = datanodeStorageInfo.getStorageType();
-        storageTypes[j] = storageType;
-      }
-      List<StorageType> existing =
-          new LinkedList<StorageType>(Arrays.asList(storageTypes));
-      if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
-          existing, true)) {
-        List<StorageTypeNodePair> sourceWithStorageMap =
-            new ArrayList<StorageTypeNodePair>();
-        List<DatanodeStorageInfo> existingBlockStorages =
-            new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
-        for (StorageType existingType : existing) {
-          Iterator<DatanodeStorageInfo> iterator =
-              existingBlockStorages.iterator();
-          while (iterator.hasNext()) {
-            DatanodeStorageInfo datanodeStorageInfo = iterator.next();
-            StorageType storageType = datanodeStorageInfo.getStorageType();
-            if (storageType == existingType) {
-              iterator.remove();
-              sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
-                  datanodeStorageInfo.getDatanodeDescriptor()));
-              break;
-            }
-          }
-        }
+      List<StorageType> expectedStorageTypes = existingStoragePolicy
+          .chooseStorageTypes(blockInfo.getReplication());
+      foundMatchingTargetNodesForAllBlocks |= computeBlockMovingInfos(
+          blockMovingInfos, blockInfo, expectedStorageTypes);
+    }
 
-        StorageTypeNodeMap locsForExpectedStorageTypes =
-            findTargetsForExpectedStorageTypes(expectedStorageTypes);
-
-        BlockMovingInfo blockMovingInfo =
-            findSourceAndTargetToMove(blockInfo, existing, sourceWithStorageMap,
-                expectedStorageTypes, locsForExpectedStorageTypes);
-        if (coordinatorNode == null) {
-          // For now, first datanode will be chosen as the co-ordinator. Later
-          // this can be optimized if needed.
-          coordinatorNode =
-              (DatanodeDescriptor) blockMovingInfo.getSources()[0];
-        }
-        blockMovingInfos.add(blockMovingInfo);
-      }
-    }
-    addBlockMovingInfosToCoordinatorDn(blockCollectionID, blockMovingInfos,
-        coordinatorNode);
+    assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(),
+        blockMovingInfos, coordinatorNode);
+    return foundMatchingTargetNodesForAllBlocks;
+  }
+
+  /**
+   * Compute the list of block moving information corresponding to the given
+   * block. This will check whether each block location of the given block
+   * satisfies the expected storage policy. If a block location does not
+   * satisfy the policy, then find a target node with the expected storage
+   * type to satisfy the storage policy.
+   *
+   * @param blockMovingInfos
+   *          - list of block source and target node pairs
+   * @param blockInfo
+   *          - block details
+   * @param expectedStorageTypes
+   *          - list of expected storage types to satisfy the storage policy
+   * @return false if some of the block locations failed to find a target
+   *         node to satisfy the storage policy, true otherwise
+   */
+  private boolean computeBlockMovingInfos(
+      List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
+      List<StorageType> expectedStorageTypes) {
+    boolean foundMatchingTargetNodesForBlock = true;
+    DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
+    StorageType[] storageTypes = new StorageType[storages.length];
+    for (int j = 0; j < storages.length; j++) {
+      DatanodeStorageInfo datanodeStorageInfo = storages[j];
+      StorageType storageType = datanodeStorageInfo.getStorageType();
+      storageTypes[j] = storageType;
+    }
+    List<StorageType> existing =
+        new LinkedList<StorageType>(Arrays.asList(storageTypes));
+    if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
+        existing, true)) {
+      List<StorageTypeNodePair> sourceWithStorageMap =
+          new ArrayList<StorageTypeNodePair>();
+      List<DatanodeStorageInfo> existingBlockStorages =
+          new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
+      for (StorageType existingType : existing) {
+        Iterator<DatanodeStorageInfo> iterator =
+            existingBlockStorages.iterator();
+        while (iterator.hasNext()) {
+          DatanodeStorageInfo datanodeStorageInfo = iterator.next();
+          StorageType storageType = datanodeStorageInfo.getStorageType();
+          if (storageType == existingType) {
+            iterator.remove();
+            sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
+                datanodeStorageInfo.getDatanodeDescriptor()));
+            break;
+          }
+        }
+      }
+
+      StorageTypeNodeMap locsForExpectedStorageTypes =
+          findTargetsForExpectedStorageTypes(expectedStorageTypes);
+
+      foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove(
+          blockMovingInfos, blockInfo, existing, sourceWithStorageMap,
+          expectedStorageTypes, locsForExpectedStorageTypes);
+    }
+    return foundMatchingTargetNodesForBlock;
   }
 
-  private void addBlockMovingInfosToCoordinatorDn(long blockCollectionID,
+  private void assignBlockMovingInfosToCoordinatorDn(long blockCollectionID,
       List<BlockMovingInfo> blockMovingInfos,
       DatanodeDescriptor coordinatorNode) {
 
@@ -278,6 +308,11 @@ public class StoragePolicySatisfier implements Runnable {
       return;
     }
 
+    // For now, first datanode will be chosen as the co-ordinator. Later
+    // this can be optimized if needed.
+    coordinatorNode = (DatanodeDescriptor) blockMovingInfos.get(0)
+        .getSources()[0];
+
     boolean needBlockStorageMovement = false;
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
       // Check for atleast one block storage movement has been chosen
@@ -301,6 +336,8 @@ public class StoragePolicySatisfier implements Runnable {
    * Find the good target node for each source node for which block storages
    * was misplaced.
    *
+   * @param blockMovingInfos
+   *          - list of block source and target node pairs
    * @param blockInfo
    *          - Block
    * @param existing
@@ -311,23 +348,49 @@ public class StoragePolicySatisfier implements Runnable {
    *          - Expecting storages to move
    * @param locsForExpectedStorageTypes
    *          - Available DNs for expected storage types
-   * @return list of block source and target node pair
+   * @return false if some of the block locations failed to find a target
+   *         node to satisfy the storage policy
    */
-  private BlockMovingInfo findSourceAndTargetToMove(BlockInfo blockInfo,
+  private boolean findSourceAndTargetToMove(
+      List<BlockMovingInfo> blockMovingInfos, BlockInfo blockInfo,
       List<StorageType> existing,
       List<StorageTypeNodePair> sourceWithStorageList,
       List<StorageType> expected,
       StorageTypeNodeMap locsForExpectedStorageTypes) {
+    boolean foundMatchingTargetNodesForBlock = true;
     List<DatanodeInfo> sourceNodes = new ArrayList<>();
     List<StorageType> sourceStorageTypes = new ArrayList<>();
     List<DatanodeInfo> targetNodes = new ArrayList<>();
     List<StorageType> targetStorageTypes = new ArrayList<>();
     List<DatanodeDescriptor> chosenNodes = new ArrayList<>();
+
+    // Loop over all the source node locations and choose a target storage
+    // within the same node if possible. This is done separately to avoid
+    // choosing a target which already has this block.
     for (int i = 0; i < sourceWithStorageList.size(); i++) {
       StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
       StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
           existingTypeNodePair.dn, expected);
+      if (chosenTarget != null) {
+        sourceNodes.add(existingTypeNodePair.dn);
+        sourceStorageTypes.add(existingTypeNodePair.storageType);
+        targetNodes.add(chosenTarget.dn);
+        targetStorageTypes.add(chosenTarget.storageType);
+        chosenNodes.add(chosenTarget.dn);
+        // TODO: We can increment scheduled block count for this node?
+      }
+    }
+    // Loop over all the source node locations. Choose a remote target
+    // storage node if one was not found within the same node.
+    for (int i = 0; i < sourceWithStorageList.size(); i++) {
+      StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
+      StorageTypeNodePair chosenTarget = null;
+      // A target storage was already chosen within the same datanode, so
+      // just skip this source node.
+      if (sourceNodes.contains(existingTypeNodePair.dn)) {
+        continue;
+      }
       if (chosenTarget == null && blockManager.getDatanodeManager()
           .getNetworkTopology().isNodeGroupAware()) {
         chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
@@ -359,18 +422,40 @@ public class StoragePolicySatisfier implements Runnable {
             "Failed to choose target datanode for the required"
                 + " storage types {}, block:{}, existing storage type:{}",
             expected, blockInfo, existingTypeNodePair.storageType);
-        sourceNodes.add(existingTypeNodePair.dn);
-        sourceStorageTypes.add(existingTypeNodePair.storageType);
-        // Imp: Not setting the target details, empty targets. Later, this is
-        // used as an indicator for retrying this block movement.
+        foundMatchingTargetNodesForBlock = false;
       }
     }
-    BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blockInfo,
+
+    blockMovingInfos.addAll(getBlockMovingInfos(blockInfo, sourceNodes,
+        sourceStorageTypes, targetNodes, targetStorageTypes));
+    return foundMatchingTargetNodesForBlock;
+  }
+
+  private List<BlockMovingInfo> getBlockMovingInfos(BlockInfo blockInfo,
+      List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
+      List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes) {
+    List<BlockMovingInfo> blkMovingInfos = new ArrayList<>();
+    // No source-target node pair exists.
+    if (sourceNodes.size() <= 0) {
+      return blkMovingInfos;
+    }
+    buildBlockMovingInfos(blockInfo, sourceNodes, sourceStorageTypes,
+        targetNodes, targetStorageTypes, blkMovingInfos);
+    return blkMovingInfos;
+  }
+
+  private void buildBlockMovingInfos(BlockInfo blockInfo,
+      List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
+      List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes,
+      List<BlockMovingInfo> blkMovingInfos) {
+    Block blk = new Block(blockInfo.getBlockId(), blockInfo.getNumBytes(),
+        blockInfo.getGenerationStamp());
+    BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk,
        sourceNodes.toArray(new DatanodeInfo[sourceNodes.size()]),
        targetNodes.toArray(new DatanodeInfo[targetNodes.size()]),
        sourceStorageTypes.toArray(new StorageType[sourceStorageTypes.size()]),
        targetStorageTypes.toArray(new StorageType[targetStorageTypes.size()]));
-    return blkMovingInfo;
+    blkMovingInfos.add(blkMovingInfo);
   }
 
   /**
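The reworked findSourceAndTargetToMove splits target selection into two
passes: first try a different storage on the same datanode (a cheap local
move), then fall back to remote nodes for the sources still unmatched, and
report whether every source found a target. A toy sketch of that two-pass
shape, under hypothetical simplified types (TwoPassMatcherSketch, Move; not
the real DatanodeDescriptor/StorageTypeNodeMap API):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Toy two-pass matcher mirroring findSourceAndTargetToMove's structure. */
final class TwoPassMatcherSketch {

  /** Hypothetical stand-in for one chosen source-target pair. */
  static final class Move {
    final String sourceNode;
    final String targetNode;

    Move(String sourceNode, String targetNode) {
      this.sourceNode = sourceNode;
      this.targetNode = targetNode;
    }
  }

  /**
   * Pass 1 pairs a source with its own node when that node offers the
   * expected storage type; pass 2 assigns the remaining sources to remote
   * candidates. Returns false when any source stays unmatched, signalling
   * the caller to flag the trackID for retry (a partial movement).
   */
  static boolean match(List<String> sources,
      Map<String, Boolean> hasExpectedTypeLocally,
      List<String> remoteCandidates, List<Move> out) {
    List<String> unmatched = new ArrayList<>();
    for (String src : sources) { // pass 1: same-node moves
      if (hasExpectedTypeLocally.getOrDefault(src, false)) {
        out.add(new Move(src, src));
      } else {
        unmatched.add(src);
      }
    }
    boolean foundMatchingTargetForAll = true;
    for (String src : unmatched) { // pass 2: remote moves
      if (!remoteCandidates.isEmpty()) {
        out.add(new Move(src, remoteCandidates.remove(0)));
      } else {
        foundMatchingTargetForAll = false; // left for the next SPS iteration
      }
    }
    return foundMatchingTargetForAll;
  }
}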
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a09c7d43/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
index 8c70d99..6641134 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -33,13 +33,13 @@ public class TestBlockStorageMovementAttemptedItems {
 
   private BlockStorageMovementAttemptedItems bsmAttemptedItems = null;
   private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null;
+  private final int selfRetryTimeout = 500;
 
   @Before
   public void setup() {
     unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded();
-    bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100, 500,
-        unsatisfiedStorageMovementFiles);
-    bsmAttemptedItems.start();
+    bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
+        selfRetryTimeout, unsatisfiedStorageMovementFiles);
   }
 
   @After
@@ -72,8 +72,9 @@ public class TestBlockStorageMovementAttemptedItems {
 
   @Test(timeout = 30000)
   public void testAddResultWithFailureResult() throws Exception {
+    bsmAttemptedItems.start(); // start block movement result monitor thread
     Long item = new Long(1234);
-    bsmAttemptedItems.add(item);
+    bsmAttemptedItems.add(item, true);
     bsmAttemptedItems.addResults(
         new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
             item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
@@ -82,8 +83,9 @@ public class TestBlockStorageMovementAttemptedItems {
 
   @Test(timeout = 30000)
   public void testAddResultWithSucessResult() throws Exception {
+    bsmAttemptedItems.start(); // start block movement result monitor thread
     Long item = new Long(1234);
-    bsmAttemptedItems.add(item);
+    bsmAttemptedItems.add(item, true);
     bsmAttemptedItems.addResults(
         new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
             item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
@@ -92,10 +94,93 @@ public class TestBlockStorageMovementAttemptedItems {
 
   @Test(timeout = 30000)
   public void testNoResultAdded() throws Exception {
+    bsmAttemptedItems.start(); // start block movement result monitor thread
     Long item = new Long(1234);
-    bsmAttemptedItems.add(item);
-    // After selfretry timeout, it should be added back for retry
-    assertTrue(checkItemMovedForRetry(item, 600));
+    bsmAttemptedItems.add(item, true);
+    // After self retry timeout, it should be added back for retry
+    assertTrue("Failed to add to the retry list",
+        checkItemMovedForRetry(item, 600));
+    assertEquals("Failed to remove from the attempted list", 0,
+        bsmAttemptedItems.getAttemptedItemsCount());
   }
 
+  /**
+   * Partial block movement with BlocksStorageMovementResult#SUCCESS. Here,
+   * #blockStorageMovementResultCheck() is exercised first and then
+   * #blocksStorageMovementUnReportedItemsCheck().
+   */
+  @Test(timeout = 30000)
+  public void testPartialBlockMovementShouldBeRetried1() throws Exception {
+    Long item = new Long(1234);
+    bsmAttemptedItems.add(item, false);
+    bsmAttemptedItems.addResults(
+        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
+            item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
+
+    // start block movement result monitor thread
+    bsmAttemptedItems.start();
+    assertTrue("Failed to add to the retry list",
+        checkItemMovedForRetry(item, 5000));
+    assertEquals("Failed to remove from the attempted list", 0,
+        bsmAttemptedItems.getAttemptedItemsCount());
+  }
+
+  /**
+   * Partial block movement with BlocksStorageMovementResult#SUCCESS. Here,
+   * #blocksStorageMovementUnReportedItemsCheck() is exercised first and then
+   * #blockStorageMovementResultCheck().
+   */
+  @Test(timeout = 30000)
+  public void testPartialBlockMovementShouldBeRetried2() throws Exception {
+    Long item = new Long(1234);
+    bsmAttemptedItems.add(item, false);
+    bsmAttemptedItems.addResults(
+        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
+            item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
+
+    Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out
+
+    bsmAttemptedItems.blocksStorageMovementUnReportedItemsCheck();
+    bsmAttemptedItems.blockStorageMovementResultCheck();
+
+    assertTrue("Failed to add to the retry list",
+        checkItemMovedForRetry(item, 5000));
+    assertEquals("Failed to remove from the attempted list", 0,
+        bsmAttemptedItems.getAttemptedItemsCount());
+  }
+
+  /**
+   * Partial block movement with only a BlocksStorageMovementResult#FAILURE
+   * result and an empty storageMovementAttemptedItems list.
+   */
+  @Test(timeout = 30000)
+  public void testPartialBlockMovementShouldBeRetried3() throws Exception {
+    Long item = new Long(1234);
+    bsmAttemptedItems.addResults(
+        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
+            item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
+    bsmAttemptedItems.blockStorageMovementResultCheck();
+    assertTrue("Failed to add to the retry list",
+        checkItemMovedForRetry(item, 5000));
+    assertEquals("Failed to remove from the attempted list", 0,
+        bsmAttemptedItems.getAttemptedItemsCount());
+  }
+
+  /**
+   * Partial block movement with a BlocksStorageMovementResult#FAILURE result
+   * and a non-empty storageMovementAttemptedItems list.
+   */
+  @Test(timeout = 30000)
+  public void testPartialBlockMovementShouldBeRetried4() throws Exception {
+    Long item = new Long(1234);
+    bsmAttemptedItems.add(item, false);
+    bsmAttemptedItems.addResults(
+        new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
+            item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
+    bsmAttemptedItems.blockStorageMovementResultCheck();
+    assertTrue("Failed to add to the retry list",
+        checkItemMovedForRetry(item, 5000));
+    assertEquals("Failed to remove from the attempted list", 0,
+        bsmAttemptedItems.getAttemptedItemsCount());
+  }
 }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a09c7d43/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 179b66b..718dbcb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -527,6 +527,59 @@ public class TestStoragePolicySatisfier {
     waitExpectedStorageType(file1, StorageType.DISK, 2, 30000);
   }
 
+  /**
+   * Tests to verify that, for the given path, only a few of the blocks or
+   * block src locations (src nodes) under the given path will be scheduled
+   * for block movement.
+   *
+   * For example, there are two blocks for a file:
+   *
+   * File1 => two blocks and default storage policy(HOT).
+   * blk_1[locations=A(DISK),B(DISK),C(DISK),D(DISK),E(DISK)],
+   * blk_2[locations=A(DISK),B(DISK),C(DISK),D(DISK),E(DISK)].
+   *
+   * Now, set storage policy to COLD.
+   * Only two DNs are available with the expected storage type ARCHIVE,
+   * say A, E.
+   *
+   * SPS will schedule block movement to the coordinator node with the
+   * details,
+   * blk_1[move A(DISK) -> A(ARCHIVE), move E(DISK) -> E(ARCHIVE)],
+   * blk_2[move A(DISK) -> A(ARCHIVE), move E(DISK) -> E(ARCHIVE)].
+   */
+  @Test(timeout = 300000)
+  public void testWhenOnlyFewSourceNodesHaveMatchingTargetNodes()
+      throws Exception {
+    try {
+      int numOfDns = 5;
+      config.setLong("dfs.block.size", 1024);
+      allDiskTypes =
+          new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.DISK},
+              {StorageType.DISK, StorageType.ARCHIVE}};
+      hdfsCluster = startCluster(config, allDiskTypes, numOfDns,
+          storagesPerDatanode, capacity);
+      dfs = hdfsCluster.getFileSystem();
+      writeContent(file, (short) 5);
+
+      // Change policy to COLD
+      dfs.setStoragePolicy(new Path(file), "COLD");
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode(file);
+
+      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      hdfsCluster.triggerHeartbeats();
+      // Wait till StoragePolicySatisfier has identified the blocks to move
+      // to the ARCHIVE area.
+      waitExpectedStorageType(file, StorageType.ARCHIVE, 2, 30000);
+      waitExpectedStorageType(file, StorageType.DISK, 3, 30000);
+
+      waitForBlocksMovementResult(1, 30000);
+    } finally {
+      shutdownCluster();
+    }
+  }
+
   private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
       throws IOException {
     ArrayList<DataNode> dns = hdfsCluster.getDataNodes();
@@ -561,7 +614,7 @@ public class TestStoragePolicySatisfier {
       DataNodeTestUtils.mockDatanodeBlkPinning(dn, true);
       favoredNodesCount--;
       if (favoredNodesCount <= 0) {
-        break;// marked favoredNodesCount number of pinned block location
+        break; // marked favoredNodesCount number of pinned block location
       }
     }
     return file1;
@@ -600,8 +653,14 @@ public class TestStoragePolicySatisfier {
   }
 
   private void writeContent(final String fileName) throws IOException {
+    writeContent(fileName, (short) 3);
+  }
+
+  private void writeContent(final String fileName, short replicatonFactor)
+      throws IOException {
     // write to DISK
-    final FSDataOutputStream out = dfs.create(new Path(fileName));
+    final FSDataOutputStream out = dfs.create(new Path(fileName),
+        replicatonFactor);
     for (int i = 0; i < 1000; i++) {
       out.writeChars("t");
     }