hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rake...@apache.org
Subject [35/50] [abbrv] hadoop git commit: HDFS-10954. [SPS]: Provide mechanism to send blocks movement result back to NN from coordinator DN. Contributed by Rakesh R
Date Fri, 17 Feb 2017 14:50:15 GMT
HDFS-10954. [SPS]: Provide mechanism to send blocks movement result back to NN from coordinator DN. Contributed by Rakesh R


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/047e1cd4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/047e1cd4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/047e1cd4

Branch: refs/heads/HDFS-10285
Commit: 047e1cd4368f7c65603c1ad7337e965a4ec306cb
Parents: e4983d5
Author: Rakesh Radhakrishnan <rakeshr@apache.org>
Authored: Thu Nov 3 09:39:14 2016 +0530
Committer: Rakesh Radhakrishnan <rakeshr@apache.org>
Committed: Fri Feb 17 19:49:39 2017 +0530

----------------------------------------------------------------------
 .../DatanodeProtocolClientSideTranslatorPB.java |  9 ++-
 .../DatanodeProtocolServerSideTranslatorPB.java |  4 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 52 +++++++++++++++
 .../server/blockmanagement/BlockManager.java    |  4 ++
 .../hdfs/server/datanode/BPServiceActor.java    | 25 ++++++-
 .../datanode/StoragePolicySatisfyWorker.java    | 70 +++++++++++++++++---
 .../hdfs/server/namenode/FSNamesystem.java      | 10 ++-
 .../hdfs/server/namenode/NameNodeRpcServer.java |  7 +-
 .../server/namenode/StoragePolicySatisfier.java | 23 +++++++
 .../protocol/BlocksStorageMovementResult.java   | 64 ++++++++++++++++++
 .../hdfs/server/protocol/DatanodeProtocol.java  |  5 +-
 .../src/main/proto/DatanodeProtocol.proto       | 14 ++++
 .../TestNameNodePrunesMissingStorages.java      |  3 +-
 .../datanode/InternalDataNodeTestUtils.java     |  4 +-
 .../server/datanode/TestBPOfferService.java     |  8 ++-
 .../hdfs/server/datanode/TestBlockRecovery.java |  4 +-
 .../server/datanode/TestDataNodeLifeline.java   |  7 +-
 .../TestDatanodeProtocolRetryPolicy.java        |  4 +-
 .../server/datanode/TestFsDatasetCache.java     |  4 +-
 .../TestStoragePolicySatisfyWorker.java         | 13 ++--
 .../hdfs/server/datanode/TestStorageReport.java |  4 +-
 .../server/namenode/NNThroughputBenchmark.java  |  6 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |  3 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |  4 +-
 .../namenode/TestStoragePolicySatisfier.java    | 50 ++++++++++++++
 25 files changed, 363 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index d9e6026..09a8c32 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlock
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -136,7 +137,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       int xmitsInProgress, int xceiverCount, int failedVolumes,
       VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
