hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ehi...@apache.org
Subject [50/50] hadoop git commit: HDFS-13310. The DatanodeProtocol should have a DNA_BACKUP to back up blocks. Original patch contributed by Ewan Higgs. Follow-up work and fixes contributed by Virajith Jalaparthi.
Date Tue, 31 Jul 2018 09:26:32 GMT
HDFS-13310. The DatanodeProtocol should have a DNA_BACKUP to back up blocks. Original patch contributed by Ewan Higgs. Follow-up work and fixes contributed by Virajith Jalaparthi.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d52a2afb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d52a2afb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d52a2afb

Branch: refs/heads/HDFS-12090
Commit: d52a2afbf0d39b9de9d021e4e38f4ee993fa5fa0
Parents: 3e06a5d
Author: Ewan Higgs <ewan.higgs@wdc.com>
Authored: Mon Jul 23 13:14:04 2018 +0200
Committer: Ewan Higgs <ewan.higgs@wdc.com>
Committed: Tue Jul 31 11:24:39 2018 +0200

----------------------------------------------------------------------
 .../BlockSyncTaskExecutionFeedback.java         |  67 ++++++
 .../protocol/SyncTaskExecutionOutcome.java      |  25 +++
 .../protocol/SyncTaskExecutionResult.java       |  46 ++++
 .../DatanodeProtocolClientSideTranslatorPB.java |   8 +-
 .../DatanodeProtocolServerSideTranslatorPB.java |   6 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 208 ++++++++++++++++++-
 .../server/blockmanagement/DatanodeManager.java |   4 +-
 .../hdfs/server/datanode/BPServiceActor.java    |   9 +-
 .../hdfs/server/namenode/FSNamesystem.java      |   7 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |   7 +-
 .../hdfs/server/protocol/BlockSyncTask.java     |  83 ++++++++
 .../protocol/BulkSyncTaskExecutionFeedback.java |  36 ++++
 .../hdfs/server/protocol/DatanodeProtocol.java  |  22 +-
 .../hdfs/server/protocol/SyncCommand.java       |  39 ++++
 .../src/main/proto/DatanodeProtocol.proto       |  88 +++++++-
 .../blockmanagement/TestDatanodeManager.java    |   2 +-
 .../TestNameNodePrunesMissingStorages.java      |   2 +-
 .../datanode/InternalDataNodeTestUtils.java     |   3 +-
 .../server/datanode/TestBPOfferService.java     |   5 +-
 .../hdfs/server/datanode/TestBlockRecovery.java |   4 +-
 .../server/datanode/TestDataNodeLifeline.java   |   9 +-
 .../TestDatanodeProtocolRetryPolicy.java        |   4 +-
 .../server/datanode/TestFsDatasetCache.java     |   4 +-
 .../hdfs/server/datanode/TestStorageReport.java |   4 +-
 .../server/namenode/NNThroughputBenchmark.java  |   8 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |   5 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |   4 +-
 27 files changed, 662 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTaskExecutionFeedback.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTaskExecutionFeedback.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTaskExecutionFeedback.java
