From: rakeshr@apache.org
To: common-commits@hadoop.apache.org
Date: Mon, 29 Jan 2018 04:12:36 -0000
Subject: [31/50] [abbrv] hadoop git commit: HDFS-12570: [SPS]: Refactor Co-ordinator datanode logic to track the block storage movements. Contributed by Rakesh R.

HDFS-12570: [SPS]: Refactor Co-ordinator datanode logic to track the block storage movements. Contributed by Rakesh R.
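Editor's note: the patch below drops the per-trackID result protocol (BlocksStorageMovementResult with SUCCESS/FAILURE/IN_PROGRESS) in favor of reporting only the blocks whose movement attempt has finished; the NameNode re-schedules anything it never hears back about after a timeout. A minimal standalone sketch of that reporting shape follows, with simplified stand-in types; the real classes live in org.apache.hadoop.hdfs.server.protocol and carry more state than shown here.

import java.util.Arrays;

public class HeartbeatReportSketch {

  /** Simplified stand-in for org.apache.hadoop.hdfs.protocol.Block. */
  static class Block {
    final long blockId;
    Block(long blockId) { this.blockId = blockId; }
    @Override public String toString() { return "blk_" + blockId; }
  }

  /**
   * Shape of the new report: just the finished blocks, no per-trackID
   * status. Success vs. failure is inferred by the NameNode, which retries
   * unreported blocks after the self-retry timeout (lowered to 5 minutes
   * in this patch).
   */
  static class BlocksStorageMoveAttemptFinished {
    private final Block[] blocks;
    BlocksStorageMoveAttemptFinished(Block[] blocks) { this.blocks = blocks; }
    Block[] getBlocks() { return blocks; }
  }

  public static void main(String[] args) {
    // Movement attempts finished since the last heartbeat, success or not.
    Block[] finished = { new Block(1001), new Block(1002) };
    BlocksStorageMoveAttemptFinished report =
        new BlocksStorageMoveAttemptFinished(finished);
    System.out.println("Heartbeat carries: " + Arrays.toString(report.getBlocks()));
  }
}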
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/776af85f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/776af85f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/776af85f

Branch: refs/heads/HDFS-10285
Commit: 776af85f8e5a8af30e9d255db1e43a0b51beb589
Parents: e94f1de
Author: Uma Maheswara Rao G
Authored: Thu Oct 12 17:17:51 2017 -0700
Committer: Rakesh Radhakrishnan
Committed: Mon Jan 29 09:21:13 2018 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   8 +-
 .../DatanodeProtocolClientSideTranslatorPB.java |  12 +-
 .../DatanodeProtocolServerSideTranslatorPB.java |   4 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 150 +++----
 .../blockmanagement/DatanodeDescriptor.java     |  50 ++-
 .../server/blockmanagement/DatanodeManager.java | 104 ++++--
 .../hdfs/server/datanode/BPOfferService.java    |   3 +-
 .../hdfs/server/datanode/BPServiceActor.java    |  33 +-
 .../datanode/BlockStorageMovementTracker.java   |  80 ++---
 .../datanode/StoragePolicySatisfyWorker.java    | 214 ++++-------
 .../BlockStorageMovementAttemptedItems.java     | 299 ++++------------
 .../BlockStorageMovementInfosBatch.java         |  61 ----
 .../hdfs/server/namenode/FSNamesystem.java      |  11 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |   7 +-
 .../server/namenode/StoragePolicySatisfier.java | 343 ++++++++++---------
 .../protocol/BlockStorageMovementCommand.java   |  99 ++----
 .../BlocksStorageMoveAttemptFinished.java       |  48 +++
 .../protocol/BlocksStorageMovementResult.java   |  74 ----
 .../hdfs/server/protocol/DatanodeProtocol.java  |   5 +-
 .../src/main/proto/DatanodeProtocol.proto       |  30 +-
 .../src/main/resources/hdfs-default.xml         |  21 +-
 .../src/site/markdown/ArchivalStorage.md        |   6 +-
 .../TestNameNodePrunesMissingStorages.java      |   5 +-
 .../datanode/InternalDataNodeTestUtils.java     |   4 +-
 .../server/datanode/TestBPOfferService.java     |   4 +-
 .../hdfs/server/datanode/TestBlockRecovery.java |   4 +-
 .../server/datanode/TestDataNodeLifeline.java   |   6 +-
 .../TestDatanodeProtocolRetryPolicy.java        |   4 +-
 .../server/datanode/TestFsDatasetCache.java     |   4 +-
 .../TestStoragePolicySatisfyWorker.java         |  52 ++-
 .../hdfs/server/datanode/TestStorageReport.java |   4 +-
 .../server/namenode/NNThroughputBenchmark.java  |   6 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |   4 +-
 .../TestBlockStorageMovementAttemptedItems.java | 145 ++++----
 .../hdfs/server/namenode/TestDeadDatanode.java  |   4 +-
 .../namenode/TestStoragePolicySatisfier.java    | 115 ++++++-
 ...stStoragePolicySatisfierWithStripedFile.java |  20 +-
 37 files changed, 908 insertions(+), 1135 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index c435739..328df87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -613,11 +613,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY =
"dfs.storage.policy.satisfier.recheck.timeout.millis"; public static final int DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT = - 5 * 60 * 1000; + 1 * 60 * 1000; public static final String DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY = "dfs.storage.policy.satisfier.self.retry.timeout.millis"; public static final int DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT = - 20 * 60 * 1000; + 5 * 60 * 1000; + public static final String DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY = + "dfs.storage.policy.satisfier.low.max-streams.preference"; + public static final boolean DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT = + false; public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address"; public static final int DFS_DATANODE_DEFAULT_PORT = 9866; http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 9dd87d0..dcc0705 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -48,7 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlock import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; +import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -140,7 +140,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, @Nonnull SlowDiskReports slowDisks, - BlocksStorageMovementResult[] blksMovementResults) throws IOException { + BlocksStorageMoveAttemptFinished storageMovementFinishedBlks) + throws IOException { HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() .setRegistration(PBHelper.convert(registration)) .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount) @@ -165,8 +166,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements } // Adding blocks movement results to the heart beat request. 
-    builder.addAllBlksMovementResults(
-        PBHelper.convertBlksMovResults(blksMovementResults));
+    if (storageMovementFinishedBlks != null
+        && storageMovementFinishedBlks.getBlocks() != null) {
+      builder.setStorageMoveAttemptFinishedBlks(
+          PBHelper.convertBlksMovReport(storageMovementFinishedBlks));
+    }
     HeartbeatResponseProto resp;
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index 40458ef..b5bb80a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -123,8 +123,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements
           volumeFailureSummary, request.getRequestFullBlockReportLease(),
           PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
           PBHelper.convertSlowDiskInfo(request.getSlowDisksList()),
-          PBHelper.convertBlksMovResults(
-              request.getBlksMovementResultsList()));
+          PBHelper.convertBlksMovReport(
+              request.getStorageMoveAttemptFinishedBlks()));
     } catch (IOException e) {
       throw new ServiceException(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 996b986..38f72c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBand
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockMovingInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
@@ -56,11 +57,11 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerRepo
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMovementResultProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMoveAttemptFinishedProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ProvidedStorageLocationProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
@@ -104,8 +105,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStr
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@@ -971,59 +971,27 @@ public class PBHelper {
     return SlowDiskReports.create(slowDisksMap);
   }

-  public static BlocksStorageMovementResult[] convertBlksMovResults(
-      List<BlocksStorageMovementResultProto> protos) {
-    BlocksStorageMovementResult[] results =
-        new BlocksStorageMovementResult[protos.size()];
-    for (int i = 0; i < protos.size(); i++) {
-      BlocksStorageMovementResultProto resultProto = protos.get(i);
-      BlocksStorageMovementResult.Status status;
-      switch (resultProto.getStatus()) {
-      case SUCCESS:
-        status = Status.SUCCESS;
-        break;
-      case FAILURE:
-        status = Status.FAILURE;
-        break;
-      case IN_PROGRESS:
-        status = Status.IN_PROGRESS;
-        break;
-      default:
-        throw new AssertionError("Unknown status: " + resultProto.getStatus());
-      }
-      results[i] = new BlocksStorageMovementResult(resultProto.getTrackID(),
-          status);
+  public static BlocksStorageMoveAttemptFinished convertBlksMovReport(
+      BlocksStorageMoveAttemptFinishedProto proto) {
+
+    List<BlockProto> blocksList = proto.getBlocksList();
+    Block[] blocks = new Block[blocksList.size()];
+    for (int i = 0; i < blocksList.size(); i++) {
+      BlockProto blkProto = blocksList.get(i);
+      blocks[i] = PBHelperClient.convert(blkProto);
     }
-    return results;
+    return new BlocksStorageMoveAttemptFinished(blocks);
   }

-  public static List<BlocksStorageMovementResultProto> convertBlksMovResults(
-      BlocksStorageMovementResult[] blocksMovementResults) {
-    List<BlocksStorageMovementResultProto> blocksMovementResultsProto =
-        new ArrayList<>();
-    BlocksStorageMovementResultProto.Builder builder =
-        BlocksStorageMovementResultProto.newBuilder();
-    for (int i = 0; i < blocksMovementResults.length; i++) {
-      BlocksStorageMovementResult report = blocksMovementResults[i];
-      builder.setTrackID(report.getTrackId());
-      BlocksStorageMovementResultProto.Status status;
-      switch (report.getStatus()) {
-      case SUCCESS:
-        status = BlocksStorageMovementResultProto.Status.SUCCESS;
-        break;
-      case FAILURE:
-        status = BlocksStorageMovementResultProto.Status.FAILURE;
-        break;
-      case IN_PROGRESS:
-        status = BlocksStorageMovementResultProto.Status.IN_PROGRESS;
-        break;
-      default:
-        throw new AssertionError("Unknown status: " + report.getStatus());
-      }
-      builder.setStatus(status);
-      blocksMovementResultsProto.add(builder.build());
+  public static BlocksStorageMoveAttemptFinishedProto convertBlksMovReport(
+      BlocksStorageMoveAttemptFinished blocksMoveAttemptFinished) {
+    BlocksStorageMoveAttemptFinishedProto.Builder builder =
+        BlocksStorageMoveAttemptFinishedProto.newBuilder();
+    Block[] blocks = blocksMoveAttemptFinished.getBlocks();
+    for (Block block : blocks) {
+      builder.addBlocks(PBHelperClient.convert(block));
     }
-    return blocksMovementResultsProto;
+    return builder.build();
   }

   public static JournalInfo convert(JournalInfoProto info) {
@@ -1211,34 +1179,34 @@
     BlockStorageMovementCommandProto.Builder builder =
         BlockStorageMovementCommandProto.newBuilder();

-    builder.setTrackID(blkStorageMovementCmd.getTrackID());
     builder.setBlockPoolId(blkStorageMovementCmd.getBlockPoolId());
     Collection<BlockMovingInfo> blockMovingInfos = blkStorageMovementCmd
         .getBlockMovingTasks();
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
-      builder.addBlockStorageMovement(
-          convertBlockMovingInfo(blkMovingInfo));
+      builder.addBlockMovingInfo(convertBlockMovingInfo(blkMovingInfo));
     }
     return builder.build();
   }

-  private static BlockStorageMovementProto convertBlockMovingInfo(
+  private static BlockMovingInfoProto convertBlockMovingInfo(
       BlockMovingInfo blkMovingInfo) {
-    BlockStorageMovementProto.Builder builder = BlockStorageMovementProto
+    BlockMovingInfoProto.Builder builder = BlockMovingInfoProto
         .newBuilder();
     builder.setBlock(PBHelperClient.convert(blkMovingInfo.getBlock()));

-    DatanodeInfo[] sourceDnInfos = blkMovingInfo.getSources();
-    builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
+    DatanodeInfo sourceDnInfo = blkMovingInfo.getSource();
+    builder.setSourceDnInfo(PBHelperClient.convert(sourceDnInfo));

-    DatanodeInfo[] targetDnInfos = blkMovingInfo.getTargets();
-    builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
+    DatanodeInfo targetDnInfo = blkMovingInfo.getTarget();
+    builder.setTargetDnInfo(PBHelperClient.convert(targetDnInfo));

-    StorageType[] sourceStorageTypes = blkMovingInfo.getSourceStorageTypes();
-    builder.setSourceStorageTypes(convertStorageTypesProto(sourceStorageTypes));
+    StorageType sourceStorageType = blkMovingInfo.getSourceStorageType();
+    builder.setSourceStorageType(
+        PBHelperClient.convertStorageType(sourceStorageType));

-    StorageType[] targetStorageTypes = blkMovingInfo.getTargetStorageTypes();
-    builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
+    StorageType targetStorageType = blkMovingInfo.getTargetStorageType();
+    builder.setTargetStorageType(
+        PBHelperClient.convertStorageType(targetStorageType));

     return builder.build();
   }

@@ -1246,42 +1214,38 @@
   private static DatanodeCommand convert(
       BlockStorageMovementCommandProto blkStorageMovementCmdProto) {
     Collection<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
-    List<BlockStorageMovementProto> blkSPSatisfyList =
-        blkStorageMovementCmdProto.getBlockStorageMovementList();
-    for (BlockStorageMovementProto blkSPSatisfy : blkSPSatisfyList) {
+    List<BlockMovingInfoProto> blkSPSatisfyList =
+        blkStorageMovementCmdProto.getBlockMovingInfoList();
+    for (BlockMovingInfoProto blkSPSatisfy : blkSPSatisfyList) {
       blockMovingInfos.add(convertBlockMovingInfo(blkSPSatisfy));
     }
     return new BlockStorageMovementCommand(
         DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
-        blkStorageMovementCmdProto.getTrackID(),
         blkStorageMovementCmdProto.getBlockPoolId(), blockMovingInfos);
   }
   private static BlockMovingInfo convertBlockMovingInfo(
-      BlockStorageMovementProto blockStoragePolicySatisfyProto) {
-    BlockProto blockProto = blockStoragePolicySatisfyProto.getBlock();
+      BlockMovingInfoProto blockStorageMovingInfoProto) {
+    BlockProto blockProto = blockStorageMovingInfoProto.getBlock();
     Block block = PBHelperClient.convert(blockProto);

-    DatanodeInfosProto sourceDnInfosProto = blockStoragePolicySatisfyProto
-        .getSourceDnInfos();
-    DatanodeInfo[] sourceDnInfos = PBHelperClient.convert(sourceDnInfosProto);
-
-    DatanodeInfosProto targetDnInfosProto = blockStoragePolicySatisfyProto
-        .getTargetDnInfos();
-    DatanodeInfo[] targetDnInfos = PBHelperClient.convert(targetDnInfosProto);
-
-    StorageTypesProto srcStorageTypesProto = blockStoragePolicySatisfyProto
-        .getSourceStorageTypes();
-    StorageType[] srcStorageTypes = PBHelperClient.convertStorageTypes(
-        srcStorageTypesProto.getStorageTypesList(),
-        srcStorageTypesProto.getStorageTypesList().size());
-
-    StorageTypesProto targetStorageTypesProto = blockStoragePolicySatisfyProto
-        .getTargetStorageTypes();
-    StorageType[] targetStorageTypes = PBHelperClient.convertStorageTypes(
-        targetStorageTypesProto.getStorageTypesList(),
-        targetStorageTypesProto.getStorageTypesList().size());
-    return new BlockMovingInfo(block, sourceDnInfos, targetDnInfos,
-        srcStorageTypes, targetStorageTypes);
+    DatanodeInfoProto sourceDnInfoProto = blockStorageMovingInfoProto
+        .getSourceDnInfo();
+    DatanodeInfo sourceDnInfo = PBHelperClient.convert(sourceDnInfoProto);
+
+    DatanodeInfoProto targetDnInfoProto = blockStorageMovingInfoProto
+        .getTargetDnInfo();
+    DatanodeInfo targetDnInfo = PBHelperClient.convert(targetDnInfoProto);
+    StorageTypeProto srcStorageTypeProto = blockStorageMovingInfoProto
+        .getSourceStorageType();
+    StorageType srcStorageType = PBHelperClient
+        .convertStorageType(srcStorageTypeProto);
+
+    StorageTypeProto targetStorageTypeProto = blockStorageMovingInfoProto
+        .getTargetStorageType();
+    StorageType targetStorageType = PBHelperClient
+        .convertStorageType(targetStorageTypeProto);
+    return new BlockMovingInfo(block, sourceDnInfo, targetDnInfo,
+        srcStorageType, targetStorageType);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 0311b02..f9a76b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
@@ -212,7 +211,7 @@ public
class DatanodeDescriptor extends DatanodeInfo {
    * A queue of blocks corresponding to trackID for moving its storage
    * placements by this datanode.
    */
-  private final Queue<BlockStorageMovementInfosBatch> storageMovementBlocks =
+  private final Queue<BlockMovingInfo> storageMovementBlocks =
       new LinkedList<>();
   private volatile boolean dropSPSWork = false;

@@ -1079,30 +1078,45 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /**
    * Add the block infos which needs to move its storage locations.
    *
-   * @param trackID
-   *          - unique identifier which will be used for tracking the given set
-   *          of blocks movement completion.
-   * @param storageMismatchedBlocks
-   *          - storage mismatched block infos
+   * @param blkMovingInfo
+   *          - storage mismatched block info
    */
-  public void addBlocksToMoveStorage(long trackID,
-      List<BlockMovingInfo> storageMismatchedBlocks) {
+  public void addBlocksToMoveStorage(BlockMovingInfo blkMovingInfo) {
     synchronized (storageMovementBlocks) {
-      storageMovementBlocks.offer(
-          new BlockStorageMovementInfosBatch(trackID, storageMismatchedBlocks));
+      storageMovementBlocks.offer(blkMovingInfo);
     }
   }

   /**
-   * @return block infos which needs to move its storage locations. This returns
-   *         list of blocks under one trackId.
+   * Return the number of blocks queued up for movement.
    */
-  public BlockStorageMovementInfosBatch getBlocksToMoveStorages() {
+  public int getNumberOfBlocksToMoveStorages() {
+    return storageMovementBlocks.size();
+  }
+
+  /**
+   * Get the blocks to move to satisfy the storage media type.
+   *
+   * @param numBlocksToMoveTasks
+   *          total number of blocks which will be send to this datanode for
+   *          block movement.
+   *
+   * @return block infos which needs to move its storage locations.
+   */
+  public BlockMovingInfo[] getBlocksToMoveStorages(int numBlocksToMoveTasks) {
     synchronized (storageMovementBlocks) {
-      // TODO: Presently returning the list of blocks under one trackId.
-      // Need to limit the list of items into small batches with in trackId
-      // itself if blocks are many(For example: a file contains many blocks).
-      return storageMovementBlocks.poll();
+      List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+      for (; !storageMovementBlocks.isEmpty()
+          && numBlocksToMoveTasks > 0; numBlocksToMoveTasks--) {
+        blockMovingInfos.add(storageMovementBlocks.poll());
+      }
+      BlockMovingInfo[] blkMoveArray = new BlockMovingInfo[blockMovingInfos
+          .size()];
+      blkMoveArray = blockMovingInfos.toArray(blkMoveArray);
+      if (blkMoveArray.length > 0) {
+        return blkMoveArray;
+      }
+      return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 6187b37..ae5fe22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementInfosBatch;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -48,6 +47,7 @@ import org.apache.hadoop.hdfs.server.protocol.*;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
@@ -208,6 +208,8 @@ public class DatanodeManager {
    */
   private final long timeBetweenResendingCachingDirectivesMs;

+  private final boolean blocksToMoveShareEqualRatio;
+
   DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
       final Configuration conf) throws IOException {
     this.namesystem = namesystem;
@@ -332,6 +334,12 @@ public class DatanodeManager {
     this.blocksPerPostponedMisreplicatedBlocksRescan = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY,
         DFSConfigKeys.DFS_NAMENODE_BLOCKS_PER_POSTPONEDBLOCKS_RESCAN_KEY_DEFAULT);
+
+    // SPS configuration to decide blocks to move can share equal ratio of
+    // maxtransfers with pending replica and erasure-coded reconstruction tasks
+    blocksToMoveShareEqualRatio = conf.getBoolean(
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_KEY,
+        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SHARE_EQUAL_REPLICA_MAX_STREAMS_DEFAULT);
   }

   private static long getStaleIntervalFromConf(Configuration conf,
@@ -1094,13 +1102,14 @@ public class DatanodeManager {
       // Sets dropSPSWork flag to true, to ensure that
       // DNA_DROP_SPS_WORK_COMMAND will send to datanode via next heartbeat
       // response immediately after the node registration. This is
-      // to avoid a situation, where multiple trackId responses coming from
-      // different co-odinator datanodes. After SPS monitor time out, it
-      // will retry the files which were scheduled to the disconnected(for
-      // long time more than heartbeat expiry) DN, by finding new
-      // co-ordinator datanode. Now, if the expired datanode reconnects back
-      // after SPS reschedules, it leads to get different movement results
-      // from reconnected and new DN co-ordinators.
+      // to avoid a situation, where multiple block attempt finished
+      // responses coming from different datanodes. After SPS monitor time
+      // out, it will retry the files which were scheduled to the
+      // disconnected(for long time more than heartbeat expiry) DN, by
+      // finding new datanode. Now, if the expired datanode reconnects back
+      // after SPS reschedules, it leads to get different movement attempt
+      // finished report from reconnected and newly datanode which is
+      // attempting the block movement.
       nodeS.setDropSPSWork(true);

       // resolve network location
@@ -1680,19 +1689,47 @@
     final List<DatanodeCommand> cmds = new ArrayList<>();
     // Allocate _approximately_ maxTransfers pending tasks to DataNode.
     // NN chooses pending tasks based on the ratio between the lengths of
-    // replication and erasure-coded block queues.
+    // replication, erasure-coded block queues and block storage movement
+    // queues.
     int totalReplicateBlocks = nodeinfo.getNumberOfReplicateBlocks();
     int totalECBlocks = nodeinfo.getNumberOfBlocksToBeErasureCoded();
+    int totalBlocksToMove = nodeinfo.getNumberOfBlocksToMoveStorages();
     int totalBlocks = totalReplicateBlocks + totalECBlocks;
-    if (totalBlocks > 0) {
-      int numReplicationTasks = (int) Math.ceil(
-          (double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
-      int numECTasks = (int) Math.ceil(
-          (double) (totalECBlocks * maxTransfers) / totalBlocks);
-
+    if (totalBlocks > 0 || totalBlocksToMove > 0) {
+      int numReplicationTasks = 0;
+      int numECTasks = 0;
+      int numBlocksToMoveTasks = 0;
+      // Check blocksToMoveShareEqualRatio configuration is true/false. If true,
+      // then equally sharing the max transfer. Otherwise gives high priority to
+      // the pending_replica/erasure-coded tasks and only the delta streams will
+      // be used for blocks to move tasks.
+      if (blocksToMoveShareEqualRatio) {
+        // add blocksToMove count to total blocks so that will get equal share
+        totalBlocks = totalBlocks + totalBlocksToMove;
+        numReplicationTasks = (int) Math
+            .ceil((double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
+        numECTasks = (int) Math
+            .ceil((double) (totalECBlocks * maxTransfers) / totalBlocks);
+        numBlocksToMoveTasks = (int) Math
+            .ceil((double) (totalBlocksToMove * maxTransfers) / totalBlocks);
+      } else {
+        // Calculate the replica and ec tasks, then pick blocksToMove if there
+        // is any streams available.
+        numReplicationTasks = (int) Math
+            .ceil((double) (totalReplicateBlocks * maxTransfers) / totalBlocks);
+        numECTasks = (int) Math
+            .ceil((double) (totalECBlocks * maxTransfers) / totalBlocks);
+        int numTasks = numReplicationTasks + numECTasks;
+        if (numTasks < maxTransfers) {
+          int remainingMaxTransfers = maxTransfers - numTasks;
+          numBlocksToMoveTasks = Math.min(totalBlocksToMove,
+              remainingMaxTransfers);
+        }
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Pending replication tasks: " + numReplicationTasks
-            + " erasure-coded tasks: " + numECTasks);
+            + " erasure-coded tasks: " + numECTasks + " blocks to move tasks: "
+            + numBlocksToMoveTasks);
       }
       // check pending replication tasks
       List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
@@ -1708,6 +1745,23 @@
         cmds.add(new BlockECReconstructionCommand(
             DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList));
       }
+      // check pending block storage movement tasks
+      if (nodeinfo.shouldDropSPSWork()) {
+        cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
+        // Set back to false to indicate that the new value has been sent to the
+        // datanode.
+        nodeinfo.setDropSPSWork(false);
+      } else {
+        // Get pending block storage movement tasks
+        BlockMovingInfo[] blkStorageMovementInfos = nodeinfo
+            .getBlocksToMoveStorages(numBlocksToMoveTasks);
+
+        if (blkStorageMovementInfos != null) {
+          cmds.add(new BlockStorageMovementCommand(
+              DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT, blockPoolId,
+              Arrays.asList(blkStorageMovementInfos)));
+        }
+      }
     }

     // check block invalidation
@@ -1751,24 +1805,6 @@
       }
     }

-    if (nodeinfo.shouldDropSPSWork()) {
-      cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
-      // Set back to false to indicate that the new value has been sent to the
-      // datanode.
-      nodeinfo.setDropSPSWork(false);
-    }
-
-    // check pending block storage movement tasks
-    BlockStorageMovementInfosBatch blkStorageMovementInfosBatch = nodeinfo
-        .getBlocksToMoveStorages();
-
-    if (blkStorageMovementInfosBatch != null) {
-      cmds.add(new BlockStorageMovementCommand(
-          DatanodeProtocol.DNA_BLOCK_STORAGE_MOVEMENT,
-          blkStorageMovementInfosBatch.getTrackID(), blockPoolId,
-          blkStorageMovementInfosBatch.getBlockMovingInfo()));
-    }
-
     if (!cmds.isEmpty()) {
       return cmds.toArray(new DatanodeCommand[cmds.size()]);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 9308471..1656b16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -799,8 +799,7 @@ class BPOfferService {
       LOG.info("DatanodeCommand action: DNA_BLOCK_STORAGE_MOVEMENT");
       BlockStorageMovementCommand blkSPSCmd = (BlockStorageMovementCommand) cmd;
       dn.getStoragePolicySatisfyWorker().processBlockMovingTasks(
-          blkSPSCmd.getTrackID(), blkSPSCmd.getBlockPoolId(),
-          blkSPSCmd.getBlockMovingTasks());
+          blkSPSCmd.getBlockPoolId(), blkSPSCmd.getBlockMovingTasks());
       break;
     case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
       LOG.info("DatanodeCommand action: DNA_DROP_SPS_WORK_COMMAND");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index f537f49..b7beda4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -50,7 +51,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -513,8 +514,11 @@ class BPServiceActor implements Runnable {
         SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
         SlowDiskReports.EMPTY_REPORT;

-    BlocksStorageMovementResult[] blksMovementResults =
-        getBlocksMovementResults();
+    // Get the blocks storage move attempt finished blocks
+    List<Block> results = dn.getStoragePolicySatisfyWorker()
+        .getBlocksMovementsStatusHandler().getMoveAttemptFinishedBlocks();
+    BlocksStorageMoveAttemptFinished storageMoveAttemptFinishedBlks =
+        getStorageMoveAttemptFinishedBlocks(results);

     HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
         reports,
@@ -527,7 +531,7 @@
         requestBlockReportLease,
         slowPeers,
         slowDisks,
-        blksMovementResults);
+        storageMoveAttemptFinishedBlks);

     if (outliersReportDue) {
       // If the report was due and successfully sent, schedule the next one.
@@ -537,20 +541,23 @@
     // Remove the blocks movement results after successfully transferring
     // to namenode.
     dn.getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
-        .remove(blksMovementResults);
+        .remove(results);

     return response;
   }

-  private BlocksStorageMovementResult[] getBlocksMovementResults() {
-    List<BlocksStorageMovementResult> trackIdVsMovementStatus = dn
-        .getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler()
-        .getBlksMovementResults();
-    BlocksStorageMovementResult[] blksMovementResult =
-        new BlocksStorageMovementResult[trackIdVsMovementStatus.size()];
-    trackIdVsMovementStatus.toArray(blksMovementResult);
+  private BlocksStorageMoveAttemptFinished getStorageMoveAttemptFinishedBlocks(
+      List<Block> finishedBlks) {

-    return blksMovementResult;
+    if (finishedBlks.isEmpty()) {
+      return null;
+    }
+
+    // Create BlocksStorageMoveAttemptFinished with currently finished
+    // blocks
+    Block[] blockList = new Block[finishedBlks.size()];
+    finishedBlks.toArray(blockList);
+    return new BlocksStorageMoveAttemptFinished(blockList);
   }

   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
index f3d2bb6..b3b9fd9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
@@ -21,14 +21,14 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;

 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementAttemptFinished;
 import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsStatusHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,12 +41,12 @@
 public class BlockStorageMovementTracker implements Runnable {
   private static final Logger LOG = LoggerFactory
       .getLogger(BlockStorageMovementTracker.class);
-  private final CompletionService<BlockMovementResult> moverCompletionService;
+  private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
   private final BlocksMovementsStatusHandler blksMovementsStatusHandler;

-  // Keeps the information - trackID vs its list of blocks
-  private final Map<Long, List<Future<BlockMovementResult>>> moverTaskFutures;
-  private final Map<Long, List<BlockMovementResult>> movementResults;
+  // Keeps the information - block vs its list of future move tasks
+  private final Map<Block, List<Future<BlockMovementAttemptFinished>>> moverTaskFutures;
+  private final Map<Block, List<BlockMovementAttemptFinished>> movementResults;

   private volatile boolean running = true;

@@ -59,7 +59,7 @@
    *          blocks movements status handler
    */
   public BlockStorageMovementTracker(
-      CompletionService<BlockMovementResult> moverCompletionService,
+      CompletionService<BlockMovementAttemptFinished> moverCompletionService,
       BlocksMovementsStatusHandler handler) {
     this.moverCompletionService = moverCompletionService;
     this.moverTaskFutures = new HashMap<>();
@@ -82,32 +82,33 @@
         }
       }
       try {
-        Future<BlockMovementResult> future = moverCompletionService.take();
+        Future<BlockMovementAttemptFinished> future =
+            moverCompletionService.take();
         if (future != null) {
-          BlockMovementResult result = future.get();
+          BlockMovementAttemptFinished result = future.get();
           LOG.debug("Completed block movement. {}", result);
-          long trackId = result.getTrackId();
-          List<Future<BlockMovementResult>> blocksMoving = moverTaskFutures
-              .get(trackId);
+          Block block = result.getBlock();
+          List<Future<BlockMovementAttemptFinished>> blocksMoving =
+              moverTaskFutures.get(block);
           if (blocksMoving == null) {
-            LOG.warn("Future task doesn't exist for trackId " + trackId);
+            LOG.warn("Future task doesn't exist for block : {} ", block);
             continue;
           }
           blocksMoving.remove(future);

-          List<BlockMovementResult> resultPerTrackIdList =
-              addMovementResultToTrackIdList(result);
+          List<BlockMovementAttemptFinished> resultPerTrackIdList =
+              addMovementResultToBlockIdList(result);

           // Completed all the scheduled blocks movement under this 'trackId'.
-          if (blocksMoving.isEmpty() || moverTaskFutures.get(trackId) == null) {
+          if (blocksMoving.isEmpty() || moverTaskFutures.get(block) == null) {
             synchronized (moverTaskFutures) {
-              moverTaskFutures.remove(trackId);
+              moverTaskFutures.remove(block);
             }
             if (running) {
               // handle completed or inprogress blocks movements per trackId.
               blksMovementsStatusHandler.handle(resultPerTrackIdList);
             }
-            movementResults.remove(trackId);
+            movementResults.remove(block);
           }
         }
       } catch (InterruptedException e) {
@@ -123,38 +124,39 @@
     }
   }

-  private List<BlockMovementResult> addMovementResultToTrackIdList(
-      BlockMovementResult result) {
-    long trackId = result.getTrackId();
-    List<BlockMovementResult> perTrackIdList;
+  private List<BlockMovementAttemptFinished> addMovementResultToBlockIdList(
+      BlockMovementAttemptFinished result) {
+    Block block = result.getBlock();
+    List<BlockMovementAttemptFinished> perBlockIdList;
     synchronized (movementResults) {
-      perTrackIdList = movementResults.get(trackId);
-      if (perTrackIdList == null) {
-        perTrackIdList = new ArrayList<>();
-        movementResults.put(trackId, perTrackIdList);
+      perBlockIdList = movementResults.get(block);
+      if (perBlockIdList == null) {
+        perBlockIdList = new ArrayList<>();
+        movementResults.put(block, perBlockIdList);
       }
-      perTrackIdList.add(result);
+      perBlockIdList.add(result);
     }
-    return perTrackIdList;
+    return perBlockIdList;
   }

   /**
    * Add future task to the tracking list to check the completion status of the
    * block movement.
    *
-   * @param trackID
-   *          tracking Id
+   * @param blockID
+   *          block identifier
    * @param futureTask
    *          future task used for moving the respective block
    */
-  void addBlock(long trackID, Future<BlockMovementResult> futureTask) {
+  void addBlock(Block block,
+      Future<BlockMovementAttemptFinished> futureTask) {
     synchronized (moverTaskFutures) {
-      List<Future<BlockMovementResult>> futures = moverTaskFutures
-          .get(Long.valueOf(trackID));
+      List<Future<BlockMovementAttemptFinished>> futures =
+          moverTaskFutures.get(block);
       // null for the first task
       if (futures == null) {
         futures = new ArrayList<>();
-        moverTaskFutures.put(trackID, futures);
+        moverTaskFutures.put(block, futures);
       }
       futures.add(futureTask);
       // Notify waiting tracker thread about the newly added tasks.
@@ -175,16 +177,6 @@
   }

   /**
-   * @return the list of trackIds which are still waiting to complete all the
-   *         scheduled blocks movements.
-   */
-  Set<Long> getInProgressTrackIds() {
-    synchronized (moverTaskFutures) {
-      return moverTaskFutures.keySet();
-    }
-  }
-
-  /**
    * Sets running flag to false and clear the pending movement result queues.
    */
   public void stopTracking() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 4e57805..47318f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode;

 import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
-import static org.apache.hadoop.util.Time.monotonicNow;

 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -32,9 +31,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
@@ -62,7 +59,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
@@ -89,14 +85,11 @@ public class StoragePolicySatisfyWorker {

   private final int moverThreads;
   private final ExecutorService moveExecutor;
-  private final CompletionService<BlockMovementResult> moverCompletionService;
+  private final CompletionService<BlockMovementAttemptFinished> moverCompletionService;
   private final BlocksMovementsStatusHandler handler;
   private final BlockStorageMovementTracker movementTracker;
   private Daemon movementTrackerThread;

-  private long inprogressTrackIdsCheckInterval = 30 * 1000; // 30seconds.
-  private long nextInprogressRecheckTime;
-
   public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
     this.datanode = datanode;
     this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
@@ -111,16 +104,6 @@
     movementTrackerThread = new Daemon(movementTracker);
     movementTrackerThread.setName("BlockStorageMovementTracker");

-    // Interval to check that the inprogress trackIds. The time interval is
-    // proportional o the heart beat interval time period.
-    final long heartbeatIntervalSeconds = conf.getTimeDuration(
-        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
-        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
-    inprogressTrackIdsCheckInterval = 5 * heartbeatIntervalSeconds;
-    // update first inprogress recheck time to a future time stamp.
-    nextInprogressRecheckTime = monotonicNow()
-        + inprogressTrackIdsCheckInterval;
-
     // TODO: Needs to manage the number of concurrent moves per DataNode.
   }

@@ -186,30 +169,26 @@
    * separate thread. Each task will move the block replica to the target node &
    * wait for the completion.
    *
-   * @param trackID
-   *          unique tracking identifier
-   * @param blockPoolID
-   *          block pool ID
+   * @param blockPoolID block pool identifier
+   *
    * @param blockMovingInfos
    *          list of blocks to be moved
    */
-  public void processBlockMovingTasks(long trackID, String blockPoolID,
-      Collection<BlockMovingInfo> blockMovingInfos) {
+  public void processBlockMovingTasks(final String blockPoolID,
+      final Collection<BlockMovingInfo> blockMovingInfos) {
     LOG.debug("Received BlockMovingTasks {}", blockMovingInfos);
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
-      assert blkMovingInfo.getSources().length == blkMovingInfo
-          .getTargets().length;
-      for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
-        DatanodeInfo target = blkMovingInfo.getTargets()[i];
-        BlockMovingTask blockMovingTask = new BlockMovingTask(
-            trackID, blockPoolID, blkMovingInfo.getBlock(),
-            blkMovingInfo.getSources()[i], target,
-            blkMovingInfo.getSourceStorageTypes()[i],
-            blkMovingInfo.getTargetStorageTypes()[i]);
-        Future<BlockMovementResult> moveCallable = moverCompletionService
-            .submit(blockMovingTask);
-        movementTracker.addBlock(trackID, moveCallable);
-      }
+      StorageType sourceStorageType = blkMovingInfo.getSourceStorageType();
+      StorageType targetStorageType = blkMovingInfo.getTargetStorageType();
+      assert sourceStorageType != targetStorageType
+          : "Source and Target storage type shouldn't be same!";
+      BlockMovingTask blockMovingTask = new BlockMovingTask(blockPoolID,
+          blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
+          blkMovingInfo.getTarget(), sourceStorageType, targetStorageType);
+      Future<BlockMovementAttemptFinished> moveCallable = moverCompletionService
+          .submit(blockMovingTask);
+      movementTracker.addBlock(blkMovingInfo.getBlock(),
+          moveCallable);
     }
   }

@@ -217,8 +196,7 @@
   /**
    * This class encapsulates the process of moving the block replica to the
    * given target and wait for the response.
    */
-  private class BlockMovingTask implements Callable<BlockMovementResult> {
-    private final long trackID;
+  private class BlockMovingTask implements Callable<BlockMovementAttemptFinished> {
     private final String blockPoolID;
     private final Block block;
     private final DatanodeInfo source;
@@ -226,10 +204,9 @@
     private final StorageType srcStorageType;
     private final StorageType targetStorageType;

-    BlockMovingTask(long trackID, String blockPoolID, Block block,
+    BlockMovingTask(String blockPoolID, Block block,
         DatanodeInfo source, DatanodeInfo target,
         StorageType srcStorageType, StorageType targetStorageType) {
-      this.trackID = trackID;
       this.blockPoolID = blockPoolID;
       this.block = block;
       this.source = source;
@@ -239,23 +216,26 @@
     }

     @Override
-    public BlockMovementResult call() {
+    public BlockMovementAttemptFinished call() {
       BlockMovementStatus status = moveBlock();
-      return new BlockMovementResult(trackID, block.getBlockId(), target,
-          status);
+      return new BlockMovementAttemptFinished(block, source, target, status);
     }

     private BlockMovementStatus moveBlock() {
       LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
-          + "storageType, sourceStoragetype:{} and destinStoragetype:{}",
+              + "storageType, sourceStoragetype:{} and destinStoragetype:{}",
           block, source, target, srcStorageType, targetStorageType);
       Socket sock = null;
       DataOutputStream out = null;
       DataInputStream in = null;
       try {
+        datanode.incrementXmitsInProgress();
+
         ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
         DNConf dnConf = datanode.getDnConf();
-        String dnAddr = target.getXferAddr(dnConf.getConnectToDnViaHostname());
+
+        String dnAddr = datanode.getDatanodeId()
+            .getXferAddr(dnConf.getConnectToDnViaHostname());
         sock = datanode.newSocket();
         NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr),
             dnConf.getSocketTimeout());
@@ -297,9 +277,10 @@
         LOG.warn(
             "Failed to move block:{} from src:{} to destin:{} to satisfy "
                 + "storageType:{}",
-            block, source, target, targetStorageType, e);
+                block, source, target, targetStorageType, e);
         return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
       } finally {
+        datanode.decrementXmitsInProgress();
         IOUtils.closeStream(out);
         IOUtils.closeStream(in);
         IOUtils.closeSocket(sock);
@@ -357,29 +338,25 @@
   }

   /**
-   * This class represents result from a block movement task. This will have the
+   * This class represents status from a block movement task. This will have the
    * information of the task which was successful or failed due to errors.
    */
-  static class BlockMovementResult {
-    private final long trackId;
-    private final long blockId;
+  static class BlockMovementAttemptFinished {
+    private final Block block;
+    private final DatanodeInfo src;
     private final DatanodeInfo target;
     private final BlockMovementStatus status;

-    BlockMovementResult(long trackId, long blockId,
+    BlockMovementAttemptFinished(Block block, DatanodeInfo src,
         DatanodeInfo target, BlockMovementStatus status) {
-      this.trackId = trackId;
-      this.blockId = blockId;
+      this.block = block;
+      this.src = src;
       this.target = target;
       this.status = status;
     }

-    long getTrackId() {
-      return trackId;
-    }
-
-    long getBlockId() {
-      return blockId;
+    Block getBlock() {
+      return block;
     }

     BlockMovementStatus getStatus() {
@@ -388,99 +365,79 @@

     @Override
     public String toString() {
-      return new StringBuilder().append("Block movement result(\n  ")
-          .append("track id: ").append(trackId).append(" block id: ")
-          .append(blockId).append(" target node: ").append(target)
+      return new StringBuilder().append("Block movement attempt finished(\n  ")
+          .append(" block : ")
+          .append(block).append(" src node: ").append(src)
+          .append(" target node: ").append(target)
          .append(" movement status: ").append(status).append(")").toString();
     }
   }

   /**
    * Blocks movements status handler, which is used to collect details of the
-   * completed or inprogress list of block movements and this status(success or
-   * failure or inprogress) will be send to the namenode via heartbeat.
+   * completed block movements and it will send these attempted finished(with
+   * success or failure) blocks to the namenode via heartbeat.
    */
-  class BlocksMovementsStatusHandler {
-    private final List<BlocksStorageMovementResult> trackIdVsMovementStatus =
+  public static class BlocksMovementsStatusHandler {
+    private final List<Block> blockIdVsMovementStatus =
         new ArrayList<>();

    /**
-     * Collect all the block movement results. Later this will be send to
-     * namenode via heart beat.
+     * Collect all the storage movement attempt finished blocks. Later this will
+     * be send to namenode via heart beat.
     *
-     * @param results
-     *          result of all the block movements per trackId
+     * @param moveAttemptFinishedBlks
+     *          set of storage movement attempt finished blocks
     */
-    void handle(List<BlockMovementResult> resultsPerTrackId) {
-      BlocksStorageMovementResult.Status status =
-          BlocksStorageMovementResult.Status.SUCCESS;
-      long trackId = -1;
-      for (BlockMovementResult blockMovementResult : resultsPerTrackId) {
-        trackId = blockMovementResult.getTrackId();
-        if (blockMovementResult.status ==
-            BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE) {
-          status = BlocksStorageMovementResult.Status.FAILURE;
-          // If any of the block movement is failed, then mark as failure so
-          // that namenode can take a decision to retry the blocks associated to
-          // the given trackId.
-          break;
-        }
-      }
+    void handle(List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
+      List<Block> blocks = new ArrayList<>();

-      // Adding to the tracking results list. Later this will be send to
+      for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
+        blocks.add(item.getBlock());
+      }
+      // Adding to the tracking report list. Later this will be send to
       // namenode via datanode heartbeat.
-      synchronized (trackIdVsMovementStatus) {
-        trackIdVsMovementStatus.add(
-            new BlocksStorageMovementResult(trackId, status));
+      synchronized (blockIdVsMovementStatus) {
+        blockIdVsMovementStatus.addAll(blocks);
       }
     }

    /**
-     * @return unmodifiable list of blocks storage movement results.
+     * @return unmodifiable list of storage movement attempt finished blocks.
     */
-    List<BlocksStorageMovementResult> getBlksMovementResults() {
-      List<BlocksStorageMovementResult> movementResults = new ArrayList<>();
-      // 1. Adding all the completed trackids.
-      synchronized (trackIdVsMovementStatus) {
-        if (trackIdVsMovementStatus.size() > 0) {
-          movementResults = Collections
-              .unmodifiableList(trackIdVsMovementStatus);
+    List<Block> getMoveAttemptFinishedBlocks() {
+      List<Block> moveAttemptFinishedBlks = new ArrayList<>();
+      // 1. Adding all the completed block ids.
+      synchronized (blockIdVsMovementStatus) {
+        if (blockIdVsMovementStatus.size() > 0) {
+          moveAttemptFinishedBlks = Collections
+              .unmodifiableList(blockIdVsMovementStatus);
        }
      }
-      // 2. Adding the in progress track ids after those which are completed.
-      Set<Long> inProgressTrackIds = getInProgressTrackIds();
-      for (Long trackId : inProgressTrackIds) {
-        movementResults.add(new BlocksStorageMovementResult(trackId,
-            BlocksStorageMovementResult.Status.IN_PROGRESS));
-      }
-      return movementResults;
+      return moveAttemptFinishedBlks;
    }

    /**
-     * Remove the blocks storage movement results.
+     * Remove the storage movement attempt finished blocks from the tracking
+     * list.
     *
-     * @param results
-     *          set of blocks storage movement results
+     * @param moveAttemptFinishedBlks
+     *          set of storage movement attempt finished blocks
     */
-    void remove(BlocksStorageMovementResult[] results) {
-      if (results != null) {
-        synchronized (trackIdVsMovementStatus) {
-          for (BlocksStorageMovementResult blocksMovementResult : results) {
-            trackIdVsMovementStatus.remove(blocksMovementResult);
-          }
-        }
+    void remove(List<Block> moveAttemptFinishedBlks) {
+      if (moveAttemptFinishedBlks != null) {
+        blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
      }
    }

    /**
-     * Clear the trackID vs movement status tracking map.
+     * Clear the blockID vs movement status tracking map.
     */
    void removeAll() {
-      synchronized (trackIdVsMovementStatus) {
-        trackIdVsMovementStatus.clear();
+      synchronized (blockIdVsMovementStatus) {
+        blockIdVsMovementStatus.clear();
      }
    }
-  }

  @VisibleForTesting
@@ -498,23 +455,4 @@
    movementTracker.removeAll();
    handler.removeAll();
  }
-
-  /**
-   * Gets list of trackids which are inprogress. Will do collection periodically
-   * on 'dfs.datanode.storage.policy.satisfier.worker.inprogress.recheck.time.
-   * millis' interval.
-   *
-   * @return collection of trackids which are inprogress
-   */
-  private Set<Long> getInProgressTrackIds() {
-    Set<Long> trackIds = new HashSet<>();
-    long now = monotonicNow();
-    if (nextInprogressRecheckTime >= now) {
-      trackIds = movementTracker.getInProgressTrackIds();
-
-      // schedule next re-check interval
-      nextInprogressRecheckTime = now + inprogressTrackIdsCheckInterval;
-    }
-    return trackIds;
-  }
 }
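That completes the worker-side change: no more in-progress tracking, just a buffer of attempt-finished blocks drained on each heartbeat. A minimal runnable sketch of that collect/report/acknowledge contract, assuming Long ids as stand-ins for hdfs Block objects and an invented class name FinishedBlocksBuffer (the method names mirror the patch):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class FinishedBlocksBuffer {
  private final List<Long> finishedBlocks = new ArrayList<>();

  // Called by the block-mover thread when an attempt completes.
  synchronized void handle(List<Long> attemptFinished) {
    finishedBlocks.addAll(attemptFinished);
  }

  // Called on each heartbeat: snapshot what is pending right now.
  synchronized List<Long> getMoveAttemptFinishedBlocks() {
    return Collections.unmodifiableList(new ArrayList<>(finishedBlocks));
  }

  // Called after the namenode acknowledged the heartbeat payload.
  synchronized void remove(List<Long> acked) {
    finishedBlocks.removeAll(acked);
  }

  public static void main(String[] args) {
    FinishedBlocksBuffer buf = new FinishedBlocksBuffer();
    buf.handle(List.of(1001L, 1002L));        // mover reports two blocks
    List<Long> batch = buf.getMoveAttemptFinishedBlocks();
    System.out.println("heartbeat payload: " + batch);
    buf.remove(batch);                        // NN acked; drop only these
    System.out.println("still pending: " + buf.getMoveAttemptFinishedBlocks());
  }
}

Removing only the acknowledged batch, instead of clearing the whole list, preserves any block that finishes between taking the heartbeat snapshot and receiving the namenode's acknowledgement.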
http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index 549819f..cc5b63a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -22,15 +22,12 @@ import static org.apache.hadoop.util.Time.monotonicNow;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;

+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;
 import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,14 +35,12 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;

 /**
- * A monitor class for checking whether block storage movements finished or not.
- * If block storage movement results from datanode indicates about the movement
- * success, then it will just remove the entries from tracking. If it reports
- * failure, then it will add back to needed block storage movements list. If it
- * reports in_progress, that means the blocks movement is in progress and the
- * coordinator is still tracking the movement. If no DN reports about movement
- * for longer time, then such items will be retries automatically after timeout.
- * The default timeout would be 30mins.
+ * A monitor class for checking whether block storage movements attempt
+ * completed or not. If this receives block storage movement attempt
+ * status(either success or failure) from DN then it will just remove the
+ * entries from tracking. If there is no DN reports about movement attempt
+ * finished for a longer time period, then such items will retries automatically
+ * after timeout. The default timeout would be 5 minutes.
  */
 public class BlockStorageMovementAttemptedItems {
   private static final Logger LOG =
@@ -55,37 +50,34 @@
    * A map holds the items which are already taken for blocks movements
    * processing and sent to DNs.
    */
-  private final Map<Long, AttemptedItemInfo> storageMovementAttemptedItems;
-  private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
+  private final List<AttemptedItemInfo> storageMovementAttemptedItems;
+  private final List<Block> movementFinishedBlocks;
   private volatile boolean monitorRunning = true;
   private Daemon timerThread = null;
-  private final StoragePolicySatisfier sps;
   //
-  // It might take anywhere between 20 to 60 minutes before
+  // It might take anywhere between 5 to 10 minutes before
   // a request is timed out.
   //
-  private long selfRetryTimeout = 20 * 60 * 1000;
+  private long selfRetryTimeout = 5 * 60 * 1000;

   //
-  // It might take anywhere between 5 to 10 minutes before
+  // It might take anywhere between 1 to 2 minutes before
   // a request is timed out.
   //
-  private long minCheckTimeout = 5 * 60 * 1000; // minimum value
+  private long minCheckTimeout = 1 * 60 * 1000; // minimum value
   private BlockStorageMovementNeeded blockStorageMovementNeeded;

   public BlockStorageMovementAttemptedItems(long recheckTimeout,
       long selfRetryTimeout,
-      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
-      StoragePolicySatisfier sps) {
+      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
     if (recheckTimeout > 0) {
       this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
     }
     this.selfRetryTimeout = selfRetryTimeout;
     this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
-    storageMovementAttemptedItems = new HashMap<>();
-    storageMovementAttemptedResults = new ArrayList<>();
-    this.sps = sps;
+    storageMovementAttemptedItems = new ArrayList<>();
+    movementFinishedBlocks = new ArrayList<>();
   }

@@ -94,33 +86,26 @@ public class BlockStorageMovementAttemptedItems {
    *
    * @param itemInfo
    *          - tracking info
-   * @param allBlockLocsAttemptedToSatisfy
-   *          - failed to find matching target nodes to satisfy storage type
-   *            for all the block locations of the given blockCollectionID
    */
-  public void add(ItemInfo itemInfo, boolean allBlockLocsAttemptedToSatisfy) {
+  public void add(AttemptedItemInfo itemInfo) {
     synchronized (storageMovementAttemptedItems) {
-      AttemptedItemInfo attemptedItemInfo = new AttemptedItemInfo(
-          itemInfo.getStartId(), itemInfo.getTrackId(), monotonicNow(),
-          allBlockLocsAttemptedToSatisfy);
-      storageMovementAttemptedItems.put(itemInfo.getTrackId(),
-          attemptedItemInfo);
+      storageMovementAttemptedItems.add(itemInfo);
     }
   }
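The constructor above clamps the monitor's wake-up interval with Math.min and stores the per-item self-retry timeout; the defaults in this patch drop to five minutes (self retry) and one minute (recheck). A small sketch of just that timeout arithmetic, under an invented class name AttemptTimeouts; only the two defaults and the clamp come from the patch:

import static java.lang.Math.min;

final class AttemptTimeouts {
  static final long DEFAULT_MIN_CHECK = 1 * 60 * 1000;   // monitor wake-up, ms
  static final long DEFAULT_SELF_RETRY = 5 * 60 * 1000;  // per-item expiry, ms

  final long minCheckTimeout;
  final long selfRetryTimeout;

  AttemptTimeouts(long recheckTimeout, long selfRetryTimeout) {
    // A configured recheck interval can only tighten the wake-up period,
    // never loosen it past the built-in minimum.
    this.minCheckTimeout = recheckTimeout > 0
        ? min(DEFAULT_MIN_CHECK, recheckTimeout) : DEFAULT_MIN_CHECK;
    this.selfRetryTimeout = selfRetryTimeout;
  }

  // True when an item last reported at lastReportedMs should be retried.
  boolean expired(long lastReportedMs, long nowMs) {
    return nowMs > lastReportedMs + selfRetryTimeout;
  }

  public static void main(String[] args) {
    AttemptTimeouts t = new AttemptTimeouts(30_000, DEFAULT_SELF_RETRY);
    System.out.println(t.minCheckTimeout);                // 30000
    System.out.println(t.expired(0, 5 * 60 * 1000 + 1));  // true
  }
}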
   /**
-   * Add the trackIDBlocksStorageMovementResults to
-   * storageMovementAttemptedResults.
+   * Add the storage movement attempt finished blocks to
+   * storageMovementFinishedBlocks.
    *
-   * @param blksMovementResults
+   * @param moveAttemptFinishedBlks
+   *          storage movement attempt finished blocks
    */
-  public void addResults(BlocksStorageMovementResult[] blksMovementResults) {
-    if (blksMovementResults.length == 0) {
+  public void addReportedMovedBlocks(Block[] moveAttemptFinishedBlks) {
+    if (moveAttemptFinishedBlks.length == 0) {
       return;
     }
-    synchronized (storageMovementAttemptedResults) {
-      storageMovementAttemptedResults
-          .addAll(Arrays.asList(blksMovementResults));
+    synchronized (movementFinishedBlocks) {
+      movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks));
     }
   }

@@ -129,8 +114,8 @@
    */
   public synchronized void start() {
     monitorRunning = true;
-    timerThread = new Daemon(new BlocksStorageMovementAttemptResultMonitor());
-    timerThread.setName("BlocksStorageMovementAttemptResultMonitor");
+    timerThread = new Daemon(new BlocksStorageMovementAttemptMonitor());
+    timerThread.setName("BlocksStorageMovementAttemptMonitor");
     timerThread.start();
   }

@@ -163,82 +148,22 @@
   }

   /**
-   * This class contains information of an attempted trackID. Information such
-   * as, (a)last attempted or reported time stamp, (b)whether all the blocks in
-   * the trackID were attempted and blocks movement has been scheduled to
-   * satisfy storage policy. This is used by
-   * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
-   */
-  private final static class AttemptedItemInfo extends ItemInfo {
-    private long lastAttemptedOrReportedTime;
-    private final boolean allBlockLocsAttemptedToSatisfy;
-
-    /**
-     * AttemptedItemInfo constructor.
-     *
-     * @param rootId
-     *          rootId for trackId
-     * @param trackId
-     *          trackId for file.
-     * @param lastAttemptedOrReportedTime
-     *          last attempted or reported time
-     * @param allBlockLocsAttemptedToSatisfy
-     *          whether all the blocks in the trackID were attempted and blocks
-     *          movement has been scheduled to satisfy storage policy
-     */
-    private AttemptedItemInfo(long rootId, long trackId,
-        long lastAttemptedOrReportedTime,
-        boolean allBlockLocsAttemptedToSatisfy) {
-      super(rootId, trackId);
-      this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
-      this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy;
-    }
-
-    /**
-     * @return last attempted or reported time stamp.
-     */
-    private long getLastAttemptedOrReportedTime() {
-      return lastAttemptedOrReportedTime;
-    }
-
-    /**
-     * @return true/false. True value represents that, all the block locations
-     *         under the trackID has found matching target nodes to satisfy
-     *         storage policy. False value represents that, trackID needed
-     *         retries to satisfy the storage policy for some of the block
-     *         locations.
-     */
-    private boolean isAllBlockLocsAttemptedToSatisfy() {
-      return allBlockLocsAttemptedToSatisfy;
-    }
-
-    /**
-     * Update lastAttemptedOrReportedTime, so that the expiration time will be
-     * postponed to future.
-     */
-    private void touchLastReportedTimeStamp() {
-      this.lastAttemptedOrReportedTime = monotonicNow();
-    }
-
-  }
-
-  /**
-   * A monitor class for checking block storage movement result and long waiting
-   * items periodically.
+   * A monitor class for checking block storage movement attempt status and long
+   * waiting items periodically.
    */
-  private class BlocksStorageMovementAttemptResultMonitor implements Runnable {
+  private class BlocksStorageMovementAttemptMonitor implements Runnable {
     @Override
     public void run() {
       while (monitorRunning) {
         try {
-          blockStorageMovementResultCheck();
+          blockStorageMovementReportedItemsCheck();
           blocksStorageMovementUnReportedItemsCheck();
           Thread.sleep(minCheckTimeout);
         } catch (InterruptedException ie) {
-          LOG.info("BlocksStorageMovementAttemptResultMonitor thread "
+          LOG.info("BlocksStorageMovementAttemptMonitor thread "
               + "is interrupted.", ie);
         } catch (IOException ie) {
-          LOG.warn("BlocksStorageMovementAttemptResultMonitor thread "
+          LOG.warn("BlocksStorageMovementAttemptMonitor thread "
               + "received exception and exiting.", ie);
         }
       }
@@ -248,29 +173,21 @@
   @VisibleForTesting
   void blocksStorageMovementUnReportedItemsCheck() {
     synchronized (storageMovementAttemptedItems) {
-      Iterator<Entry<Long, AttemptedItemInfo>> iter =
-          storageMovementAttemptedItems.entrySet().iterator();
+      Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems
+          .iterator();
       long now = monotonicNow();
       while (iter.hasNext()) {
-        Entry<Long, AttemptedItemInfo> entry = iter.next();
-        AttemptedItemInfo itemInfo = entry.getValue();
+        AttemptedItemInfo itemInfo = iter.next();
         if (now > itemInfo.getLastAttemptedOrReportedTime()
             + selfRetryTimeout) {
-          Long blockCollectionID = entry.getKey();
-          synchronized (storageMovementAttemptedResults) {
-            if (!isExistInResult(blockCollectionID)) {
-              ItemInfo candidate = new ItemInfo(
-                  itemInfo.getStartId(), blockCollectionID);
-              blockStorageMovementNeeded.add(candidate);
-              iter.remove();
-              LOG.info("TrackID: {} becomes timed out and moved to needed "
-                  + "retries queue for next iteration.", blockCollectionID);
-            } else {
-              LOG.info("Blocks storage movement results for the"
-                  + " tracking id : " + blockCollectionID
-                  + " is reported from one of the co-ordinating datanode."
-                  + " So, the result will be processed soon.");
-            }
+          Long blockCollectionID = itemInfo.getTrackId();
+          synchronized (movementFinishedBlocks) {
+            ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
+                blockCollectionID);
+            blockStorageMovementNeeded.add(candidate);
+            iter.remove();
+            LOG.info("TrackID: {} becomes timed out and moved to needed "
+                + "retries queue for next iteration.", blockCollectionID);
           }
         }
       }
@@ -278,118 +195,38 @@
     }
   }
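blocksStorageMovementUnReportedItemsCheck() above is the timeout path: any attempted item with no datanode report inside selfRetryTimeout is pushed back to the movement-needed queue for the next SPS iteration. The same loop in isolation as runnable Java, with an invented Item record standing in for AttemptedItemInfo:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

class UnreportedItemsCheck {
  record Item(long trackId, long lastReportedMs) { }

  static void requeueExpired(List<Item> attempted, Queue<Long> needed,
      long selfRetryTimeoutMs, long nowMs) {
    Iterator<Item> it = attempted.iterator();
    while (it.hasNext()) {
      Item item = it.next();
      if (nowMs > item.lastReportedMs() + selfRetryTimeoutMs) {
        needed.add(item.trackId()); // retry in the next iteration
        it.remove();
      }
    }
  }

  public static void main(String[] args) {
    List<Item> attempted = new ArrayList<>(List.of(
        new Item(7L, 0L), new Item(8L, 290_000L)));
    Queue<Long> needed = new ArrayDeque<>();
    requeueExpired(attempted, needed, 5 * 60 * 1000, 300_001L);
    System.out.println(needed);    // [7] -- only the stale item is requeued
    System.out.println(attempted); // item 8 is still within its window
  }
}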
-  private boolean isExistInResult(Long blockCollectionID) {
-    Iterator<BlocksStorageMovementResult> iter = storageMovementAttemptedResults
-        .iterator();
-    while (iter.hasNext()) {
-      BlocksStorageMovementResult storageMovementAttemptedResult = iter.next();
-      if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   @VisibleForTesting
-  void blockStorageMovementResultCheck() throws IOException {
-    synchronized (storageMovementAttemptedResults) {
-      Iterator<BlocksStorageMovementResult> resultsIter =
-          storageMovementAttemptedResults.iterator();
-      while (resultsIter.hasNext()) {
-        boolean isInprogress = false;
-        // TrackID need to be retried in the following cases:
-        // 1) All or few scheduled block(s) movement has been failed.
-        // 2) All the scheduled block(s) movement has been succeeded but there
-        //    are unscheduled block(s) movement in this trackID. Say, some of
-        //    the blocks in the trackID couldn't finding any matching target node
-        //    for scheduling block movement in previous SPS iteration.
-        BlocksStorageMovementResult storageMovementAttemptedResult = resultsIter
-            .next();
+  void blockStorageMovementReportedItemsCheck() throws IOException {
+    synchronized (movementFinishedBlocks) {
+      Iterator<Block> finishedBlksIter = movementFinishedBlocks.iterator();
+      while (finishedBlksIter.hasNext()) {
+        Block blk = finishedBlksIter.next();
         synchronized (storageMovementAttemptedItems) {
-          Status status = storageMovementAttemptedResult.getStatus();
-          long trackId = storageMovementAttemptedResult.getTrackId();
-          AttemptedItemInfo attemptedItemInfo = storageMovementAttemptedItems
-              .get(trackId);
-          // itemInfo is null means no root for trackId, using trackId only as
-          // root and handling it in
-          // blockStorageMovementNeeded#removeIteamTrackInfo() for cleaning
-          // the xAttr
-          ItemInfo itemInfo = new ItemInfo((attemptedItemInfo != null)
-              ? attemptedItemInfo.getStartId() : trackId, trackId);
-          switch (status) {
-          case FAILURE:
-            if (attemptedItemInfo != null) {
-              blockStorageMovementNeeded.add(itemInfo);
-              LOG.warn("Blocks storage movement results for the tracking id:"
-                  + "{} is reported from co-ordinating datanode, but result"
-                  + " status is FAILURE. So, added for retry", trackId);
-            } else {
-              LOG.info("Blocks storage movement is FAILURE for the track"
-                  + " id {}. But the trackID doesn't exists in"
-                  + " storageMovementAttemptedItems list.", trackId);
-              blockStorageMovementNeeded
-                  .removeItemTrackInfo(itemInfo);
-            }
-            break;
-          case SUCCESS:
-            // ItemInfo could be null. One case is, before the blocks movements
-            // result arrives the attempted trackID became timed out and then
-            // removed the trackID from the storageMovementAttemptedItems list.
-            // TODO: Need to ensure that trackID is added to the
-            // 'blockStorageMovementNeeded' queue for retries to handle the
-            // following condition. If all the block locations under the trackID
-            // are attempted and failed to find matching target nodes to satisfy
-            // storage policy in previous SPS iteration.
-            String msg = "Blocks storage movement is SUCCESS for the track id: "
-                + trackId + " reported from co-ordinating datanode.";
-            if (attemptedItemInfo != null) {
-              if (!attemptedItemInfo.isAllBlockLocsAttemptedToSatisfy()) {
-                blockStorageMovementNeeded
-                    .add(new ItemInfo(attemptedItemInfo.getStartId(), trackId));
-                LOG.warn("{} But adding trackID back to retry queue as some of"
-                    + " the blocks couldn't find matching target nodes in"
-                    + " previous SPS iteration.", msg);
-              } else {
-                LOG.info(msg);
-                blockStorageMovementNeeded
-                    .removeItemTrackInfo(itemInfo);
-              }
-            } else {
-              LOG.info("{} But the trackID doesn't exists in "
-                  + "storageMovementAttemptedItems list", msg);
+          Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems
+              .iterator();
+          while (iterator.hasNext()) {
+            AttemptedItemInfo attemptedItemInfo = iterator.next();
+            attemptedItemInfo.getBlocks().remove(blk);
+            if (attemptedItemInfo.getBlocks().isEmpty()) {
+              // TODO: try add this at front of the Queue, so that this element
+              // gets the chance first and can be cleaned from queue quickly as
+              // all movements already done.
               blockStorageMovementNeeded
-                  .removeItemTrackInfo(itemInfo);
-            }
-            break;
-          case IN_PROGRESS:
-            isInprogress = true;
-            attemptedItemInfo = storageMovementAttemptedItems
-                .get(storageMovementAttemptedResult.getTrackId());
-            if(attemptedItemInfo != null){
-              // update the attempted expiration time to next cycle.
-              attemptedItemInfo.touchLastReportedTimeStamp();
+                  .add(new ItemInfo(attemptedItemInfo.getStartId(),
+                      attemptedItemInfo.getTrackId()));
+              iterator.remove();
             }
-            break;
-          default:
-            LOG.error("Unknown status: {}", status);
-            break;
-          }
-          // Remove trackID from the attempted list if the attempt has been
-          // completed(success or failure), if any.
-          if (!isInprogress) {
-            storageMovementAttemptedItems
-                .remove(storageMovementAttemptedResult.getTrackId());
           }
         }
-        // Remove trackID from results as processed above.
-        resultsIter.remove();
+        // Remove attempted blocks from movementFinishedBlocks list.
+        finishedBlksIter.remove();
       }
     }
   }

   @VisibleForTesting
-  public int resultsCount() {
-    return storageMovementAttemptedResults.size();
+  public int getMovementFinishedBlocksCount() {
+    return movementFinishedBlocks.size();
   }

   @VisibleForTesting
@@ -398,7 +235,7 @@ public class BlockStorageMovementAttemptedItems {
   }

   public void clearQueues() {
-    storageMovementAttemptedResults.clear();
+    movementFinishedBlocks.clear();
     storageMovementAttemptedItems.clear();
   }
 }
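blockStorageMovementReportedItemsCheck() above is the report path that replaces the old SUCCESS/FAILURE/IN_PROGRESS switch: each block reported by the datanodes is struck off every attempted item, and an item whose pending set drains empty is requeued so the SPS can re-verify its placement. A runnable distillation, with invented stand-in types rather than the patch's Block and AttemptedItemInfo:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;

class ReportedItemsCheck {
  record Attempted(long trackId, Set<Long> pendingBlocks) { }

  static void apply(List<Long> finishedBlocks, List<Attempted> attempted,
      Queue<Long> needed) {
    for (Long blk : finishedBlocks) {
      Iterator<Attempted> it = attempted.iterator();
      while (it.hasNext()) {
        Attempted item = it.next();
        item.pendingBlocks().remove(blk);
        if (item.pendingBlocks().isEmpty()) {
          needed.add(item.trackId()); // fully reported; re-check placement
          it.remove();
        }
      }
    }
    finishedBlocks.clear(); // all reports processed
  }

  public static void main(String[] args) {
    List<Attempted> attempted = new ArrayList<>(List.of(
        new Attempted(42L, new HashSet<>(Set.of(1L, 2L)))));
    Queue<Long> needed = new ArrayDeque<>();
    apply(new ArrayList<>(List.of(1L, 2L)), attempted, needed);
    System.out.println(needed); // [42] -- both blocks reported, so requeued
  }
}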
http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java
deleted file mode 100644
index a790c13..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import java.util.List;
-
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-
-/**
- * This class represents a batch of blocks under one trackId which needs to move
- * its storage locations to satisfy the storage policy.
- */
-public class BlockStorageMovementInfosBatch {
-  private long trackID;
-  private List<BlockMovingInfo> blockMovingInfos;
-
-  /**
-   * Constructor to create the block storage movement infos batch.
-   *
-   * @param trackID
-   *          - unique identifier which will be used for tracking the given set
-   *            of blocks movement.
-   * @param blockMovingInfos
-   *          - list of block to storage infos.
-   */
-  public BlockStorageMovementInfosBatch(long trackID,
-      List<BlockMovingInfo> blockMovingInfos) {
-    this.trackID = trackID;
-    this.blockMovingInfos = blockMovingInfos;
-  }
-
-  public long getTrackID() {
-    return trackID;
-  }
-
-  public List<BlockMovingInfo> getBlockMovingInfo() {
-    return blockMovingInfos;
-  }
-
-  @Override
-  public String toString() {
-    return new StringBuilder().append("BlockStorageMovementInfosBatch(\n  ")
-        .append("TrackID: ").append(trackID).append(" BlockMovingInfos: ")
-        .append(blockMovingInfos).append(")").toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/776af85f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index dfb5e27..5c2f99f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -266,7 +266,7 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
 import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
 import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
 import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -3880,7 +3880,8 @@
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
       @Nonnull SlowDiskReports slowDisks,
-      BlocksStorageMovementResult[] blksMovementResults) throws IOException {
+      BlocksStorageMoveAttemptFinished blksMovementsFinished)
+          throws IOException {
     readLock();
     try {
       //get datanode commands
@@ -3901,11 +3902,11 @@
         if (!sps.isRunning()) {
           if (LOG.isDebugEnabled()) {
             LOG.debug(
-                "Storage policy satisfier is not running. So, ignoring block "
-                    + "storage movement results sent by co-ordinator datanode");
+                "Storage policy satisfier is not running. So, ignoring storage"
+                    + " movement attempt finished block info sent by DN");
           }
         } else {
-          sps.handleBlocksStorageMovementResults(blksMovementResults);
+          sps.handleStorageMovementAttemptFinishedBlks(blksMovementsFinished);
         }
       }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org