-      @Nonnull SlowPeerReports slowPeers) throws IOException {
+      @Nonnull SlowPeerReports slowPeers,
+      BlocksStorageMovementResult[] blksMovementResults) throws IOException {
     HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
         .setRegistration(PBHelper.convert(registration))
         .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@@ -156,6 +158,11 @@ public class DatanodeProtocolClientSideTranslatorPB implements
     if (slowPeers.haveSlowPeers()) {
       builder.addAllSlowPeers(PBHelper.convertSlowPeerInfo(slowPeers));
     }
+
+    // Adding blocks movement results to the heart beat request.
+    builder.addAllBlksMovementResults(
+        PBHelper.convertBlksMovResults(blksMovementResults));
+
     HeartbeatResponseProto resp;
     try {
       resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index b1c8e34..56d3f20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -121,7 +121,9 @@ public class DatanodeProtocolServerSideTranslatorPB implements
           request.getXmitsInProgress(),
           request.getXceiverCount(), request.getFailedVolumes(),
           volumeFailureSummary, request.getRequestFullBlockReportLease(),
-          PBHelper.convertSlowPeerInfo(request.getSlowPeersList()));
+          PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
+          PBHelper.convertBlksMovResults(
+              request.getBlksMovementResultsList()));
     } catch (IOException e) {
       throw new ServiceException(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 6949d71..8caa277 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailur
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMovementResultProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@@ -96,6 +97,8 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStr
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
@@ -884,6 +887,55 @@ public class PBHelper {
     return SlowPeerReports.create(slowPeersMap);
   }
 
+  public static BlocksStorageMovementResult[] convertBlksMovResults(
+      List<BlocksStorageMovementResultProto> protos) {
+    BlocksStorageMovementResult[] results =
+        new BlocksStorageMovementResult[protos.size()];
+    for (int i = 0; i < protos.size(); i++) {
+      BlocksStorageMovementResultProto resultProto = protos.get(i);
+      BlocksStorageMovementResult.Status status;
+      switch (resultProto.getStatus()) {
+      case SUCCESS:
+        status = Status.SUCCESS;
+        break;
+      case FAILURE:
+        status = Status.FAILURE;
+        break;
+      default:
+        throw new AssertionError("Unknown status: " + resultProto.getStatus());
+      }
+      results[i] = new BlocksStorageMovementResult(resultProto.getTrackID(),
+          status);
+    }
+    return results;
+  }
+
+  public static List<BlocksStorageMovementResultProto> convertBlksMovResults(
+      BlocksStorageMovementResult[] blocksMovementResults) {
+    List<BlocksStorageMovementResultProto> blocksMovementResultsProto =
+        new ArrayList<>();
+    BlocksStorageMovementResultProto.Builder builder =
+        BlocksStorageMovementResultProto.newBuilder();
+    for (int i = 0; i < blocksMovementResults.length; i++) {
+      BlocksStorageMovementResult report = blocksMovementResults[i];
+      builder.setTrackID(report.getTrackId());
+      BlocksStorageMovementResultProto.Status status;
+      switch (report.getStatus()) {
+      case SUCCESS:
+        status = BlocksStorageMovementResultProto.Status.SUCCESS;
+        break;
+      case FAILURE:
+        status = BlocksStorageMovementResultProto.Status.FAILURE;
+        break;
+      default:
+        throw new AssertionError("Unknown status: " + report.getStatus());
+      }
+      builder.setStatus(status);
+      blocksMovementResultsProto.add(builder.build());
+    }
+    return blocksMovementResultsProto;
+  }
+
   public static JournalInfo convert(JournalInfoProto info) {
     int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
     int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index d5bb793..a3fae3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -4746,4 +4746,8 @@ public class BlockManager implements BlockStatsMXBean {
   public void satisfyStoragePolicy(long id) {
     storageMovementNeeded.add(id);
   }
+
+  public StoragePolicySatisfier getStoragePolicySatisfier() {
+    return sps;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 644a8ab..da1eddf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -502,6 +503,10 @@ class BPServiceActor implements Runnable {
         slowPeersReportDue && dn.getPeerMetrics() != null ?
             SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
             SlowPeerReports.EMPTY_REPORT;
+
+    BlocksStorageMovementResult[] blksMovementResults =
+        getBlocksMovementResults();
+
     HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
         reports,
         dn.getFSDataset().getCacheCapacity(),
@@ -511,15 +516,33 @@ class BPServiceActor implements Runnable {
         numFailedVolumes,
         volumeFailureSummary,
         requestBlockReportLease,
-        slowPeers);
+        slowPeers,
+        blksMovementResults);
 
     if (slowPeersReportDue) {
       // If the report was due and successfully sent, schedule the next one.
       scheduler.scheduleNextSlowPeerReport();
     }
+
+    // Remove the blocks movement results after successfully transferring
+    // to namenode.
+    dn.getStoragePolicySatisfyWorker().getBlocksMovementsCompletionHandler()
+        .remove(blksMovementResults);
+
     return response;
   }
 
+  private BlocksStorageMovementResult[] getBlocksMovementResults() {
+    List<BlocksStorageMovementResult> trackIdVsMovementStatus = dn
+        .getStoragePolicySatisfyWorker().getBlocksMovementsCompletionHandler()
+        .getBlksMovementResults();
+    BlocksStorageMovementResult[] blksMovementResult =
+        new BlocksStorageMovementResult[trackIdVsMovementStatus.size()];
+    trackIdVsMovementStatus.toArray(blksMovementResult);
+
+    return blksMovementResult;
+  }
+
   @VisibleForTesting
   void sendLifelineForTests() throws IOException {
     lifelineSender.sendLifeline();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 604fb4a..8a8f87d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -29,6 +29,7 @@ import java.io.OutputStream;
 import java.net.Socket;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
@@ -276,7 +278,7 @@ public class StoragePolicySatisfyWorker {
   /**
    * Block movement status code.
    */
-  enum BlockMovementStatus {
+  public static enum BlockMovementStatus {
     /** Success. */
     DN_BLK_STORAGE_MOVEMENT_SUCCESS(0),
     /**
@@ -343,26 +345,72 @@ public class StoragePolicySatisfyWorker {
 
   /**
    * Blocks movements completion handler, which is used to collect details of
-   * the completed list of block movements and notify the namenode about the
-   * success or failures.
+   * the completed list of block movements; this status (success or failure)
+   * will be sent to the namenode via heartbeat.
    */
   static class BlocksMovementsCompletionHandler {
-    private final List<BlockMovementResult> completedBlocks = new ArrayList<>();
+    private final List<BlocksStorageMovementResult> trackIdVsMovementStatus =
+        new ArrayList<>();
 
     /**
-     * Collect all the block movement results and notify namenode.
+     * Collect all the block movement results. Later this will be sent to
+     * namenode via heartbeat.
      *
      * @param results
      *          result of all the block movements per trackId
      */
-    void handle(List<BlockMovementResult> results) {
-      completedBlocks.addAll(results);
-      // TODO: notify namenode about the success/failures.
+    void handle(List<BlockMovementResult> resultsPerTrackId) {
+      BlocksStorageMovementResult.Status status =
+          BlocksStorageMovementResult.Status.SUCCESS;
+      long trackId = -1;
+      for (BlockMovementResult blockMovementResult : resultsPerTrackId) {
+        trackId = blockMovementResult.getTrackId();
+        if (blockMovementResult.status ==
+            BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE) {
+          status = BlocksStorageMovementResult.Status.FAILURE;
+          // If any of the block movements has failed, mark as failure so
+          // that namenode can decide to retry the blocks associated with
+          // the given trackId.
+          break;
+        }
+      }
+
+      // Adding to the tracking results list. Later this will be sent to
+      // namenode via datanode heartbeat.
+      synchronized (trackIdVsMovementStatus) {
+        trackIdVsMovementStatus.add(
+            new BlocksStorageMovementResult(trackId, status));
+      }
+    }
+
+    /**
+     * @return unmodifiable list of blocks storage movement results.
+     */
+    List<BlocksStorageMovementResult> getBlksMovementResults() {
+      synchronized (trackIdVsMovementStatus) {
+        if (trackIdVsMovementStatus.size() <= 0) {
+          return new ArrayList<>();
+        }
+        List<BlocksStorageMovementResult> results = Collections
+            .unmodifiableList(trackIdVsMovementStatus);
+        return results;
+      }
     }
 
-    @VisibleForTesting
-    List<BlockMovementResult> getCompletedBlocks() {
-      return completedBlocks;
+    /**
+     * Remove the blocks storage movement results.
+     *
+     * @param results
+     *          set of blocks storage movement results
+     */
+    void remove(BlocksStorageMovementResult[] results) {
+      if (results != null) {
+        synchronized (trackIdVsMovementStatus) {
+          for (BlocksStorageMovementResult blocksMovementResult : results) {
+            trackIdVsMovementStatus.remove(blocksMovementResult);
+          }
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 38a326c..09ef4e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -247,6 +247,7 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
 import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
 import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
 import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -3642,7 +3643,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       int xceiverCount, int xmitsInProgress, int failedVolumes,
       VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
-      @Nonnull SlowPeerReports slowPeers) throws IOException {
+      @Nonnull SlowPeerReports slowPeers,
+      BlocksStorageMovementResult[] blksMovementResults) throws IOException {
     readLock();
     try {
       //get datanode commands
@@ -3656,6 +3658,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if (requestFullBlockReportLease) {
         blockReportLeaseId =  blockManager.requestBlockReportLeaseId(nodeReg);
       }
+
+      // TODO: Handle blocks movement results sent by the coordinator datanode.
+      // This has to be revisited as part of HDFS-11029.
+      blockManager.getStoragePolicySatisfier()
+          .handleBlocksStorageMovementResults(blksMovementResults);
+
       //create ha status
       final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
           haContext.getState().getServiceState(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index f9cfa42..d4577a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -142,6 +142,7 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -1422,13 +1423,15 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       int xmitsInProgress, int xceiverCount,
       int failedVolumes, VolumeFailureSummary volumeFailureSummary,
       boolean requestFullBlockReportLease,
-      @Nonnull SlowPeerReports slowPeers) throws IOException {
+      @Nonnull SlowPeerReports slowPeers,
+      BlocksStorageMovementResult[] blkMovementStatus) throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     return namesystem.handleHeartbeat(nodeReg, report,
         dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
         failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
-        slowPeers);
+        slowPeers,
+        blkMovementStatus);
   }
 
   @Override // DatanodeProtocol

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index b5aed37..fbe686a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -39,11 +39,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Setting storagePolicy on a file after the file write will only update the new
  * storage policy type in Namespace, but physical block storage movement will
@@ -394,4 +397,24 @@ public class StoragePolicySatisfier implements Runnable {
       return typeNodeMap.get(type);
     }
   }
+
+  // TODO: Temporarily keeping the results for assertion. This has to be
+  // revisited as part of HDFS-11029.
+  @VisibleForTesting
+  List<BlocksStorageMovementResult> results = new ArrayList<>();
+
+  /**
+   * Receives the movement results of collection of blocks associated to a
+   * trackId.
+   *
+   * @param blksMovementResults
+   *          movement status of the set of blocks associated to a trackId.
+   */
+  void handleBlocksStorageMovementResults(
+      BlocksStorageMovementResult[] blksMovementResults) {
+    if (blksMovementResults.length <= 0) {
+      return;
+    }
+    results.addAll(Arrays.asList(blksMovementResults));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
new file mode 100644
index 0000000..1afba34
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+/**
+ * This class represents the movement status of a set of blocks associated
+ * with a track Id.
+ */
+public class BlocksStorageMovementResult {
+
+  private final long trackId;
+  private final Status status;
+
+  /**
+   * SUCCESS - If all the blocks associated to track id have moved successfully
+   * or maximum possible movements done.
+   *
+   * <p>
+   * FAILURE - If any block movement for the trackId failed and the failed
+   * movements need to be retried. For example, the selected target node is no
+   * longer running or has no space, so retrying with a new target node might
+   * work.
+   */
+  public static enum Status {
+    SUCCESS, FAILURE;
+  }
+
+  /**
+   * BlocksStorageMovementResult constructor.
+   *
+   * @param trackId
+   *          tracking identifier
+   * @param status
+   *          block movement status
+   */
+  public BlocksStorageMovementResult(long trackId, Status status) {
+    this.trackId = trackId;
+    this.status = status;
+  }
+
+  public long getTrackId() {
+    return trackId;
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 79a0132..c54f90c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -111,6 +111,8 @@ public interface DatanodeProtocol {
    * @param slowPeers Details of peer DataNodes that were detected as being
    *                  slow to respond to packet writes. Empty report if no
    *                  slow peers were detected by the DataNode.
+   * @param blksMovementResults array of movement status of a set of blocks
+   *                            associated to a trackId.
    * @throws IOException on error
    */
   @Idempotent
@@ -123,7 +125,8 @@ public interface DatanodeProtocol {
                                        int failedVolumes,
                                        VolumeFailureSummary volumeFailureSummary,
                                        boolean requestFullBlockReportLease,
-                                       @Nonnull SlowPeerReports slowPeers)
+                                       @Nonnull SlowPeerReports slowPeers,
+                                       BlocksStorageMovementResult[] blksMovementResults)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 7fd2781..f914935 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -177,6 +177,18 @@ message BlockStorageMovementProto {
 }
 
 /**
+ * Movement status of the set of blocks associated to a trackId.
+ */
+message BlocksStorageMovementResultProto {
+  enum Status {
+    SUCCESS = 1; // block movement succeeded
+    FAILURE = 2; // block movement failed and needs to retry
+  }
+  required uint64 trackID = 1;
+  required Status status = 2;
+}
+
+/**
  * registration - Information of the datanode registering with the namenode
  */
 message RegisterDatanodeRequestProto {
@@ -218,6 +230,7 @@ message VolumeFailureSummaryProto {
  * cacheUsed - amount of cache used
  * volumeFailureSummary - info about volume failures
  * slowPeers - info about peer DataNodes that are suspected to be slow.
+ * blksMovementResults - status of the scheduled blocks movements
  */
 message HeartbeatRequestProto {
   required DatanodeRegistrationProto registration = 1; // Datanode info
@@ -230,6 +243,7 @@ message HeartbeatRequestProto {
   optional VolumeFailureSummaryProto volumeFailureSummary = 8;
   optional bool requestFullBlockReportLease = 9 [ default = false ];
   repeated SlowPeerReportProto slowPeers = 10;
+  repeated BlocksStorageMovementResultProto blksMovementResults = 11;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index a5c6e0d..a146322 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -114,7 +115,7 @@ public class TestNameNodePrunesMissingStorages {
       // Stop the DataNode and send fake heartbeat with missing storage.
       cluster.stopDataNode(0);
       cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
-          0, null, true, SlowPeerReports.EMPTY_REPORT);
+          0, null, true, SlowPeerReports.EMPTY_REPORT, new BlocksStorageMovementResult[0]);
 
       // Check that the missing storage was pruned.
       assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
index cf43fd0..a1275df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -138,7 +139,8 @@ public class InternalDataNodeTestUtils {
             Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(),
             Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
             Mockito.anyBoolean(),
-            Mockito.any(SlowPeerReports.class))).thenReturn(
+            Mockito.any(SlowPeerReports.class),
+            Mockito.any(BlocksStorageMovementResult[].class))).thenReturn(
         new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
             HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
             .nextLong() | 1L));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index c6b38ee..1f670d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -121,6 +122,8 @@ public class TestBPOfferService {
     Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn"))
         .when(mockDn).getMetrics();
+    Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn)
+        .getStoragePolicySatisfyWorker();
 
     // Set up a simulated dataset with our fake BP
     mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf));
@@ -154,7 +157,8 @@ public class TestBPOfferService {
           Mockito.anyInt(),
           Mockito.any(VolumeFailureSummary.class),
           Mockito.anyBoolean(),
-          Mockito.any(SlowPeerReports.class));
+          Mockito.any(SlowPeerReports.class),
+          Mockito.any(BlocksStorageMovementResult[].class));
     mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
     datanodeCommands[nnIdx] = new DatanodeCommand[0];
     return mock;
@@ -344,6 +348,8 @@ public class TestBPOfferService {
     Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf();
     Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")).
       when(mockDn).getMetrics();
+    Mockito.doReturn(new StoragePolicySatisfyWorker(conf, mockDn)).when(mockDn)
+        .getStoragePolicySatisfyWorker();
     final AtomicInteger count = new AtomicInteger();
     Mockito.doAnswer(new Answer<Void>() {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index b64f1e2..3667151 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -219,7 +220,8 @@ public class TestBlockRecovery {
             Mockito.anyInt(),
             Mockito.any(VolumeFailureSummary.class),
             Mockito.anyBoolean(),
-            Mockito.any(SlowPeerReports.class)))
+            Mockito.any(SlowPeerReports.class),
+            Mockito.any(BlocksStorageMovementResult[].class)))
         .thenReturn(new HeartbeatResponse(
             new DatanodeCommand[0],
             new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
index 8a9f0b8..3f56fd4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -169,7 +170,8 @@ public class TestDataNodeLifeline {
             anyInt(),
             any(VolumeFailureSummary.class),
             anyBoolean(),
-            any(SlowPeerReports.class));
+            any(SlowPeerReports.class),
+            any(BlocksStorageMovementResult[].class));
 
     // Intercept lifeline to trigger latch count-down on each call.
     doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@@ -233,7 +235,8 @@ public class TestDataNodeLifeline {
             anyInt(),
             any(VolumeFailureSummary.class),
             anyBoolean(),
-            any(SlowPeerReports.class));
+            any(SlowPeerReports.class),
+            any(BlocksStorageMovementResult[].class));
 
     // While waiting on the latch for the expected number of heartbeat messages,
     // poll DataNode tracking information.  We expect that the DataNode always

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index c94f74e..1aef383 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -220,7 +221,8 @@ public class TestDatanodeProtocolRetryPolicy {
            Mockito.anyInt(),
            Mockito.any(VolumeFailureSummary.class),
            Mockito.anyBoolean(),
-           Mockito.any(SlowPeerReports.class));
+           Mockito.any(SlowPeerReports.class),
+           Mockito.any(BlocksStorageMovementResult[].class));
 
     dn = new DataNode(conf, locations, null, null) {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index eb015c0..c6764db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.Page
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -173,7 +174,8 @@ public class TestFsDatasetCache {
         (DatanodeRegistration) any(),
         (StorageReport[]) any(), anyLong(), anyLong(),
         anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
-        anyBoolean(), any(SlowPeerReports.class));
+        anyBoolean(), any(SlowPeerReports.class),
+        (BlocksStorageMovementResult[]) any());
   }
 
   private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index ea3eec3..1eb44e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -34,10 +34,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
-import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
@@ -191,12 +190,12 @@ public class TestStoragePolicySatisfyWorker {
     GenericTestUtils.waitFor(new Supplier<Boolean>() {
       @Override
       public Boolean get() {
-        List<BlockMovementResult> completedBlocks = worker
-            .getBlocksMovementsCompletionHandler().getCompletedBlocks();
+        List<BlocksStorageMovementResult> completedBlocks = worker
+            .getBlocksMovementsCompletionHandler().getBlksMovementResults();
         int failedCount = 0;
-        for (BlockMovementResult blockMovementResult : completedBlocks) {
-          if (BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE ==
-              blockMovementResult.getStatus()) {
+        for (BlocksStorageMovementResult blkMovementResult : completedBlocks) {
+          if (blkMovementResult.getStatus() ==
+              BlocksStorageMovementResult.Status.FAILURE) {
             failedCount++;
           }
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index 2b793e9..94a7908 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -108,7 +109,8 @@ public class TestStorageReport {
         captor.capture(),
         anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
         Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
-        Mockito.any(SlowPeerReports.class));
+        Mockito.any(SlowPeerReports.class),
+        Mockito.any(BlocksStorageMovementResult[].class));
 
     StorageReport[] reports = captor.getValue();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index b86b3fb..d8e83df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -953,7 +954,8 @@ public class NNThroughputBenchmark implements Tool {
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
           0L, 0L, 0, 0, 0, null, true,
-          SlowPeerReports.EMPTY_REPORT).getCommands();
+          SlowPeerReports.EMPTY_REPORT,
+          new BlocksStorageMovementResult[0]).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1003,7 +1005,7 @@ public class NNThroughputBenchmark implements Tool {
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
           rep, 0L, 0L, 0, 0, 0, null, true,
-          SlowPeerReports.EMPTY_REPORT).getCommands();
+          SlowPeerReports.EMPTY_REPORT, new BlocksStorageMovementResult[0]).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 2b8faf4..43a2fa3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.ha.EditLogTailer;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -124,7 +125,7 @@ public class NameNodeAdapter {
     return namesystem.handleHeartbeat(nodeReg,
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
         dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
-        SlowPeerReports.EMPTY_REPORT);
+        SlowPeerReports.EMPTY_REPORT, new BlocksStorageMovementResult[0]);
   }
 
   public static boolean setReplication(final FSNamesystem ns,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index b9161c3..69851d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -134,7 +135,8 @@ public class TestDeadDatanode {
         false, 0, 0, 0, 0, 0) };
     DatanodeCommand[] cmd =
         dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
-            SlowPeerReports.EMPTY_REPORT).getCommands();
+            SlowPeerReports.EMPTY_REPORT, new BlocksStorageMovementResult[0]).getCommands();
+
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/047e1cd4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 37664b5..cbfdfc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -146,6 +148,54 @@ public class TestStoragePolicySatisfier {
     }
   }
 
+  /**
+   * Tests to verify that the block storage movement results will be propagated
+   * to Namenode via datanode heartbeat.
+   */
+  @Test(timeout = 300000)
+  public void testPerTrackIdBlocksStorageMovementResults() throws Exception {
+    try {
+      // Change policy to ONE_SSD
+      distributedFS.setStoragePolicy(new Path(file), "ONE_SSD");
+      FSNamesystem namesystem = hdfsCluster.getNamesystem();
+      INode inode = namesystem.getFSDirectory().getINode(file);
+
+      StorageType[][] newtypes =
+          new StorageType[][]{{StorageType.SSD, StorageType.DISK}};
+
+      // Making sure SSD based nodes added to cluster. Adding SSD based
+      // datanodes.
+      startAdditionalDNs(config, 1, numOfDatanodes, newtypes,
+          storagesPerDatanode, capacity, hdfsCluster);
+      namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+      hdfsCluster.triggerHeartbeats();
+
+      // Wait till the block is moved to SSD areas
+      waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000);
+      waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000);
+
+      // TODO: Temporarily using the results from StoragePolicySatisfier class.
+      // This has to be revisited as part of HDFS-11029.
+      waitForBlocksMovementResult(1, 30000);
+    } finally {
+      hdfsCluster.shutdown();
+    }
+  }
+
+  private void waitForBlocksMovementResult(int expectedResultsCount,
+      int timeout) throws TimeoutException, InterruptedException {
+    BlockManager blockManager = hdfsCluster.getNamesystem().getBlockManager();
+    final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LOG.info("expectedResultsCount={} actualResultsCount={}",
+            expectedResultsCount, sps.results.size());
+        return expectedResultsCount == sps.results.size();
+      }
+    }, 100, timeout);
+  }
+
   private void writeContent(final DistributedFileSystem dfs,
       final String fileName) throws IOException {
     // write to DISK


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message