new file mode 100644
index 0000000..2e5393e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTaskExecutionFeedback.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.util.UUID;
+
+/**
+ * Feedback for a BlockSyncTask.
+ *
+ * <p>Immutable value object pairing the identity of a sync task (task id and
+ * sync mount id) with the outcome of its execution and, when one was
+ * supplied, the result it produced.
+ */
+public class BlockSyncTaskExecutionFeedback {
+
+  private final UUID syncTaskId;
+  private final SyncTaskExecutionOutcome outcome;
+  /** Null when the feedback was built by {@link #failedWithException}. */
+  private final SyncTaskExecutionResult result;
+  private final String syncMountId;
+
+  /**
+   * @param syncTaskId id of the task this feedback refers to.
+   * @param outcome whether the task succeeded or failed.
+   * @param result result of the task; may be null.
+   * @param syncMountId id of the sync mount the task belongs to.
+   */
+  public BlockSyncTaskExecutionFeedback(UUID syncTaskId,
+      SyncTaskExecutionOutcome outcome, SyncTaskExecutionResult result,
+      String syncMountId) {
+    this.syncTaskId = syncTaskId;
+    this.outcome = outcome;
+    this.result = result;
+    this.syncMountId = syncMountId;
+  }
+
+  /**
+   * Creates feedback with outcome
+   * {@link SyncTaskExecutionOutcome#FINISHED_SUCCESSFULLY}.
+   *
+   * @param syncTaskId id of the finished task.
+   * @param syncMountId id of the sync mount the task belongs to.
+   * @param result result produced by the task.
+   * @return the feedback object.
+   */
+  public static BlockSyncTaskExecutionFeedback finishedSuccessfully(
+      UUID syncTaskId, String syncMountId, SyncTaskExecutionResult result) {
+    return new BlockSyncTaskExecutionFeedback(syncTaskId,
+        SyncTaskExecutionOutcome.FINISHED_SUCCESSFULLY, result, syncMountId);
+  }
+
+  /**
+   * Creates feedback with outcome {@link SyncTaskExecutionOutcome#EXCEPTION}
+   * and a null result.
+   *
+   * @param syncTaskId id of the failed task.
+   * @param syncMountId id of the sync mount the task belongs to.
+   * @param e the failure; it is currently NOT recorded in the feedback, so
+   *          callers that need the details must log them themselves.
+   * @return the feedback object.
+   */
+  public static BlockSyncTaskExecutionFeedback failedWithException(
+      UUID syncTaskId, String syncMountId, Exception e) {
+    return new BlockSyncTaskExecutionFeedback(syncTaskId,
+        SyncTaskExecutionOutcome.EXCEPTION, null, syncMountId);
+  }
+
+  /** @return id of the task this feedback refers to. */
+  public UUID getSyncTaskId() {
+    return syncTaskId;
+  }
+
+  /** @return outcome of the task execution. */
+  public SyncTaskExecutionOutcome getOutcome() {
+    return outcome;
+  }
+
+  /** @return result of the task, or null if none was supplied. */
+  public SyncTaskExecutionResult getResult() {
+    return result;
+  }
+
+  /** @return id of the sync mount the task belongs to. */
+  public String getSyncMountId() {
+    return syncMountId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionOutcome.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionOutcome.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionOutcome.java
new file mode 100644
index 0000000..492575b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionOutcome.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+/**
+ * Outcome reported for an executed SyncTask: either it ran to completion or
+ * it failed.
+ */
+public enum SyncTaskExecutionOutcome {
+  /** The task finished successfully. */
+  FINISHED_SUCCESSFULLY,
+  /** The task failed with an exception. */
+  EXCEPTION;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionResult.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionResult.java
new file mode 100644
index 0000000..b623dc5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncTaskExecutionResult.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Result of a SyncTask.
+ *
+ * <p>Holds an opaque byte payload together with a byte count. Both values
+ * are fixed at construction time.
+ */
+public class SyncTaskExecutionResult {
+
+  /** result is the opaque byte stream result of a task. e.g. PartHandle */
+  private final ByteBuffer result;
+  private final Long numberOfBytes;
+
+  /**
+   * @param result opaque payload of the task; may be null.
+   * @param numberOfBytes byte count reported for the task; may be null.
+   */
+  public SyncTaskExecutionResult(ByteBuffer result, Long numberOfBytes) {
+    this.result = result;
+    this.numberOfBytes = numberOfBytes;
+  }
+
+  /**
+   * @return a result with a zero-length payload and a byte count of 0.
+   */
+  public static SyncTaskExecutionResult emptyResult() {
+    return new SyncTaskExecutionResult(ByteBuffer.wrap(new byte[0]), 0L);
+  }
+
+  /**
+   * Returns a view of the payload with its own position, limit and mark.
+   * Handing out the stored buffer itself would let one reader consume it and
+   * leave every later reader with zero remaining bytes.
+   *
+   * @return an independent duplicate of the payload buffer (sharing its
+   *         content), or null if no payload was supplied.
+   */
+  public ByteBuffer getResult() {
+    return result == null ? null : result.duplicate();
+  }
+
+  /** @return the byte count supplied at construction; may be null. */
+  public Long getNumberOfBytes() {
+    return numberOfBytes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 9cc4516..20b314c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlock
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -138,7 +139,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks) throws IOException {
+      @Nonnull SlowDiskReports slowDisks,
+      BulkSyncTaskExecutionFeedback feedback) throws IOException {
     HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
         .setRegistration(PBHelper.convert(registration))
         .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@@ -161,6 +163,10 @@ public class DatanodeProtocolClientSideTranslatorPB implements
     if (slowDisks.haveSlowDisks()) {
       builder.addAllSlowDisks(PBHelper.convertSlowDiskInfo(slowDisks));
     }
+    if(feedback != null && !feedback.getFeedbacks().isEmpty()) {
+      builder.setBulkSyncTaskExecutionFeedback(PBHelper.convert(feedback));
+    }
+
     HeartbeatResponseProto resp;
     try {
       resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index 5cba284..a51ce85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -109,6 +109,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
   @Override
   public HeartbeatResponseProto sendHeartbeat(RpcController controller,
       HeartbeatRequestProto request) throws ServiceException {
+
     HeartbeatResponse response;
     try {
       final StorageReport[] report = PBHelperClient.convertStorageReports(
@@ -122,7 +123,10 @@ public class DatanodeProtocolServerSideTranslatorPB implements
           request.getXceiverCount(), request.getFailedVolumes(),
           volumeFailureSummary, request.getRequestFullBlockReportLease(),
           PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
-          PBHelper.convertSlowDiskInfo(request.getSlowDisksList()));
+          PBHelper.convertSlowDiskInfo(request.getSlowDisksList()),
+          PBHelper.convertBulkSyncTaskExecutionFeedback(
+              request.getBulkSyncTaskExecutionFeedback())
+      );
     } catch (IOException e) {
       throw new ServiceException(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index ac01348..d520a40 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.protocolPB;
 
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -24,9 +27,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
 
 import com.google.protobuf.ByteString;
-
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSUtilClient;
@@ -43,38 +47,44 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommand
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockSyncTaskProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BulkSyncTaskExecutionFeedbackProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos
-    .SlowDiskReportProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowDiskReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncTaskExecutionFeedbackProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncTaskExecutionOutcomeProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncTaskExecutionResultProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SyncTaskIdProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
-import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ProvidedStorageLocationProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ProvidedStorageLocationProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.BlockWithLocationsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.BlocksWithLocationsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.CheckpointCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.CheckpointSignatureProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.ExportedBlockKeysProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NNHAStatusHeartbeatProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeRegistrationProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeRegistrationProto.NamenodeRoleProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamespaceInfoProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NNHAStatusHeartbeatProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RecoveringBlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RemoteEditLogManifestProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RemoteEditLogProto;
@@ -89,18 +99,23 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask;
+import org.apache.hadoop.hdfs.server.protocol.BlockSyncTaskExecutionFeedback;
+import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionOutcome;
+import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionResult;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -119,7 +134,9 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
+import org.apache.hadoop.hdfs.server.protocol.SyncCommand;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
+import org.apache.hadoop.ipc.ClientId;
 
 /**
  * Utilities for converting protobuf classes to and from implementation classes
@@ -469,11 +486,52 @@ public class PBHelper {
       return PBHelper.convert(proto.getBlkIdCmd());
     case BlockECReconstructionCommand:
       return PBHelper.convert(proto.getBlkECReconstructionCmd());
+    case SyncCommand:
+      return PBHelper.convert(proto.getSyncCommand());
     default:
       return null;
     }
   }
-  
+
+  /**
+   * Converts a {@link SyncCommandProto} into a {@link SyncCommand} carrying
+   * the {@link DatanodeProtocol#DNA_BACKUP} action.
+   *
+   * @param backupCommand protobuf form of the command.
+   * @return the command with every task proto converted, in list order.
+   */
+  private static SyncCommand convert(SyncCommandProto backupCommand) {
+    List<BlockSyncTaskProto> syncTasksProtoList =
+        backupCommand.getSyncTasksList();
+    // Diamond rather than the raw ArrayList type keeps the list type-checked.
+    List<BlockSyncTask> syncTasksList =
+        new ArrayList<>(syncTasksProtoList.size());
+    for (BlockSyncTaskProto syncTaskProto : syncTasksProtoList) {
+      syncTasksList.add(convertSyncTask(syncTaskProto));
+    }
+
+    return new SyncCommand(DatanodeProtocol.DNA_BACKUP, syncTasksList);
+  }
+
+  /**
+   * Converts a {@link BlockSyncTaskProto} into a {@link BlockSyncTask}.
+   *
+   * @param syncTaskProto protobuf form of the task.
+   * @return the task built from the proto's id, URI, located blocks, part
+   *         number, upload handle, offset, length and sync mount id.
+   * @throws IllegalArgumentException if the URI string in the proto cannot be
+   *         parsed; the parse failure is attached as the cause.
+   */
+  private static BlockSyncTask convertSyncTask(
+      BlockSyncTaskProto syncTaskProto) {
+    SyncTaskIdProto syncTaskIdProto = syncTaskProto.getSyncTaskId();
+    UUID syncTaskId = convert(syncTaskIdProto);
+    try {
+      return new BlockSyncTask(syncTaskId,
+          new URI(syncTaskProto.getUri()),
+          PBHelperClient.convertLocatedBlocks(
+              syncTaskProto.getLocatedBlocksList()),
+          syncTaskProto.getPartNumber(),
+          syncTaskProto.getUploadHandle().toByteArray(),
+          syncTaskProto.getOffset(),
+          syncTaskProto.getLength(),
+          syncTaskIdProto.getSyncMountId());
+    } catch (URISyntaxException e) {
+      // Keep the offending value and the original failure for diagnosis.
+      throw new IllegalArgumentException(
+          "Invalid URI in BlockSyncTaskProto: " + syncTaskProto.getUri(), e);
+    }
+  }
+
+  /**
+   * Rebuilds the {@link UUID} of a sync task from the raw id bytes stored in
+   * a {@link SyncTaskIdProto}.
+   *
+   * @param syncTaskIdProto proto holding the task id bytes.
+   * @return UUID assembled from the most and least significant longs that
+   *         {@link ClientId} extracts from those bytes.
+   */
+  public static UUID convert(SyncTaskIdProto syncTaskIdProto) {
+    final byte[] idBytes = syncTaskIdProto.getSyncTaskId().toByteArray();
+    return new UUID(ClientId.getMsb(idBytes), ClientId.getLsb(idBytes));
+  }
+
+
   public static BalancerBandwidthCommandProto convert(
       BalancerBandwidthCommand bbCmd) {
     return BalancerBandwidthCommandProto.newBuilder()
@@ -603,6 +661,10 @@ public class PBHelper {
           .setBlkECReconstructionCmd(
               convert((BlockECReconstructionCommand) datanodeCommand));
       break;
+    case DatanodeProtocol.DNA_BACKUP:
+      builder.setCmdType(DatanodeCommandProto.Type.SyncCommand)
+          .setSyncCommand(convert((SyncCommand) datanodeCommand));
+      break;
     case DatanodeProtocol.DNA_UNKNOWN: //Not expected
     default:
       builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);
@@ -1124,4 +1186,130 @@ public class PBHelper {
 
     return new FileRegion(block, providedStorageLocation);
   }
+
+  /**
+   * Serializes a {@link SyncCommand} by converting each of its sync tasks
+   * and adding them, in order, to a new {@link SyncCommandProto}.
+   *
+   * @param syncCommand command to serialize.
+   * @return protobuf form of the command.
+   */
+  private static SyncCommandProto convert(SyncCommand syncCommand) {
+    return SyncCommandProto.newBuilder()
+        .addAllSyncTasks(syncCommand.getSyncTasks().stream()
+            .map(task -> convert(task)).collect(Collectors.toList()))
+        .build();
+  }
+
+  /**
+   * Serializes a {@link BlockSyncTask}: located blocks, upload handle, part
+   * number, remote URI (as a string), offset and length.
+   *
+   * NOTE(review): no sync task id is written here, although
+   * convertSyncTask(BlockSyncTaskProto) reads one from the proto -- confirm
+   * whether omitting it is intended.
+   *
+   * @param blockSyncTask task to serialize.
+   * @return protobuf form of the task.
+   */
+  private static BlockSyncTaskProto convert(BlockSyncTask blockSyncTask) {
+    return BlockSyncTaskProto.newBuilder()
+        .addAllLocatedBlocks(PBHelperClient.convertLocatedBlocks2(
+            blockSyncTask.getLocatedBlocks()))
+        .setUploadHandle(ByteString.copyFrom(blockSyncTask.getUploadHandle()))
+        .setPartNumber(blockSyncTask.getPartNumber())
+        .setUri(blockSyncTask.getRemoteURI().toString())
+        .setOffset(blockSyncTask.getOffset())
+        .setLength(blockSyncTask.getLength())
+        .build();
+  }
+
+  /**
+   * Builds a {@link SyncTaskIdProto} from a task UUID and a sync mount id.
+   * The UUID is written as 16 bytes: the most significant long followed by
+   * the least significant long.
+   *
+   * @param syncTaskId id of the sync task.
+   * @param syncMountId id of the sync mount.
+   * @return protobuf form of the task id.
+   */
+  public static SyncTaskIdProto convert(UUID syncTaskId, String syncMountId) {
+    final ByteBuffer uuidBytes = ByteBuffer.allocate(16)
+        .putLong(syncTaskId.getMostSignificantBits())
+        .putLong(syncTaskId.getLeastSignificantBits());
+    return SyncTaskIdProto.newBuilder()
+        .setSyncTaskId(ByteString.copyFrom(uuidBytes.array()))
+        .setSyncMountId(syncMountId)
+        .build();
+  }
+
+
+  /**
+   * Serializes a {@link BulkSyncTaskExecutionFeedback} by converting each
+   * contained feedback entry, preserving iteration order.
+   *
+   * @param bulkFeedback feedback collection to serialize.
+   * @return protobuf form of the collection.
+   */
+  public static BulkSyncTaskExecutionFeedbackProto convert(
+      BulkSyncTaskExecutionFeedback bulkFeedback) {
+    List<SyncTaskExecutionFeedbackProto> feedbackProtos = new ArrayList<>();
+    for (BlockSyncTaskExecutionFeedback entry : bulkFeedback.getFeedbacks()) {
+      feedbackProtos.add(convert(entry));
+    }
+    BulkSyncTaskExecutionFeedbackProto.Builder builder =
+        BulkSyncTaskExecutionFeedbackProto.newBuilder();
+    builder.addAllFeedbacks(feedbackProtos);
+    return builder.build();
+  }
+
+  /**
+   * Serializes a single {@link BlockSyncTaskExecutionFeedback}. The task id
+   * and outcome are always written; the result is written only when the
+   * feedback carries one.
+   *
+   * @param feedback feedback to serialize.
+   * @return protobuf form of the feedback.
+   */
+  public static SyncTaskExecutionFeedbackProto convert(
+      BlockSyncTaskExecutionFeedback feedback) {
+    final SyncTaskIdProto idProto =
+        convert(feedback.getSyncTaskId(), feedback.getSyncMountId());
+    final SyncTaskExecutionOutcomeProto outcomeProto =
+        convert(feedback.getOutcome());
+
+    SyncTaskExecutionFeedbackProto.Builder feedbackBuilder =
+        SyncTaskExecutionFeedbackProto.newBuilder();
+    feedbackBuilder.setSyncTaskId(idProto);
+    feedbackBuilder.setOutcome(outcomeProto);
+
+    final SyncTaskExecutionResult taskResult = feedback.getResult();
+    if (taskResult != null) {
+      feedbackBuilder.setResult(convert(taskResult));
+    }
+    return feedbackBuilder.build();
+  }
+
+  /**
+   * Maps a {@link SyncTaskExecutionOutcome} onto the protobuf enum constant
+   * of the same name.
+   *
+   * @param outcome outcome to map.
+   * @return the matching protobuf constant.
+   * @throws IllegalArgumentException if the outcome has no mapping.
+   */
+  public static SyncTaskExecutionOutcomeProto convert(
+      SyncTaskExecutionOutcome outcome) {
+    final SyncTaskExecutionOutcomeProto mapped;
+    switch (outcome) {
+    case FINISHED_SUCCESSFULLY:
+      mapped = SyncTaskExecutionOutcomeProto.FINISHED_SUCCESSFULLY;
+      break;
+    case EXCEPTION:
+      mapped = SyncTaskExecutionOutcomeProto.EXCEPTION;
+      break;
+    default:
+      throw new IllegalArgumentException(
+          "Unknown SyncTaskExecutionOutcome: " + outcome);
+    }
+    return mapped;
+  }
+
+  /**
+   * Serializes a {@link SyncTaskExecutionResult}. The payload and the byte
+   * count are each written only when they are non-null.
+   *
+   * @param result result to serialize.
+   * @return protobuf form of the result.
+   */
+  public static SyncTaskExecutionResultProto convert(
+      SyncTaskExecutionResult result) {
+    SyncTaskExecutionResultProto.Builder builder =
+        SyncTaskExecutionResultProto.newBuilder();
+    ByteBuffer payload = result.getResult();
+    if (payload != null) {
+      // ByteString.copyFrom(ByteBuffer) consumes the buffer it is given
+      // (its position moves to the limit). Copy from a duplicate so that the
+      // caller's buffer stays readable and repeated conversions of the same
+      // result produce the same bytes.
+      builder.setResult(ByteString.copyFrom(payload.duplicate()));
+    }
+    if (result.getNumberOfBytes() != null) {
+      builder.setNumberOfBytes(result.getNumberOfBytes());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Deserializes a {@link BulkSyncTaskExecutionFeedbackProto} by converting
+   * each feedback entry in list order.
+   *
+   * @param bulkSyncTaskExecutionFeedback protobuf form of the collection.
+   * @return the feedback collection.
+   */
+  public static BulkSyncTaskExecutionFeedback convertBulkSyncTaskExecutionFeedback(
+      BulkSyncTaskExecutionFeedbackProto bulkSyncTaskExecutionFeedback) {
+    List<BlockSyncTaskExecutionFeedback> converted = new ArrayList<>();
+    for (SyncTaskExecutionFeedbackProto entry
+        : bulkSyncTaskExecutionFeedback.getFeedbacksList()) {
+      converted.add(convert(entry));
+    }
+    return new BulkSyncTaskExecutionFeedback(converted);
+  }
+
+  public static BlockSyncTaskExecutionFeedback convert(
+      SyncTaskExecutionFeedbackProto feedback) {
+    return new BlockSyncTaskExecutionFeedback(convert(feedback.getSyncTaskId()),
+        convert(feedback.getOutcome()),
+        feedback.hasResult() ? convert(feedback.getResult()) : null,
+        feedback.getSyncTaskId().getSyncMountId());
+  }
+
+  /**
+   * Maps a protobuf {@link SyncTaskExecutionOutcomeProto} constant onto the
+   * {@link SyncTaskExecutionOutcome} constant of the same name.
+   *
+   * @param outcome protobuf outcome to map.
+   * @return the matching outcome.
+   * @throws IllegalArgumentException if the proto value has no mapping.
+   */
+  public static SyncTaskExecutionOutcome convert(
+      SyncTaskExecutionOutcomeProto outcome) {
+    final SyncTaskExecutionOutcome mapped;
+    switch (outcome) {
+    case FINISHED_SUCCESSFULLY:
+      mapped = SyncTaskExecutionOutcome.FINISHED_SUCCESSFULLY;
+      break;
+    case EXCEPTION:
+      mapped = SyncTaskExecutionOutcome.EXCEPTION;
+      break;
+    default:
+      throw new IllegalArgumentException(
+          "Unknown SyncTaskExecutionOutcomeProto: " + outcome);
+    }
+    return mapped;
+  }
+
+  /**
+   * Deserializes a {@link SyncTaskExecutionResultProto}. The payload bytes
+   * are copied out of the proto and exposed as a read-only buffer.
+   *
+   * @param result protobuf form of the result.
+   * @return the result object; its payload buffer is never null.
+   */
+  public static SyncTaskExecutionResult convert(
+      SyncTaskExecutionResultProto result) {
+    // Generated protobuf getters never return null (an unset bytes field
+    // reads as an empty ByteString), so no null guard is needed here.
+    ByteBuffer byteBuffer =
+        ByteBuffer.wrap(result.getResult().toByteArray()).asReadOnlyBuffer();
+    return new SyncTaskExecutionResult(byteBuffer, result.getNumberOfBytes());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 9ebc693..bea5fe0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1631,7 +1631,9 @@ public class DatanodeManager {
       int maxTransfers, int failedVolumes,
       VolumeFailureSummary volumeFailureSummary,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks) throws IOException {
+      @Nonnull SlowDiskReports slowDisks,
+      BulkSyncTaskExecutionFeedback bulkSyncTaskExecutionFeedback)
+      throws IOException {
     final DatanodeDescriptor nodeinfo;
     try {
       nodeinfo = getDatanode(nodeReg);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index a94d2df..0e8144a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -511,6 +512,11 @@ class BPServiceActor implements Runnable {
         outliersReportDue && dn.getDiskMetrics() != null ?
             SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
             SlowDiskReports.EMPTY_REPORT;
+
+    // TODO - collect feedback from SyncTasks here.
+    BulkSyncTaskExecutionFeedback bulkSyncTaskExecutionFeedback =
+        new BulkSyncTaskExecutionFeedback(Collections.emptyList());
+
     HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
         reports,
         dn.getFSDataset().getCacheCapacity(),
@@ -521,7 +527,8 @@ class BPServiceActor implements Runnable {
         volumeFailureSummary,
         requestBlockReportLease,
         slowPeers,
-        slowDisks);
+        slowDisks,
+        bulkSyncTaskExecutionFeedback);
 
     if (outliersReportDue) {
       // If the report was due and successfully sent, schedule the next one.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 8c95f7d..268f27f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -101,6 +101,7 @@ import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.apache.hadoop.util.Time.now;
 import static org.apache.hadoop.util.Time.monotonicNow;
@@ -3860,7 +3861,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks) throws IOException {
+      @Nonnull SlowDiskReports slowDisks,
+      BulkSyncTaskExecutionFeedback bulkSyncTaskExecutionFeedback)
+      throws IOException {
     readLock();
     try {
       //get datanode commands
@@ -3869,7 +3872,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
           nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
           xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
-          slowPeers, slowDisks);
+          slowPeers, slowDisks, bulkSyncTaskExecutionFeedback);
       long blockReportLeaseId = 0;
       if (requestFullBlockReportLease) {
         blockReportLeaseId =  blockManager.requestBlockReportLeaseId(nodeReg);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index c5b9d5a..413a14a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -157,6 +157,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -1498,13 +1499,15 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       int failedVolumes, VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
-      @Nonnull SlowDiskReports slowDisks) throws IOException {
+      @Nonnull SlowDiskReports slowDisks,
+      BulkSyncTaskExecutionFeedback bulkSyncTaskExecutionFeedback)
+      throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     return namesystem.handleHeartbeat(nodeReg, report,
         dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
         failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
-        slowPeers, slowDisks);
+        slowPeers, slowDisks, bulkSyncTaskExecutionFeedback);
   }
 
   @Override // DatanodeProtocol

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTask.java
new file mode 100644
index 0000000..875a409
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockSyncTask.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import java.net.URI;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * A BlockSyncTask is an operation that is sent to the datanodes to copy
+ * blocks to an external storage endpoint as a part of an orchestrated
+ * synchronization across multiple datanodes.
+ * BlockSyncTask is intended to be an immutable POJO.
+ */
+public class BlockSyncTask {
+  private final UUID syncTaskId;
+  private final URI remoteURI;
+  private final List<LocatedBlock> locatedBlocks;
+  private String syncMountId;
+  private final int partNumber;
+  private byte[] uploadHandle;
+  private final int offset;
+  private final long length;
+
+  public BlockSyncTask(UUID syncTaskId, URI remoteURI,
+      List<LocatedBlock> locatedBlocks, Integer partNumber, byte[] uploadHandle,
+      int offset, long length, String syncMountId) {
+    this.syncTaskId = syncTaskId;
+    this.remoteURI = remoteURI;
+    this.locatedBlocks = locatedBlocks;
+    this.syncMountId = syncMountId;
+    this.partNumber = partNumber;
+    this.uploadHandle = uploadHandle;
+    this.offset = offset;
+    this.length = length;
+  }
+
+  public int getPartNumber() {
+    return partNumber;
+  }
+
+  public byte[] getUploadHandle() {
+    return uploadHandle;
+  }
+
+  public int getOffset() {
+    return offset;
+  }
+
+  public long getLength() {
+    return length;
+  }
+
+  public UUID getSyncTaskId() {
+    return syncTaskId;
+  }
+
+  public URI getRemoteURI() {
+    return remoteURI;
+  }
+
+  public List<LocatedBlock> getLocatedBlocks() {
+    return locatedBlocks;
+  }
+
+  public String getSyncMountId() {
+    return syncMountId;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BulkSyncTaskExecutionFeedback.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BulkSyncTaskExecutionFeedback.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BulkSyncTaskExecutionFeedback.java
new file mode 100644
index 0000000..0d459e8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BulkSyncTaskExecutionFeedback.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.util.Collection;
+
+/**
+ * Feedback for a collection of {@link BlockSyncTask}s.
+ */
+public class BulkSyncTaskExecutionFeedback {
+
+  private Collection<BlockSyncTaskExecutionFeedback> feedbacks;
+
+  public BulkSyncTaskExecutionFeedback(
+      Collection<BlockSyncTaskExecutionFeedback> feedbacks) {
+    this.feedbacks = feedbacks;
+  }
+
+  public Collection<BlockSyncTaskExecutionFeedback> getFeedbacks() {
+    return feedbacks;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 1f55100..d69dee7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -79,6 +79,7 @@ public interface DatanodeProtocol {
   final static int DNA_CACHE = 9;      // cache blocks
   final static int DNA_UNCACHE = 10;   // uncache blocks
   final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // erasure coding reconstruction command
+  final static int DNA_BACKUP = 14; // back up data to PROVIDED stores.
 
   /** 
    * Register Datanode.
@@ -101,6 +102,8 @@ public interface DatanodeProtocol {
    * or to copy them to other DataNodes, etc.
    * @param registration datanode registration information
    * @param reports utilization report per storage
+   * @param cacheCapacity cache capacity of the datanode
+   * @param cacheUsed amount of cache used by the datanode
    * @param xmitsInProgress number of transfers from this datanode to others
    * @param xceiverCount number of active transceiver threads
    * @param failedVolumes number of failed volumes
@@ -110,20 +113,19 @@ public interface DatanodeProtocol {
    * @param slowPeers Details of peer DataNodes that were detected as being
    *                  slow to respond to packet writes. Empty report if no
    *                  slow peers were detected by the DataNode.
+   * @param bulkSyncTaskExecutionFeedback Result of the execution of the
+   *                                      sync tasks.
    * @throws IOException on error
    */
   @Idempotent
   public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
-                                       StorageReport[] reports,
-                                       long dnCacheCapacity,
-                                       long dnCacheUsed,
-                                       int xmitsInProgress,
-                                       int xceiverCount,
-                                       int failedVolumes,
-                                       VolumeFailureSummary volumeFailureSummary,
-                                       boolean requestFullBlockReportLease,
-                                       @Nonnull SlowPeerReports slowPeers,
-                                       @Nonnull SlowDiskReports slowDisks)
+      StorageReport[] reports, long cacheCapacity, long cacheUsed,
+      int xmitsInProgress, int xceiverCount, int failedVolumes,
+      VolumeFailureSummary volumeFailureSummary,
+      boolean requestFullBlockReportLease,
+      @Nonnull SlowPeerReports slowPeers,
+      @Nonnull SlowDiskReports slowDisks,
+      BulkSyncTaskExecutionFeedback bulkSyncTaskExecutionFeedback)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncCommand.java
new file mode 100644
index 0000000..7e2c242
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/SyncCommand.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.util.List;
+
+/**
+ * A SyncCommand is an instruction to a DataNode to move the
+ * given file to specified target DataNodes provided storage.
+ */
+public class SyncCommand extends DatanodeCommand {
+
+  private final List<BlockSyncTask> syncTasks;
+
+  public SyncCommand(int action, List<BlockSyncTask> syncTasks) {
+    super(action);
+    this.syncTasks = syncTasks;
+  }
+
+  public List<BlockSyncTask> getSyncTasks() {
+    return syncTasks;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index bf0df5b..5d6ef41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -60,6 +60,7 @@ message DatanodeCommandProto {
     NullDatanodeCommand = 7;
     BlockIdCommand = 8;
     BlockECReconstructionCommand = 9;
+    SyncCommand = 10;
   }
 
   required Type cmdType = 1;    // Type of the command
@@ -74,6 +75,7 @@ message DatanodeCommandProto {
   optional RegisterCommandProto registerCmd = 7;
   optional BlockIdCommandProto blkIdCmd = 8;
   optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9;
+  optional SyncCommandProto syncCommand = 10;
 }
 
 /**
@@ -154,6 +156,89 @@ message BlockECReconstructionCommandProto {
   repeated BlockECReconstructionInfoProto blockECReconstructioninfo = 1;
 }
 
+message SyncTaskIdProto {
+  required bytes syncTaskId = 1;
+  required string syncMountId = 2;
+}
+
+/**
+ * Instruct the datanode to perform a backup command
+ */
+message SyncCommandProto {
+  repeated BlockSyncTaskProto syncTasks = 1;
+}
+
+/**
+ * A block synchronization task as part of an orchestrated synchronization
+ * across potentially multiple datanodes (i.e. multipart put part).
+ */
+message BlockSyncTaskProto {
+  required SyncTaskIdProto syncTaskId = 1;
+
+  required bytes uploadHandle = 2;
+  required int32 partNumber = 3;
+  repeated LocatedBlockProto locatedBlocks = 4;
+  required string uri = 5;
+  required int32 offset = 6;
+  required int64 length = 7;
+}
+
+/**
+ * Block storage movement information
+ */
+message BlockMovingInfoProto {
+  required BlockProto block = 1;
+  required DatanodeInfoProto sourceDnInfo = 2;
+  required DatanodeInfoProto targetDnInfo = 3;
+  required StorageTypeProto sourceStorageType = 4;
+  required StorageTypeProto targetStorageType = 5;
+}
+
+/**
+ * Blocks for which storage movements have been attempted and finished
+ * with either success or failure.
+ */
+message BlocksStorageMoveAttemptFinishedProto {
+  repeated BlockProto blocks = 1;
+}
+
+/**
+ * A collection of feedbacks for a collection of sync tasks.
+ */
+message BulkSyncTaskExecutionFeedbackProto {
+  repeated SyncTaskExecutionFeedbackProto feedbacks = 1;
+}
+
+/**
+ * Feedback for a sync task that has been executed.
+ * syncTaskId - identifier for the task.
+ * outcome - success/error.
+ * operation - the type of operation.
+ * result - if the outcome is successful, the results of the sync task.
+ */
+message SyncTaskExecutionFeedbackProto {
+  required SyncTaskIdProto syncTaskId = 1;
+  required SyncTaskExecutionOutcomeProto outcome = 2;
+  optional SyncTaskExecutionResultProto result = 3;
+}
+
+/**
+ * Success or failure indication of a sync task.
+ */
+enum SyncTaskExecutionOutcomeProto {
+  FINISHED_SUCCESSFULLY = 0;
+  EXCEPTION = 1;
+}
+
+/**
+ * result - the opaque result data from the sync task.
+ * numberOfBytes - the number of bytes copied.
+ */
+message SyncTaskExecutionResultProto {
+  optional bytes result = 1;
+  optional int64 numberOfBytes = 2;
+}
+
 /**
  * registration - Information of the datanode registering with the namenode
  */
@@ -210,6 +295,7 @@ message HeartbeatRequestProto {
   optional bool requestFullBlockReportLease = 9 [ default = false ];
   repeated SlowPeerReportProto slowPeers = 10;
   repeated SlowDiskReportProto slowDisks = 11;
+  optional BulkSyncTaskExecutionFeedbackProto bulkSyncTaskExecutionFeedback = 12;
 }
 
 /**
@@ -275,7 +361,7 @@ message StorageBlockReportProto {
  */
 message BlockReportResponseProto {
   optional DatanodeCommandProto cmd = 1;
-} 
+}
 
 /**
  * registration - datanode registration information

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index dd6f40a..8ba0396 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -589,7 +589,7 @@ public class TestDatanodeManager {
     Mockito.when(dm.getDatanode(dnReg)).thenReturn(nodeInfo);
     DatanodeCommand[] cmds = dm.handleHeartbeat(
         dnReg, new StorageReport[1], "bp-123", 0, 0, 10, maxTransfers, 0, null,
-        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
+        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT, null);
 
     long expectedNumCmds = Arrays.stream(
         new int[]{numReplicationTasks, numECTasks})

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index 05b6d30..e81fb1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -116,7 +116,7 @@ public class TestNameNodePrunesMissingStorages {
       cluster.stopDataNode(0);
       cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
           0, null, true, SlowPeerReports.EMPTY_REPORT,
-          SlowDiskReports.EMPTY_REPORT);
+          SlowDiskReports.EMPTY_REPORT, null);
 
       // Check that the missing storage was pruned.
       assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
index b453991..2a6975f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
@@ -167,7 +167,8 @@ public class InternalDataNodeTestUtils {
             Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
             Mockito.anyBoolean(),
             Mockito.any(SlowPeerReports.class),
-            Mockito.any(SlowDiskReports.class))).thenReturn(
+            Mockito.any(SlowDiskReports.class),
+            null)).thenReturn(
         new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
             HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
             .nextLong() | 1L));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 4863ca1..62b84d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -157,7 +159,8 @@ public class TestBPOfferService {
           Mockito.any(VolumeFailureSummary.class),
           Mockito.anyBoolean(),
           Mockito.any(SlowPeerReports.class),
-          Mockito.any(SlowDiskReports.class));
+          Mockito.any(SlowDiskReports.class),
+          Mockito.any(BulkSyncTaskExecutionFeedback.class));
     mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
     datanodeCommands[nnIdx] = new DatanodeCommand[0];
     return mock;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 07fd4ae..09fae14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 import org.apache.hadoop.hdfs.AppendTestUtil;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
@@ -232,7 +233,8 @@ public class TestBlockRecovery {
             Mockito.any(VolumeFailureSummary.class),
             Mockito.anyBoolean(),
             Mockito.any(SlowPeerReports.class),
-            Mockito.any(SlowDiskReports.class)))
+            Mockito.any(SlowDiskReports.class),
+            Mockito.any(BulkSyncTaskExecutionFeedback.class)))
         .thenReturn(new HeartbeatResponse(
             new DatanodeCommand[0],
             new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
index 28427bc..6374540 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
@@ -24,13 +24,14 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHEC
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY;
 
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.anyInt;
 import static org.mockito.Mockito.anyLong;
@@ -172,7 +173,8 @@ public class TestDataNodeLifeline {
             any(VolumeFailureSummary.class),
             anyBoolean(),
             any(SlowPeerReports.class),
-            any(SlowDiskReports.class));
+            any(SlowDiskReports.class),
+            any(BulkSyncTaskExecutionFeedback.class));
 
     // Intercept lifeline to trigger latch count-down on each call.
     doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@@ -237,7 +239,8 @@ public class TestDataNodeLifeline {
             any(VolumeFailureSummary.class),
             anyBoolean(),
             any(SlowPeerReports.class),
-            any(SlowDiskReports.class));
+            any(SlowDiskReports.class),
+            any(BulkSyncTaskExecutionFeedback.class));
 
     // While waiting on the latch for the expected number of heartbeat messages,
     // poll DataNode tracking information.  We expect that the DataNode always

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index bb1d9ef..2d7dea9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -222,7 +223,8 @@ public class TestDatanodeProtocolRetryPolicy {
            Mockito.any(VolumeFailureSummary.class),
            Mockito.anyBoolean(),
            Mockito.any(SlowPeerReports.class),
-           Mockito.any(SlowDiskReports.class));
+           Mockito.any(SlowDiskReports.class),
+           Mockito.any(BulkSyncTaskExecutionFeedback.class));
 
     dn = new DataNode(conf, locations, null, null) {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index 2dbd5b9..2cf0135 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
@@ -208,7 +209,8 @@ public class TestFsDatasetCache {
           (StorageReport[]) any(), anyLong(), anyLong(),
           anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
           anyBoolean(), any(SlowPeerReports.class),
-          any(SlowDiskReports.class));
+          any(SlowDiskReports.class),
+          any(BulkSyncTaskExecutionFeedback.class));
     } finally {
       lock.writeLock().unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index 5f62ddb..0f0bc1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -110,7 +111,8 @@ public class TestStorageReport {
         anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
         Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
         Mockito.any(SlowPeerReports.class),
-        Mockito.any(SlowDiskReports.class));
+        Mockito.any(SlowDiskReports.class),
+        Mockito.any(BulkSyncTaskExecutionFeedback.class));
 
     StorageReport[] reports = captor.getValue();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index 3a3c471..9940174 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -956,8 +956,8 @@ public class NNThroughputBenchmark implements Tool {
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
           0L, 0L, 0, 0, 0, null, true,
-          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
-          .getCommands();
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
+          null).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1007,8 +1007,8 @@ public class NNThroughputBenchmark implements Tool {
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null, true,
-          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
-          .getCommands();
+          SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
+          null).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index b85527a..8589f75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import static org.mockito.Mockito.spy;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang3.reflect.FieldUtils;
@@ -130,7 +132,8 @@ public class NameNodeAdapter {
     return namesystem.handleHeartbeat(nodeReg,
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
         dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
-        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
+        SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
+        new BulkSyncTaskExecutionFeedback(new ArrayList<>()));
   }
 
   public static boolean setReplication(final FSNamesystem ns,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d52a2afb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 366f584..9f9a897 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -139,8 +139,8 @@ public class TestDeadDatanode {
         false, 0, 0, 0, 0, 0) };
     DatanodeCommand[] cmd =
         dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
-            SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
-            .getCommands();
+            SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT,
+            null).getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message