From: ehiggs@apache.org To: common-commits@hadoop.apache.org Date: Tue, 31 Jul 2018 09:26:32 -0000 Subject: [50/50] hadoop git commit: HDFS-13310. The DatanodeProtocol should have a DNA_BACKUP to backup blocks. Original patch contributed by Ewan Higgs. Follow-up work and fixes contributed by Virajith Jalaparthi. HDFS-13310. The DatanodeProtocol should have a DNA_BACKUP to backup blocks. Original patch contributed by Ewan Higgs. Follow-up work and fixes contributed by Virajith Jalaparthi.
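[Editor's note: a minimal sketch, not part of the patch, of how the new heartbeat payload fits together. The executor class, its run loop, and uploadPart() are hypothetical; the protocol types (BlockSyncTask, BlockSyncTaskExecutionFeedback, BulkSyncTaskExecutionFeedback, SyncTaskExecutionResult) and their factory methods are the ones introduced by the diff below.]

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask;
import org.apache.hadoop.hdfs.server.protocol.BlockSyncTaskExecutionFeedback;
import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionResult;

// Hypothetical datanode-side executor: runs the BlockSyncTasks carried by a
// SyncCommand (DNA_BACKUP) and collects the per-task feedback that
// BPServiceActor attaches to the next heartbeat.
class BlockSyncTaskExecutorSketch {
  BulkSyncTaskExecutionFeedback run(List<BlockSyncTask> tasks) {
    List<BlockSyncTaskExecutionFeedback> feedbacks = new ArrayList<>();
    for (BlockSyncTask task : tasks) {
      try {
        // uploadPart() is a placeholder for streaming task.getLocatedBlocks()
        // to task.getRemoteURI(); it would return an opaque part handle.
        ByteBuffer partHandle = uploadPart(task);
        feedbacks.add(BlockSyncTaskExecutionFeedback.finishedSuccessfully(
            task.getSyncTaskId(), task.getSyncMountId(),
            new SyncTaskExecutionResult(partHandle, task.getLength())));
      } catch (Exception e) {
        feedbacks.add(BlockSyncTaskExecutionFeedback.failedWithException(
            task.getSyncTaskId(), task.getSyncMountId(), e));
      }
    }
    return new BulkSyncTaskExecutionFeedback(feedbacks);
  }

  private ByteBuffer uploadPart(BlockSyncTask task) throws Exception {
    return ByteBuffer.wrap(new byte[0]); // placeholder only
  }
}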
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d52a2afb Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d52a2afb Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d52a2afb Branch: refs/heads/HDFS-12090 Commit: d52a2afbf0d39b9de9d021e4e38f4ee993fa5fa0 Parents: 3e06a5d Author: Ewan Higgs Authored: Mon Jul 23 13:14:04 2018 +0200 Committer: Ewan Higgs Committed: Tue Jul 31 11:24:39 2018 +0200 ---------------------------------------------------------------------- .../BlockSyncTaskExecutionFeedback.java | 67 ++++++ .../protocol/SyncTaskExecutionOutcome.java | 25 +++ .../protocol/SyncTaskExecutionResult.java | 46 ++++ .../DatanodeProtocolClientSideTranslatorPB.java | 8 +- .../DatanodeProtocolServerSideTranslatorPB.java | 6 +- .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 208 ++++++++++++++++++- .../server/blockmanagement/DatanodeManager.java | 4 +- .../hdfs/server/datanode/BPServiceActor.java | 9 +- .../hdfs/server/namenode/FSNamesystem.java | 7 +- .../hdfs/server/namenode/NameNodeRpcServer.java | 7 +- .../hdfs/server/protocol/BlockSyncTask.java | 83 ++++++++ .../protocol/BulkSyncTaskExecutionFeedback.java | 36 ++++ .../hdfs/server/protocol/DatanodeProtocol.java | 22 +- .../hdfs/server/protocol/SyncCommand.java | 39 ++++ .../src/main/proto/DatanodeProtocol.proto | 88 +++++++- .../blockmanagement/TestDatanodeManager.java | 2 +- .../TestNameNodePrunesMissingStorages.java | 2 +- .../datanode/InternalDataNodeTestUtils.java | 3 +- .../server/datanode/TestBPOfferService.java | 5 +- .../hdfs/server/datanode/TestBlockRecovery.java | 4 +- .../server/datanode/TestDataNodeLifeline.java | 9 +- .../TestDatanodeProtocolRetryPolicy.java | 4 +- .../server/datanode/TestFsDatasetCache.java | 4 +- .../hdfs/server/datanode/TestStorageReport.java | 4 +- .../server/namenode/NNThroughputBenchmark.java | 8 +- .../hdfs/server/namenode/NameNodeAdapter.java | 5 +- .../hdfs/server/namenode/TestDeadDatanode.java | 4 +- 27 files changed, 662 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTaskExecutionFeedback.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTaskExecutionFeedback.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTaskExecutionFeedback.java new file mode 100644 index 0000000..2e5393e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTaskExecutionFeedback.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import java.util.UUID; + +/** + * Feedback for a BlockSyncTask. + */ +public class BlockSyncTaskExecutionFeedback { + + private UUID syncTaskId; + private SyncTaskExecutionOutcome outcome; + private SyncTaskExecutionResult result; + private String syncMountId; + + public BlockSyncTaskExecutionFeedback(UUID syncTaskId, + SyncTaskExecutionOutcome outcome, SyncTaskExecutionResult result, + String syncMountId) { + this.syncTaskId = syncTaskId; + this.outcome = outcome; + this.result = result; + this.syncMountId = syncMountId; + } + + public static BlockSyncTaskExecutionFeedback finishedSuccessfully( + UUID syncTaskId, String syncMountId, SyncTaskExecutionResult result) { + return new BlockSyncTaskExecutionFeedback(syncTaskId, + SyncTaskExecutionOutcome.FINISHED_SUCCESSFULLY, result, syncMountId); + } + + public static BlockSyncTaskExecutionFeedback failedWithException( + UUID syncTaskId, String syncMountId, Exception e) { + return new BlockSyncTaskExecutionFeedback(syncTaskId, + SyncTaskExecutionOutcome.EXCEPTION, null, syncMountId); + } + + public UUID getSyncTaskId() { + return syncTaskId; + } + + public SyncTaskExecutionOutcome getOutcome() { + return outcome; + } + + public SyncTaskExecutionResult getResult() { + return result; + } + + public String getSyncMountId() { + return syncMountId; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionOutcome.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionOutcome.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionOutcome.java new file mode 100644 index 0000000..492575b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionOutcome.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +/** + * SyncTaskExecutionOutcome is whether the SyncTask was successful or not. 
+ */ +public enum SyncTaskExecutionOutcome { + FINISHED_SUCCESSFULLY, + EXCEPTION +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionResult.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionResult.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionResult.java new file mode 100644 index 0000000..b623dc5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionResult.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import java.nio.ByteBuffer; + +/** + * Result of a SyncTask. + */ +public class SyncTaskExecutionResult { + + /** result is the opaque byte stream result of a task. e.g. 
PartHandle */ + private ByteBuffer result; + private Long numberOfBytes; + + public SyncTaskExecutionResult(ByteBuffer result, Long numberOfBytes) { + this.result = result; + this.numberOfBytes = numberOfBytes; + } + + public static SyncTaskExecutionResult emptyResult() { + return new SyncTaskExecutionResult(ByteBuffer.wrap(new byte[0]), 0L); + } + + public ByteBuffer getResult() { + return result; + } + + public Long getNumberOfBytes() { + return numberOfBytes; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 9cc4516..20b314c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlock import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -138,7 +139,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks) throws IOException { + @Nonnull SlowDiskReports slowDisks, + BulkSyncTaskExecutionFeedback feedback) throws IOException { HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() .setRegistration(PBHelper.convert(registration)) .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount) @@ -161,6 +163,10 @@ public class DatanodeProtocolClientSideTranslatorPB implements if (slowDisks.haveSlowDisks()) { builder.addAllSlowDisks(PBHelper.convertSlowDiskInfo(slowDisks)); } + if(feedback != null && !feedback.getFeedbacks().isEmpty()) { + builder.setBulkSyncTaskExecutionFeedback(PBHelper.convert(feedback)); + } + HeartbeatResponseProto resp; try { resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index 5cba284..a51ce85 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@ -109,6 +109,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements @Override public HeartbeatResponseProto sendHeartbeat(RpcController controller, HeartbeatRequestProto request) throws ServiceException { + HeartbeatResponse response; try { final StorageReport[] report = PBHelperClient.convertStorageReports( @@ -122,7 +123,10 @@ public class DatanodeProtocolServerSideTranslatorPB implements request.getXceiverCount(), request.getFailedVolumes(), volumeFailureSummary, request.getRequestFullBlockReportLease(), PBHelper.convertSlowPeerInfo(request.getSlowPeersList()), - PBHelper.convertSlowDiskInfo(request.getSlowDisksList())); + PBHelper.convertSlowDiskInfo(request.getSlowDisksList()), + PBHelper.convertBulkSyncTaskExecutionFeedback( + request.getBulkSyncTaskExecutionFeedback()) + ); } catch (IOException e) { throw new ServiceException(e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index ac01348..d520a40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hdfs.protocolPB; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -24,9 +27,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; import com.google.protobuf.ByteString; - import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.DFSUtilClient; @@ -43,38 +47,44 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommand import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockSyncTaskProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BulkSyncTaskExecutionFeedbackProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto; import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto; -import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos - .SlowDiskReportProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowDiskReportProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncTaskExecutionFeedbackProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncTaskExecutionOutcomeProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncTaskExecutionResultProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncTaskIdProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto; -import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto; import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ProvidedStorageLocationProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ProvidedStorageLocationProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.BlockKeyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.BlockWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.BlocksWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.CheckpointCommandProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.CheckpointSignatureProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.ExportedBlockKeysProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NNHAStatusHeartbeatProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeRegistrationProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeRegistrationProto.NamenodeRoleProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamespaceInfoProto; -import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NNHAStatusHeartbeatProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RecoveringBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RemoteEditLogManifestProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RemoteEditLogProto; @@ -89,18 +99,23 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import 
org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; +import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask; +import org.apache.hadoop.hdfs.server.protocol.BlockSyncTaskExecutionFeedback; +import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionOutcome; +import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionResult; import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; -import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback; import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -119,7 +134,9 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; +import org.apache.hadoop.hdfs.server.protocol.SyncCommand; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; +import org.apache.hadoop.ipc.ClientId; /** * Utilities for converting protobuf classes to and from implementation classes @@ -469,11 +486,52 @@ public class PBHelper { return PBHelper.convert(proto.getBlkIdCmd()); case BlockECReconstructionCommand: return PBHelper.convert(proto.getBlkECReconstructionCmd()); + case SyncCommand: + return PBHelper.convert(proto.getSyncCommand()); default: return null; } } - + + private static SyncCommand convert(SyncCommandProto backupCommand) { + List<BlockSyncTaskProto> syncTasksProtoList = + backupCommand.getSyncTasksList(); + List<BlockSyncTask> syncTasksList = + new ArrayList<>(syncTasksProtoList.size()); + for (BlockSyncTaskProto syncTaskProto : syncTasksProtoList) { + syncTasksList.add(convertSyncTask(syncTaskProto)); + } + + return new SyncCommand(DatanodeProtocol.DNA_BACKUP, syncTasksList); + } + + private static BlockSyncTask convertSyncTask( + BlockSyncTaskProto syncTaskProto) { + SyncTaskIdProto syncTaskIdProto = syncTaskProto.getSyncTaskId(); + UUID syncTaskId = convert(syncTaskIdProto); + try { + return new BlockSyncTask(syncTaskId, + new URI(syncTaskProto.getUri()), + PBHelperClient.convertLocatedBlocks( + syncTaskProto.getLocatedBlocksList()), + syncTaskProto.getPartNumber(), + syncTaskProto.getUploadHandle().toByteArray(), + syncTaskProto.getOffset(), + syncTaskProto.getLength(), + syncTaskIdProto.getSyncMountId()); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + } + + public
static UUID convert(SyncTaskIdProto syncTaskIdProto) { + byte[] clientId = syncTaskIdProto.getSyncTaskId().toByteArray(); + long syncTaskIdMsb = ClientId.getMsb(clientId); + long syncTaskIdLsb = ClientId.getLsb(clientId); + return new UUID(syncTaskIdMsb, syncTaskIdLsb); + } + + + public static BalancerBandwidthCommandProto convert( BalancerBandwidthCommand bbCmd) { return BalancerBandwidthCommandProto.newBuilder() @@ -603,6 +661,10 @@ public class PBHelper { .setBlkECReconstructionCmd( convert((BlockECReconstructionCommand) datanodeCommand)); break; + case DatanodeProtocol.DNA_BACKUP: + builder.setCmdType(DatanodeCommandProto.Type.SyncCommand) + .setSyncCommand(convert((SyncCommand) datanodeCommand)); + break; case DatanodeProtocol.DNA_UNKNOWN: //Not expected default: builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand); @@ -1124,4 +1186,130 @@ public class PBHelper { return new FileRegion(block, providedStorageLocation); } + + private static SyncCommandProto convert(SyncCommand syncCommand) { + SyncCommandProto.Builder builder = SyncCommandProto.newBuilder(); + + List<BlockSyncTaskProto> syncTaskProtos = syncCommand.getSyncTasks() + .stream() + .map(syncTask -> convert(syncTask)) + .collect(Collectors.toList()); + + builder.addAllSyncTasks(syncTaskProtos); + + return builder.build(); + } + + private static BlockSyncTaskProto convert(BlockSyncTask blockSyncTask) { + BlockSyncTaskProto.Builder builder = BlockSyncTaskProto.newBuilder(); + builder.addAllLocatedBlocks( + PBHelperClient.convertLocatedBlocks2(blockSyncTask.getLocatedBlocks())); + builder.setUploadHandle( + ByteString.copyFrom(blockSyncTask.getUploadHandle())); + builder.setPartNumber(blockSyncTask.getPartNumber()); + builder.setUri(blockSyncTask.getRemoteURI().toString()); + builder.setOffset(blockSyncTask.getOffset()); + builder.setLength(blockSyncTask.getLength()); + + return builder.build(); + } + + public static SyncTaskIdProto convert(UUID syncTaskId, String syncMountId) { + SyncTaskIdProto.Builder builder = SyncTaskIdProto.newBuilder(); + ByteBuffer syncTaskIdBytes = ByteBuffer.wrap(new byte[16]); + syncTaskIdBytes.putLong(syncTaskId.getMostSignificantBits()); + syncTaskIdBytes.putLong(syncTaskId.getLeastSignificantBits()); + builder.setSyncTaskId(ByteString.copyFrom(syncTaskIdBytes.array())); + builder.setSyncMountId(syncMountId); + return builder.build(); + } + + + public static BulkSyncTaskExecutionFeedbackProto convert( + BulkSyncTaskExecutionFeedback bulkFeedback) { + return BulkSyncTaskExecutionFeedbackProto.newBuilder() + .addAllFeedbacks(bulkFeedback.getFeedbacks().stream() + .map(f -> convert(f)).collect(Collectors.toList())) + .build(); + } + + public static SyncTaskExecutionFeedbackProto convert( + BlockSyncTaskExecutionFeedback feedback) { + SyncTaskExecutionFeedbackProto.Builder builder = + SyncTaskExecutionFeedbackProto.newBuilder() + .setSyncTaskId( + convert(feedback.getSyncTaskId(), feedback.getSyncMountId())) + .setOutcome(convert(feedback.getOutcome())); + if (feedback.getResult() != null) { + builder.setResult(convert(feedback.getResult())); + } + return builder.build(); + } + + public static SyncTaskExecutionOutcomeProto convert( + SyncTaskExecutionOutcome outcome) { + switch (outcome) { + case FINISHED_SUCCESSFULLY: + return SyncTaskExecutionOutcomeProto.FINISHED_SUCCESSFULLY; + case EXCEPTION: + return SyncTaskExecutionOutcomeProto.EXCEPTION; + default: + throw new IllegalArgumentException( + "Unknown SyncTaskExecutionOutcome: " + outcome); + } + } + + public static SyncTaskExecutionResultProto convert(
SyncTaskExecutionResult result) { + SyncTaskExecutionResultProto.Builder builder = + SyncTaskExecutionResultProto.newBuilder(); + if (result.getResult() != null) { + builder.setResult(ByteString.copyFrom(result.getResult())); + } + if (result.getNumberOfBytes() != null) { + builder.setNumberOfBytes(result.getNumberOfBytes()); + } + return builder.build(); + } + + public static BulkSyncTaskExecutionFeedback convertBulkSyncTaskExecutionFeedback( + BulkSyncTaskExecutionFeedbackProto bulkSyncTaskExecutionFeedback) { + return new BulkSyncTaskExecutionFeedback( + bulkSyncTaskExecutionFeedback.getFeedbacksList().stream() + .map(feedback -> convert(feedback)).collect(Collectors.toList())); + } + + public static BlockSyncTaskExecutionFeedback convert( + SyncTaskExecutionFeedbackProto feedback) { + return new BlockSyncTaskExecutionFeedback(convert(feedback.getSyncTaskId()), + convert(feedback.getOutcome()), + feedback.hasResult() ? convert(feedback.getResult()) : null, + feedback.getSyncTaskId().getSyncMountId()); + } + + public static SyncTaskExecutionOutcome convert( + SyncTaskExecutionOutcomeProto outcome) { + switch (outcome) { + case FINISHED_SUCCESSFULLY: + return SyncTaskExecutionOutcome.FINISHED_SUCCESSFULLY; + case EXCEPTION: + return SyncTaskExecutionOutcome.EXCEPTION; + default: + throw new IllegalArgumentException( + "Unknown SyncTaskExecutionOutcomeProto: " + outcome); + } + } + + public static SyncTaskExecutionResult convert( + SyncTaskExecutionResultProto result) { + byte[] bytes = null; + if (result.getResult() != null) { + bytes = result.getResult().toByteArray(); + } + + ByteBuffer byteBuffer = + (bytes == null) ? null : ByteBuffer.wrap(bytes).asReadOnlyBuffer(); + return new SyncTaskExecutionResult(byteBuffer, result.getNumberOfBytes()); + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 9ebc693..bea5fe0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1631,7 +1631,9 @@ public class DatanodeManager { int maxTransfers, int failedVolumes, VolumeFailureSummary volumeFailureSummary, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks) throws IOException { + @Nonnull SlowDiskReports slowDisks, + BulkSyncTaskExecutionFeedback bulkSyncTaskExecutionFeedback) + throws IOException { final DatanodeDescriptor nodeinfo; try { nodeinfo = getDatanode(nodeReg); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index a94d2df..0e8144a 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -511,6 +512,11 @@ class BPServiceActor implements Runnable { outliersReportDue && dn.getDiskMetrics() != null ? SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) : SlowDiskReports.EMPTY_REPORT; + + // TODO - collect feedback from SyncTasks here. + BulkSyncTaskExecutionFeedback bulkSyncTaskExecutionFeedback = + new BulkSyncTaskExecutionFeedback(Collections.emptyList()); + HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration, reports, dn.getFSDataset().getCacheCapacity(), @@ -521,7 +527,8 @@ class BPServiceActor implements Runnable { volumeFailureSummary, requestBlockReportLease, slowPeers, - slowDisks); + slowDisks, + bulkSyncTaskExecutionFeedback); if (outliersReportDue) { // If the report was due and successfully sent, schedule the next one. http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 8c95f7d..268f27f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -101,6 +101,7 @@ import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean; +import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.apache.hadoop.util.Time.now; import static org.apache.hadoop.util.Time.monotonicNow; @@ -3860,7 +3861,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks) throws IOException { + @Nonnull SlowDiskReports slowDisks, + BulkSyncTaskExecutionFeedback bulkSyncTaskExecutionFeedback) + throws IOException { readLock(); try { //get datanode commands @@ -3869,7 +3872,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat( nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed, xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary, - slowPeers, slowDisks); 
+ slowPeers, slowDisks, bulkSyncTaskExecutionFeedback); long blockReportLeaseId = 0; if (requestFullBlockReportLease) { blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index c5b9d5a..413a14a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -157,6 +157,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -1498,13 +1499,15 @@ public class NameNodeRpcServer implements NamenodeProtocols { int failedVolumes, VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks) throws IOException { + @Nonnull SlowDiskReports slowDisks, + BulkSyncTaskExecutionFeedback bulkSyncTaskExecutionFeedback) + throws IOException { checkNNStartup(); verifyRequest(nodeReg); return namesystem.handleHeartbeat(nodeReg, report, dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary, requestFullBlockReportLease, - slowPeers, slowDisks); + slowPeers, slowDisks, bulkSyncTaskExecutionFeedback); } @Override // DatanodeProtocol http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTask.java new file mode 100644 index 0000000..875a409 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTask.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import java.net.URI; +import java.util.List; +import java.util.UUID; + +/** + * A BlockSyncTask is an operation that is sent to the datanodes to copy + * blocks to an external storage endpoint as a part of an orchestrated + * synchronization across multiple datanodes. + * BlockSyncTask is intended to be an immutable POJO. + */ +public class BlockSyncTask { + private final UUID syncTaskId; + private final URI remoteURI; + private final List<LocatedBlock> locatedBlocks; + private String syncMountId; + private final int partNumber; + private byte[] uploadHandle; + private final int offset; + private final long length; + + public BlockSyncTask(UUID syncTaskId, URI remoteURI, + List<LocatedBlock> locatedBlocks, Integer partNumber, byte[] uploadHandle, + int offset, long length, String syncMountId) { + this.syncTaskId = syncTaskId; + this.remoteURI = remoteURI; + this.locatedBlocks = locatedBlocks; + this.syncMountId = syncMountId; + this.partNumber = partNumber; + this.uploadHandle = uploadHandle; + this.offset = offset; + this.length = length; + } + + public int getPartNumber() { + return partNumber; + } + + public byte[] getUploadHandle() { + return uploadHandle; + } + + public int getOffset() { + return offset; + } + + public long getLength() { + return length; + } + + public UUID getSyncTaskId() { + return syncTaskId; + } + + public URI getRemoteURI() { + return remoteURI; + } + + public List<LocatedBlock> getLocatedBlocks() { + return locatedBlocks; + } + + public String getSyncMountId() { + return syncMountId; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BulkSyncTaskExecutionFeedback.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BulkSyncTaskExecutionFeedback.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BulkSyncTaskExecutionFeedback.java new file mode 100644 index 0000000..0d459e8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BulkSyncTaskExecutionFeedback.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import java.util.Collection; + +/** + * Feedback for a collection of {@link BlockSyncTask}s. + */ +public class BulkSyncTaskExecutionFeedback { + + private Collection<BlockSyncTaskExecutionFeedback> feedbacks; + + public BulkSyncTaskExecutionFeedback( + Collection<BlockSyncTaskExecutionFeedback> feedbacks) { + this.feedbacks = feedbacks; + } + + public Collection<BlockSyncTaskExecutionFeedback> getFeedbacks() { + return feedbacks; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index 1f55100..d69dee7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -79,6 +79,7 @@ public interface DatanodeProtocol { final static int DNA_CACHE = 9; // cache blocks final static int DNA_UNCACHE = 10; // uncache blocks final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // erasure coding reconstruction command + final static int DNA_BACKUP = 14; // back up data to PROVIDED stores. /** * Register Datanode. @@ -101,6 +102,8 @@ * or to copy them to other DataNodes, etc. * @param registration datanode registration information * @param reports utilization report per storage + * @param cacheCapacity cache capacity on this datanode + * @param cacheUsed cache space used on this datanode * @param xmitsInProgress number of transfers from this datanode to others * @param xceiverCount number of active transceiver threads * @param failedVolumes number of failed volumes @@ -110,20 +113,19 @@ * @param slowPeers Details of peer DataNodes that were detected as being * slow to respond to packet writes. Empty report if no * slow peers were detected by the DataNode. + * @param bulkSyncTaskExecutionFeedback Result of the execution of the + * sync tasks.
* @throws IOException on error */ @Idempotent public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, - StorageReport[] reports, - long dnCacheCapacity, - long dnCacheUsed, - int xmitsInProgress, - int xceiverCount, - int failedVolumes, - VolumeFailureSummary volumeFailureSummary, - boolean requestFullBlockReportLease, - @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks) + StorageReport[] reports, long cacheCapacity, long cacheUsed, + int xmitsInProgress, int xceiverCount, int failedVolumes, + VolumeFailureSummary volumeFailureSummary, + boolean requestFullBlockReportLease, + @Nonnull SlowPeerReports slowPeers, + @Nonnull SlowDiskReports slowDisks, + BulkSyncTaskExecutionFeedback bulkSyncTaskExecutionFeedback) throws IOException; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncCommand.java new file mode 100644 index 0000000..7e2c242 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncCommand.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import java.util.List; + +/** + * A SyncCommand is an instruction to a DataNode to move the + * given file to the specified target DataNodes' provided storage.
+ */ +public class SyncCommand extends DatanodeCommand { + + private final List<BlockSyncTask> syncTasks; + + public SyncCommand(int action, List<BlockSyncTask> syncTasks) { + super(action); + this.syncTasks = syncTasks; + } + + public List<BlockSyncTask> getSyncTasks() { + return syncTasks; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index bf0df5b..5d6ef41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -60,6 +60,7 @@ message DatanodeCommandProto { NullDatanodeCommand = 7; BlockIdCommand = 8; BlockECReconstructionCommand = 9; + SyncCommand = 10; } required Type cmdType = 1; // Type of the command @@ -74,6 +75,7 @@ optional RegisterCommandProto registerCmd = 7; optional BlockIdCommandProto blkIdCmd = 8; optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9; + optional SyncCommandProto syncCommand = 10; } /** @@ -154,6 +156,89 @@ message BlockECReconstructionCommandProto { repeated BlockECReconstructionInfoProto blockECReconstructioninfo = 1; } +message SyncTaskIdProto { + required bytes syncTaskId = 1; + required string syncMountId = 2; +} + +/** + * Instruct the datanode to perform a backup command + */ +message SyncCommandProto { + repeated BlockSyncTaskProto syncTasks = 1; +} + +/** + * A block synchronization task as part of an orchestrated synchronization + * across potentially multiple datanodes (i.e. multipart put part). + */ +message BlockSyncTaskProto { + required SyncTaskIdProto syncTaskId = 1; + + required bytes uploadHandle = 2; + required int32 partNumber = 3; + repeated LocatedBlockProto locatedBlocks = 4; + required string uri = 5; + required int32 offset = 6; + required int64 length = 7; +} + +/** + * Block storage movement information + */ +message BlockMovingInfoProto { + required BlockProto block = 1; + required DatanodeInfoProto sourceDnInfo = 2; + required DatanodeInfoProto targetDnInfo = 3; + required StorageTypeProto sourceStorageType = 4; + required StorageTypeProto targetStorageType = 5; +} + +/** + * Blocks for which storage movements have been attempted and finished + * with either success or failure. + */ +message BlocksStorageMoveAttemptFinishedProto { + repeated BlockProto blocks = 1; +} + +/** + * A collection of feedbacks for a collection of sync tasks. + */ +message BulkSyncTaskExecutionFeedbackProto { + repeated SyncTaskExecutionFeedbackProto feedbacks = 1; +} + +/** + * Feedback for a sync task that has been executed. + * syncTaskId - identifier for the task. + * outcome - success/error. + * result - if the outcome is successful, the results of the sync task. + */ +message SyncTaskExecutionFeedbackProto { + required SyncTaskIdProto syncTaskId = 1; + required SyncTaskExecutionOutcomeProto outcome = 2; + optional SyncTaskExecutionResultProto result = 3; +} + +/** + * Success or failure indication of a sync task. + */ +enum SyncTaskExecutionOutcomeProto { + FINISHED_SUCCESSFULLY = 0; + EXCEPTION = 1; +} + +/** + * result - the opaque result data from the sync task. + * numberOfBytes - the number of bytes copied.
+ */ +message SyncTaskExecutionResultProto { + optional bytes result = 1; + optional int64 numberOfBytes = 2; +} + /** * registration - Information of the datanode registering with the namenode */ @@ -210,6 +295,7 @@ message HeartbeatRequestProto { optional bool requestFullBlockReportLease = 9 [ default = false ]; repeated SlowPeerReportProto slowPeers = 10; repeated SlowDiskReportProto slowDisks = 11; + optional BulkSyncTaskExecutionFeedbackProto bulkSyncTaskExecutionFeedback = 12; } /** @@ -275,7 +361,7 @@ message StorageBlockReportProto { */ message BlockReportResponseProto { optional DatanodeCommandProto cmd = 1; -} +} /** * registration - datanode registration information http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java index dd6f40a..8ba0396 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java @@ -589,7 +589,7 @@ public class TestDatanodeManager { Mockito.when(dm.getDatanode(dnReg)).thenReturn(nodeInfo); DatanodeCommand[] cmds = dm.handleHeartbeat( dnReg, new StorageReport[1], "bp-123", 0, 0, 10, maxTransfers, 0, null, - SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT); + SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, null); long expectedNumCmds = Arrays.stream( new int[]{numReplicationTasks, numECTasks}) http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java index 05b6d30..e81fb1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java @@ -116,7 +116,7 @@ public class TestNameNodePrunesMissingStorages { cluster.stopDataNode(0); cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0, 0, null, true, SlowPeerReports.EMPTY_REPORT, - SlowDiskReports.EMPTY_REPORT); + SlowDiskReports.EMPTY_REPORT, null); // Check that the missing storage was pruned. 
assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java index b453991..2a6975f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java @@ -167,7 +167,8 @@ public class InternalDataNodeTestUtils { Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), Mockito.any(SlowPeerReports.class), - Mockito.any(SlowDiskReports.class))).thenReturn( + Mockito.any(SlowDiskReports.class), + null)).thenReturn( new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat( HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current() .nextLong() | 1L)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 4863ca1..62b84d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.datanode; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; + +import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -157,7 +159,8 @@ public class TestBPOfferService { Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), Mockito.any(SlowPeerReports.class), - Mockito.any(SlowDiskReports.class)); + Mockito.any(SlowDiskReports.class), + Mockito.any(BulkSyncTaskExecutionFeedback.class)); mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0); datanodeCommands[nnIdx] = new DatanodeCommand[0]; return mock; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 07fd4ae..09fae14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ 
-21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode; import org.apache.hadoop.hdfs.AppendTestUtil; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; @@ -232,7 +233,8 @@ public class TestBlockRecovery { Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), Mockito.any(SlowPeerReports.class), - Mockito.any(SlowDiskReports.class))) + Mockito.any(SlowDiskReports.class), + Mockito.any(BulkSyncTaskExecutionFeedback.class))) .thenReturn(new HeartbeatResponse( new DatanodeCommand[0], new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1), http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java index 28427bc..6374540 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java @@ -24,13 +24,14 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHEC import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY; +import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.any; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyLong; @@ -172,7 +173,8 @@ public class TestDataNodeLifeline { any(VolumeFailureSummary.class), anyBoolean(), any(SlowPeerReports.class), - any(SlowDiskReports.class)); + any(SlowDiskReports.class), + any(BulkSyncTaskExecutionFeedback.class)); // Intercept lifeline to trigger latch count-down on each call. doAnswer(new LatchCountingAnswer(lifelinesSent)) @@ -237,7 +239,8 @@ public class TestDataNodeLifeline { any(VolumeFailureSummary.class), anyBoolean(), any(SlowPeerReports.class), - any(SlowDiskReports.class)); + any(SlowDiskReports.class), + any(BulkSyncTaskExecutionFeedback.class)); // While waiting on the latch for the expected number of heartbeat messages, // poll DataNode tracking information. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index bb1d9ef..2d7dea9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -222,7 +223,8 @@ public class TestDatanodeProtocolRetryPolicy {
            Mockito.any(VolumeFailureSummary.class),
            Mockito.anyBoolean(),
            Mockito.any(SlowPeerReports.class),
-           Mockito.any(SlowDiskReports.class));
+           Mockito.any(SlowDiskReports.class),
+           Mockito.any(BulkSyncTaskExecutionFeedback.class));
 
     dn = new DataNode(conf, locations, null, null) {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index 2dbd5b9..2cf0135 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
@@ -208,7 +209,8 @@ public class TestFsDatasetCache {
           (StorageReport[]) any(), anyLong(), anyLong(),
           anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
           anyBoolean(), any(SlowPeerReports.class),
-          any(SlowDiskReports.class));
+          any(SlowDiskReports.class),
+          any(BulkSyncTaskExecutionFeedback.class));
     } finally {
       lock.writeLock().unlock();
     }
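The verification-style tests below (TestStorageReport in particular) exercise the same all-or-nothing matcher rule from the verify side, where ArgumentCaptor.capture() counts as a matcher. A minimal sketch, assuming a spied DatanodeProtocolClientSideTranslatorPB named `spyNN` and a captor over the storage reports as in TestStorageReport:

    // Verify the widened heartbeat and capture its storage reports;
    // captor.capture() acts as a matcher, so every other argument,
    // including the new trailing one, must be a matcher as well.
    ArgumentCaptor<StorageReport[]> captor =
        ArgumentCaptor.forClass(StorageReport[].class);
    Mockito.verify(spyNN).sendHeartbeat(
        Mockito.any(DatanodeRegistration.class),
        captor.capture(),
        Mockito.anyLong(), Mockito.anyLong(),
        Mockito.anyInt(), Mockito.anyInt(), Mockito.anyInt(),
        Mockito.any(VolumeFailureSummary.class),
        Mockito.anyBoolean(),
        Mockito.any(SlowPeerReports.class),
        Mockito.any(SlowDiskReports.class),
        Mockito.any(BulkSyncTaskExecutionFeedback.class));
    StorageReport[] reports = captor.getValue();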
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index 5f62ddb..0f0bc1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -110,7 +111,8 @@ public class TestStorageReport {
         anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
         Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
         Mockito.any(SlowPeerReports.class),
-        Mockito.any(SlowDiskReports.class));
+        Mockito.any(SlowDiskReports.class),
+        Mockito.any(BulkSyncTaskExecutionFeedback.class));
 
     StorageReport[] reports = captor.getValue();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index 3a3c471..9940174 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -956,8 +956,8 @@ public class NNThroughputBenchmark implements Tool {
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null, true,
-          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
-          .getCommands();
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
+          null).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1007,8 +1007,8 @@ public class NNThroughputBenchmark implements Tool {
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null, true,
-          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
-          .getCommands();
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
+          null).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
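Direct (non-mocked) callers in this patch use two conventions for the new argument: NNThroughputBenchmark above and TestDeadDatanode below pass null, while NameNodeAdapter below builds an empty feedback object. Unless the NameNode side is known to tolerate null, the explicit empty value is the safer pattern; a sketch of a hypothetical helper mirroring the NameNodeAdapter call site:

    // Hypothetical helper: an explicit "no feedback" value instead of null,
    // constructed the same way as in the NameNodeAdapter hunk below.
    private static BulkSyncTaskExecutionFeedback emptyFeedback() {
      return new BulkSyncTaskExecutionFeedback(new ArrayList<>());
    }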
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index b85527a..8589f75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.mockito.Mockito.spy;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang3.reflect.FieldUtils;
@@ -130,7 +132,8 @@ public class NameNodeAdapter {
     return namesystem.handleHeartbeat(nodeReg,
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
         dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
-        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
+        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
+        new BulkSyncTaskExecutionFeedback(new ArrayList<>()));
   }
 
   public static boolean setReplication(final FSNamesystem ns,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 366f584..9f9a897 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -139,8 +139,8 @@ public class TestDeadDatanode {
         false, 0, 0, 0, 0, 0) };
     DatanodeCommand[] cmd =
         dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
-            SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
-            .getCommands();
+            SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
+            null).getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org