hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From umamah...@apache.org
Subject [2/3] hadoop git commit: HDFS-12570: [SPS]: Refactor Co-ordinator datanode logic to track the block storage movements. Contributed by Rakesh R.
Date Fri, 13 Oct 2017 00:18:46 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index 549819f..cc5b63a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -22,15 +22,12 @@ import static org.apache.hadoop.util.Time.monotonicNow;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.AttemptedItemInfo;
 import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult.Status;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,14 +35,12 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 
 /**
- * A monitor class for checking whether block storage movements finished or not.
- * If block storage movement results from datanode indicates about the movement
- * success, then it will just remove the entries from tracking. If it reports
- * failure, then it will add back to needed block storage movements list. If it
- * reports in_progress, that means the blocks movement is in progress and the
- * coordinator is still tracking the movement. If no DN reports about movement
- * for longer time, then such items will be retries automatically after timeout.
- * The default timeout would be 30mins.
+ * A monitor class for checking whether block storage movement attempts
+ * have completed or not. If this receives a block storage movement attempt
+ * status (either success or failure) from a DN, then it will just remove the
+ * entries from tracking. If no DN reports that a movement attempt has
+ * finished for a long time period, then such items will be retried
+ * automatically after a timeout. The default timeout is 5 minutes.
  */
 public class BlockStorageMovementAttemptedItems {
   private static final Logger LOG =
@@ -55,37 +50,34 @@ public class BlockStorageMovementAttemptedItems {
    * A map holds the items which are already taken for blocks movements
    * processing and sent to DNs.
    */
-  private final Map<Long, AttemptedItemInfo> storageMovementAttemptedItems;
-  private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
+  private final List<AttemptedItemInfo> storageMovementAttemptedItems;
+  private final List<Block> movementFinishedBlocks;
   private volatile boolean monitorRunning = true;
   private Daemon timerThread = null;
-  private final StoragePolicySatisfier sps;
   //
-  // It might take anywhere between 20 to 60 minutes before
+  // It might take anywhere between 5 to 10 minutes before
   // a request is timed out.
   //
-  private long selfRetryTimeout = 20 * 60 * 1000;
+  private long selfRetryTimeout = 5 * 60 * 1000;
 
   //
-  // It might take anywhere between 5 to 10 minutes before
+  // It might take anywhere between 1 to 2 minutes before
   // a request is timed out.
   //
-  private long minCheckTimeout = 5 * 60 * 1000; // minimum value
+  private long minCheckTimeout = 1 * 60 * 1000; // minimum value
   private BlockStorageMovementNeeded blockStorageMovementNeeded;
 
   public BlockStorageMovementAttemptedItems(long recheckTimeout,
       long selfRetryTimeout,
-      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
-      StoragePolicySatisfier sps) {
+      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
     if (recheckTimeout > 0) {
       this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
     }
 
     this.selfRetryTimeout = selfRetryTimeout;
     this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
-    storageMovementAttemptedItems = new HashMap<>();
-    storageMovementAttemptedResults = new ArrayList<>();
-    this.sps = sps;
+    storageMovementAttemptedItems = new ArrayList<>();
+    movementFinishedBlocks = new ArrayList<>();
   }
 
   /**
@@ -94,33 +86,26 @@ public class BlockStorageMovementAttemptedItems {
    *
    * @param itemInfo
    *          - tracking info
-   * @param allBlockLocsAttemptedToSatisfy
-   *          - failed to find matching target nodes to satisfy storage type
-   *          for all the block locations of the given blockCollectionID
    */
-  public void add(ItemInfo itemInfo, boolean allBlockLocsAttemptedToSatisfy) {
+  public void add(AttemptedItemInfo itemInfo) {
     synchronized (storageMovementAttemptedItems) {
-      AttemptedItemInfo attemptedItemInfo = new AttemptedItemInfo(
-          itemInfo.getStartId(), itemInfo.getTrackId(), monotonicNow(),
-          allBlockLocsAttemptedToSatisfy);
-      storageMovementAttemptedItems.put(itemInfo.getTrackId(),
-          attemptedItemInfo);
+      storageMovementAttemptedItems.add(itemInfo);
     }
   }
 
   /**
-   * Add the trackIDBlocksStorageMovementResults to
-   * storageMovementAttemptedResults.
+   * Add the storage movement attempt finished blocks to
+   * movementFinishedBlocks.
    *
-   * @param blksMovementResults
+   * @param moveAttemptFinishedBlks
+   *          storage movement attempt finished blocks
    */
-  public void addResults(BlocksStorageMovementResult[] blksMovementResults) {
-    if (blksMovementResults.length == 0) {
+  public void addReportedMovedBlocks(Block[] moveAttemptFinishedBlks) {
+    if (moveAttemptFinishedBlks.length == 0) {
       return;
     }
-    synchronized (storageMovementAttemptedResults) {
-      storageMovementAttemptedResults
-          .addAll(Arrays.asList(blksMovementResults));
+    synchronized (movementFinishedBlocks) {
+      movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks));
     }
   }
 
@@ -129,8 +114,8 @@ public class BlockStorageMovementAttemptedItems {
    */
   public synchronized void start() {
     monitorRunning = true;
-    timerThread = new Daemon(new BlocksStorageMovementAttemptResultMonitor());
-    timerThread.setName("BlocksStorageMovementAttemptResultMonitor");
+    timerThread = new Daemon(new BlocksStorageMovementAttemptMonitor());
+    timerThread.setName("BlocksStorageMovementAttemptMonitor");
     timerThread.start();
   }
 
@@ -163,82 +148,22 @@ public class BlockStorageMovementAttemptedItems {
   }
 
   /**
-   * This class contains information of an attempted trackID. Information such
-   * as, (a)last attempted or reported time stamp, (b)whether all the blocks in
-   * the trackID were attempted and blocks movement has been scheduled to
-   * satisfy storage policy. This is used by
-   * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
-   */
-  private final static class AttemptedItemInfo extends ItemInfo {
-    private long lastAttemptedOrReportedTime;
-    private final boolean allBlockLocsAttemptedToSatisfy;
-
-    /**
-     * AttemptedItemInfo constructor.
-     *
-     * @param rootId
-     *          rootId for trackId
-     * @param trackId
-     *          trackId for file.
-     * @param lastAttemptedOrReportedTime
-     *          last attempted or reported time
-     * @param allBlockLocsAttemptedToSatisfy
-     *          whether all the blocks in the trackID were attempted and blocks
-     *          movement has been scheduled to satisfy storage policy
-     */
-    private AttemptedItemInfo(long rootId, long trackId,
-        long lastAttemptedOrReportedTime,
-        boolean allBlockLocsAttemptedToSatisfy) {
-      super(rootId, trackId);
-      this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
-      this.allBlockLocsAttemptedToSatisfy = allBlockLocsAttemptedToSatisfy;
-    }
-
-    /**
-     * @return last attempted or reported time stamp.
-     */
-    private long getLastAttemptedOrReportedTime() {
-      return lastAttemptedOrReportedTime;
-    }
-
-    /**
-     * @return true/false. True value represents that, all the block locations
-     *         under the trackID has found matching target nodes to satisfy
-     *         storage policy. False value represents that, trackID needed
-     *         retries to satisfy the storage policy for some of the block
-     *         locations.
-     */
-    private boolean isAllBlockLocsAttemptedToSatisfy() {
-      return allBlockLocsAttemptedToSatisfy;
-    }
-
-    /**
-     * Update lastAttemptedOrReportedTime, so that the expiration time will be
-     * postponed to future.
-     */
-    private void touchLastReportedTimeStamp() {
-      this.lastAttemptedOrReportedTime = monotonicNow();
-    }
-
-  }
-
-  /**
-   * A monitor class for checking block storage movement result and long waiting
-   * items periodically.
+   * A monitor class for checking block storage movement attempt status and long
+   * waiting items periodically.
    */
-  private class BlocksStorageMovementAttemptResultMonitor implements Runnable {
+  private class BlocksStorageMovementAttemptMonitor implements Runnable {
     @Override
     public void run() {
       while (monitorRunning) {
         try {
-          blockStorageMovementResultCheck();
+          blockStorageMovementReportedItemsCheck();
           blocksStorageMovementUnReportedItemsCheck();
           Thread.sleep(minCheckTimeout);
         } catch (InterruptedException ie) {
-          LOG.info("BlocksStorageMovementAttemptResultMonitor thread "
+          LOG.info("BlocksStorageMovementAttemptMonitor thread "
               + "is interrupted.", ie);
         } catch (IOException ie) {
-          LOG.warn("BlocksStorageMovementAttemptResultMonitor thread "
+          LOG.warn("BlocksStorageMovementAttemptMonitor thread "
               + "received exception and exiting.", ie);
         }
       }
@@ -248,29 +173,21 @@ public class BlockStorageMovementAttemptedItems {
   @VisibleForTesting
   void blocksStorageMovementUnReportedItemsCheck() {
     synchronized (storageMovementAttemptedItems) {
-      Iterator<Entry<Long, AttemptedItemInfo>> iter =
-          storageMovementAttemptedItems.entrySet().iterator();
+      Iterator<AttemptedItemInfo> iter = storageMovementAttemptedItems
+          .iterator();
       long now = monotonicNow();
       while (iter.hasNext()) {
-        Entry<Long, AttemptedItemInfo> entry = iter.next();
-        AttemptedItemInfo itemInfo = entry.getValue();
+        AttemptedItemInfo itemInfo = iter.next();
         if (now > itemInfo.getLastAttemptedOrReportedTime()
             + selfRetryTimeout) {
-          Long blockCollectionID = entry.getKey();
-          synchronized (storageMovementAttemptedResults) {
-            if (!isExistInResult(blockCollectionID)) {
-              ItemInfo candidate = new ItemInfo(
-                  itemInfo.getStartId(), blockCollectionID);
-              blockStorageMovementNeeded.add(candidate);
-              iter.remove();
-              LOG.info("TrackID: {} becomes timed out and moved to needed "
-                  + "retries queue for next iteration.", blockCollectionID);
-            } else {
-              LOG.info("Blocks storage movement results for the"
-                  + " tracking id : " + blockCollectionID
-                  + " is reported from one of the co-ordinating datanode."
-                  + " So, the result will be processed soon.");
-            }
+          Long blockCollectionID = itemInfo.getTrackId();
+          synchronized (movementFinishedBlocks) {
+            ItemInfo candidate = new ItemInfo(itemInfo.getStartId(),
+                blockCollectionID);
+            blockStorageMovementNeeded.add(candidate);
+            iter.remove();
+            LOG.info("TrackID: {} becomes timed out and moved to needed "
+                + "retries queue for next iteration.", blockCollectionID);
           }
         }
       }
@@ -278,118 +195,38 @@ public class BlockStorageMovementAttemptedItems {
     }
   }
 
-  private boolean isExistInResult(Long blockCollectionID) {
-    Iterator<BlocksStorageMovementResult> iter = storageMovementAttemptedResults
-        .iterator();
-    while (iter.hasNext()) {
-      BlocksStorageMovementResult storageMovementAttemptedResult = iter.next();
-      if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   @VisibleForTesting
-  void blockStorageMovementResultCheck() throws IOException {
-    synchronized (storageMovementAttemptedResults) {
-      Iterator<BlocksStorageMovementResult> resultsIter =
-          storageMovementAttemptedResults.iterator();
-      while (resultsIter.hasNext()) {
-        boolean isInprogress = false;
-        // TrackID need to be retried in the following cases:
-        // 1) All or few scheduled block(s) movement has been failed.
-        // 2) All the scheduled block(s) movement has been succeeded but there
-        // are unscheduled block(s) movement in this trackID. Say, some of
-        // the blocks in the trackID couldn't finding any matching target node
-        // for scheduling block movement in previous SPS iteration.
-        BlocksStorageMovementResult storageMovementAttemptedResult = resultsIter
-            .next();
+  void blockStorageMovementReportedItemsCheck() throws IOException {
+    synchronized (movementFinishedBlocks) {
+      Iterator<Block> finishedBlksIter = movementFinishedBlocks.iterator();
+      while (finishedBlksIter.hasNext()) {
+        Block blk = finishedBlksIter.next();
         synchronized (storageMovementAttemptedItems) {
-          Status status = storageMovementAttemptedResult.getStatus();
-          long trackId = storageMovementAttemptedResult.getTrackId();
-          AttemptedItemInfo attemptedItemInfo = storageMovementAttemptedItems
-              .get(trackId);
-          // itemInfo is null means no root for trackId, using trackId only as
-          // root and handling it in
-          // blockStorageMovementNeeded#removeIteamTrackInfo() for cleaning
-          // the xAttr
-          ItemInfo itemInfo = new ItemInfo((attemptedItemInfo != null)
-              ? attemptedItemInfo.getStartId() : trackId, trackId);
-          switch (status) {
-          case FAILURE:
-            if (attemptedItemInfo != null) {
-              blockStorageMovementNeeded.add(itemInfo);
-              LOG.warn("Blocks storage movement results for the tracking id:"
-                  + "{} is reported from co-ordinating datanode, but result"
-                  + " status is FAILURE. So, added for retry", trackId);
-            } else {
-              LOG.info("Blocks storage movement is FAILURE for the track"
-                  + " id {}. But the trackID doesn't exists in"
-                  + " storageMovementAttemptedItems list.", trackId);
-              blockStorageMovementNeeded
-                  .removeItemTrackInfo(itemInfo);
-            }
-            break;
-          case SUCCESS:
-            // ItemInfo could be null. One case is, before the blocks movements
-            // result arrives the attempted trackID became timed out and then
-            // removed the trackID from the storageMovementAttemptedItems list.
-            // TODO: Need to ensure that trackID is added to the
-            // 'blockStorageMovementNeeded' queue for retries to handle the
-            // following condition. If all the block locations under the trackID
-            // are attempted and failed to find matching target nodes to satisfy
-            // storage policy in previous SPS iteration.
-            String msg = "Blocks storage movement is SUCCESS for the track id: "
-                + trackId + " reported from co-ordinating datanode.";
-            if (attemptedItemInfo != null) {
-              if (!attemptedItemInfo.isAllBlockLocsAttemptedToSatisfy()) {
-                blockStorageMovementNeeded
-                    .add(new ItemInfo(attemptedItemInfo.getStartId(), trackId));
-                LOG.warn("{} But adding trackID back to retry queue as some of"
-                    + " the blocks couldn't find matching target nodes in"
-                    + " previous SPS iteration.", msg);
-              } else {
-                LOG.info(msg);
-                blockStorageMovementNeeded
-                    .removeItemTrackInfo(itemInfo);
-              }
-            } else {
-              LOG.info("{} But the trackID doesn't exists in "
-                  + "storageMovementAttemptedItems list", msg);
+          Iterator<AttemptedItemInfo> iterator = storageMovementAttemptedItems
+              .iterator();
+          while (iterator.hasNext()) {
+            AttemptedItemInfo attemptedItemInfo = iterator.next();
+            attemptedItemInfo.getBlocks().remove(blk);
+            if (attemptedItemInfo.getBlocks().isEmpty()) {
+              // TODO: try add this at front of the Queue, so that this element
+              // gets the chance first and can be cleaned from queue quickly as
+              // all movements already done.
               blockStorageMovementNeeded
-              .removeItemTrackInfo(itemInfo);
-            }
-            break;
-          case IN_PROGRESS:
-            isInprogress = true;
-            attemptedItemInfo = storageMovementAttemptedItems
-                .get(storageMovementAttemptedResult.getTrackId());
-            if(attemptedItemInfo != null){
-              // update the attempted expiration time to next cycle.
-              attemptedItemInfo.touchLastReportedTimeStamp();
+                  .add(new ItemInfo(attemptedItemInfo.getStartId(),
+                      attemptedItemInfo.getTrackId()));
+              iterator.remove();
             }
-            break;
-          default:
-            LOG.error("Unknown status: {}", status);
-            break;
-          }
-          // Remove trackID from the attempted list if the attempt has been
-          // completed(success or failure), if any.
-          if (!isInprogress) {
-            storageMovementAttemptedItems
-                .remove(storageMovementAttemptedResult.getTrackId());
           }
         }
-        // Remove trackID from results as processed above.
-        resultsIter.remove();
+        // Remove attempted blocks from movementFinishedBlocks list.
+        finishedBlksIter.remove();
       }
     }
   }
 
   @VisibleForTesting
-  public int resultsCount() {
-    return storageMovementAttemptedResults.size();
+  public int getMovementFinishedBlocksCount() {
+    return movementFinishedBlocks.size();
   }
 
   @VisibleForTesting
@@ -398,7 +235,7 @@ public class BlockStorageMovementAttemptedItems {
   }
 
   public void clearQueues() {
-    storageMovementAttemptedResults.clear();
+    movementFinishedBlocks.clear();
     storageMovementAttemptedItems.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java
deleted file mode 100644
index a790c13..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementInfosBatch.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import java.util.List;
-
-import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-
-/**
- * This class represents a batch of blocks under one trackId which needs to move
- * its storage locations to satisfy the storage policy.
- */
-public class BlockStorageMovementInfosBatch {
-  private long trackID;
-  private List<BlockMovingInfo> blockMovingInfos;
-
-  /**
-   * Constructor to create the block storage movement infos batch.
-   *
-   * @param trackID
-   *          - unique identifier which will be used for tracking the given set
-   *          of blocks movement.
-   * @param blockMovingInfos
-   *          - list of block to storage infos.
-   */
-  public BlockStorageMovementInfosBatch(long trackID,
-      List<BlockMovingInfo> blockMovingInfos) {
-    this.trackID = trackID;
-    this.blockMovingInfos = blockMovingInfos;
-  }
-
-  public long getTrackID() {
-    return trackID;
-  }
-
-  public List<BlockMovingInfo> getBlockMovingInfo() {
-    return blockMovingInfos;
-  }
-
-  @Override
-  public String toString() {
-    return new StringBuilder().append("BlockStorageMovementInfosBatch(\n  ")
-        .append("TrackID: ").append(trackID).append("  BlockMovingInfos: ")
-        .append(blockMovingInfos).append(")").toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 2d05090..f39bbe8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -263,7 +263,7 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
 import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
 import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
 import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -3798,7 +3798,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
       @Nonnull SlowDiskReports slowDisks,
-      BlocksStorageMovementResult[] blksMovementResults) throws IOException {
+      BlocksStorageMoveAttemptFinished blksMovementsFinished)
+          throws IOException {
     readLock();
     try {
       //get datanode commands
@@ -3819,11 +3820,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         if (!sps.isRunning()) {
           if (LOG.isDebugEnabled()) {
             LOG.debug(
-                "Storage policy satisfier is not running. So, ignoring block "
-                    + "storage movement results sent by co-ordinator datanode");
+                "Storage policy satisfier is not running. So, ignoring storage"
+                    + "  movement attempt finished block info sent by DN");
           }
         } else {
-          sps.handleBlocksStorageMovementResults(blksMovementResults);
+          sps.handleStorageMovementAttemptFinishedBlks(blksMovementsFinished);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index c861cef..8009990 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -150,7 +150,7 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -1478,14 +1478,15 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       boolean requestFullBlockReportLease,
       @Nonnull SlowPeerReports slowPeers,
       @Nonnull SlowDiskReports slowDisks,
-      BlocksStorageMovementResult[] blkMovementStatus) throws IOException {
+      BlocksStorageMoveAttemptFinished storageMovementFinishedBlks)
+          throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     return namesystem.handleHeartbeat(nodeReg, report,
         dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
         failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
         slowPeers, slowDisks,
-        blkMovementStatus);
+        storageMovementFinishedBlks);
   }
 
   @Override // DatanodeProtocol

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index a4372d5..a28a806 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.util.Time.monotonicNow;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -44,7 +46,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.util.Daemon;
@@ -82,25 +84,38 @@ public class StoragePolicySatisfier implements Runnable {
   /**
    * Represents the collective analysis status for all blocks.
    */
-  private enum BlocksMovingAnalysisStatus {
-    // Represents that, the analysis skipped due to some conditions. A such
-    // condition is if block collection is in incomplete state.
-    ANALYSIS_SKIPPED_FOR_RETRY,
-    // Represents that, all block storage movement needed blocks found its
-    // targets.
-    ALL_BLOCKS_TARGETS_PAIRED,
-    // Represents that, only fewer or none of the block storage movement needed
-    // block found its eligible targets.
-    FEW_BLOCKS_TARGETS_PAIRED,
-    // Represents that, none of the blocks found for block storage movements.
-    BLOCKS_ALREADY_SATISFIED,
-    // Represents that, the analysis skipped due to some conditions.
-    // Example conditions are if no blocks really exists in block collection or
-    // if analysis is not required on ec files with unsuitable storage policies
-    BLOCKS_TARGET_PAIRING_SKIPPED,
-    // Represents that, All the reported blocks are satisfied the policy but
-    // some of the blocks are low redundant.
-    FEW_LOW_REDUNDANCY_BLOCKS
+  private static class BlocksMovingAnalysis {
+
+    enum Status {
+      // Represents that the analysis was skipped due to some condition. One
+      // such condition is the block collection being in an incomplete state.
+      ANALYSIS_SKIPPED_FOR_RETRY,
+      // Represents that few or all blocks found respective target to do
+      // the storage movement.
+      BLOCKS_TARGETS_PAIRED,
+      // Represents that none of the blocks found respective target to do
+      // the storage movement.
+      NO_BLOCKS_TARGETS_PAIRED,
+      // Represents that, none of the blocks found for block storage movements.
+      BLOCKS_ALREADY_SATISFIED,
+      // Represents that, the analysis skipped due to some conditions.
+      // Example conditions are if no blocks really exist in the block
+      // collection, or
+      // if analysis is not required on EC files with unsuitable storage
+      // policies.
+      BLOCKS_TARGET_PAIRING_SKIPPED,
+      // Represents that all the reported blocks satisfy the policy, but
+      // some of the blocks have low redundancy.
+      FEW_LOW_REDUNDANCY_BLOCKS
+    }
+
+    private Status status = null;
+    private List<Block> assignedBlocks = null;
+
+    BlocksMovingAnalysis(Status status, List<Block> blockMovingInfo) {
+      this.status = status;
+      this.assignedBlocks = blockMovingInfo;
+    }
   }
 
   public StoragePolicySatisfier(final Namesystem namesystem,
@@ -118,8 +133,7 @@ public class StoragePolicySatisfier implements Runnable {
         conf.getLong(
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
-        storageMovementNeeded,
-        this);
+        storageMovementNeeded);
     this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf);
   }
 
@@ -232,21 +246,25 @@ public class StoragePolicySatisfier implements Runnable {
                 namesystem.getBlockCollection(trackId);
             // Check blockCollectionId existence.
             if (blockCollection != null) {
-              BlocksMovingAnalysisStatus status =
+              BlocksMovingAnalysis status =
                   analyseBlocksStorageMovementsAndAssignToDN(blockCollection);
-              switch (status) {
+              switch (status.status) {
               // Just add to monitor, so it will be retried after timeout
               case ANALYSIS_SKIPPED_FOR_RETRY:
-                // Just add to monitor, so it will be tracked for result and
-                // be removed on successful storage movement result.
-              case ALL_BLOCKS_TARGETS_PAIRED:
-                this.storageMovementsMonitor.add(itemInfo, true);
+                // Just add to monitor, so it will be tracked for report and
+                // be removed on storage movement attempt finished report.
+              case BLOCKS_TARGETS_PAIRED:
+                this.storageMovementsMonitor.add(new AttemptedItemInfo(
+                    itemInfo.getStartId(), itemInfo.getTrackId(),
+                    monotonicNow(), status.assignedBlocks));
                 break;
-              // Add to monitor with allBlcoksAttemptedToSatisfy flag false, so
-              // that it will be tracked and still it will be consider for retry
-              // as analysis was not found targets for storage movement blocks.
-              case FEW_BLOCKS_TARGETS_PAIRED:
-                this.storageMovementsMonitor.add(itemInfo, false);
+              case NO_BLOCKS_TARGETS_PAIRED:
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Adding trackID " + trackId
+                      + " back to retry queue as none of the blocks"
+                      + " found its eligible targets.");
+                }
+                this.storageMovementNeeded.add(itemInfo);
                 break;
               case FEW_LOW_REDUNDANCY_BLOCKS:
                 if (LOG.isDebugEnabled()) {
@@ -310,10 +328,10 @@ public class StoragePolicySatisfier implements Runnable {
     return;
   }
 
-  private BlocksMovingAnalysisStatus analyseBlocksStorageMovementsAndAssignToDN(
+  private BlocksMovingAnalysis analyseBlocksStorageMovementsAndAssignToDN(
       BlockCollection blockCollection) {
-    BlocksMovingAnalysisStatus status =
-        BlocksMovingAnalysisStatus.BLOCKS_ALREADY_SATISFIED;
+    BlocksMovingAnalysis.Status status =
+        BlocksMovingAnalysis.Status.BLOCKS_ALREADY_SATISFIED;
     byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
     BlockStoragePolicy existingStoragePolicy =
         blockManager.getStoragePolicy(existingStoragePolicyID);
@@ -322,17 +340,18 @@ public class StoragePolicySatisfier implements Runnable {
       // So, should we add back? or leave it to user
       LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
           + " this to the next retry iteration", blockCollection.getId());
-      return BlocksMovingAnalysisStatus.ANALYSIS_SKIPPED_FOR_RETRY;
+      return new BlocksMovingAnalysis(
+          BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
+          new ArrayList<>());
     }
 
-    // First datanode will be chosen as the co-ordinator node for storage
-    // movements. Later this can be optimized if needed.
-    DatanodeDescriptor coordinatorNode = null;
     BlockInfo[] blocks = blockCollection.getBlocks();
     if (blocks.length == 0) {
       LOG.info("BlockCollectionID: {} file is not having any blocks."
           + " So, skipping the analysis.", blockCollection.getId());
-      return BlocksMovingAnalysisStatus.BLOCKS_TARGET_PAIRING_SKIPPED;
+      return new BlocksMovingAnalysis(
+          BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
+          new ArrayList<>());
     }
     List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
 
@@ -352,7 +371,9 @@ public class StoragePolicySatisfier implements Runnable {
           LOG.warn("The storage policy " + existingStoragePolicy.getName()
               + " is not suitable for Striped EC files. "
               + "So, ignoring to move the blocks");
-          return BlocksMovingAnalysisStatus.BLOCKS_TARGET_PAIRING_SKIPPED;
+          return new BlocksMovingAnalysis(
+              BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
+              new ArrayList<>());
         }
       } else {
         expectedStorageTypes = existingStoragePolicy
@@ -370,30 +391,35 @@ public class StoragePolicySatisfier implements Runnable {
           new LinkedList<StorageType>(Arrays.asList(storageTypes));
       if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
           existing, true)) {
-        boolean computeStatus = computeBlockMovingInfos(blockMovingInfos,
+        boolean blocksPaired = computeBlockMovingInfos(blockMovingInfos,
             blockInfo, expectedStorageTypes, existing, storages);
-        if (computeStatus
-            && status != BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED
-            && !blockManager.hasLowRedundancyBlocks(blockCollection)) {
-          status = BlocksMovingAnalysisStatus.ALL_BLOCKS_TARGETS_PAIRED;
+        if (blocksPaired) {
+          status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
         } else {
-          status = BlocksMovingAnalysisStatus.FEW_BLOCKS_TARGETS_PAIRED;
+          // none of the blocks found its eligible targets for satisfying the
+          // storage policy.
+          status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
         }
       } else {
         if (blockManager.hasLowRedundancyBlocks(blockCollection)) {
-          status = BlocksMovingAnalysisStatus.FEW_LOW_REDUNDANCY_BLOCKS;
+          status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
         }
       }
     }
 
-    assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(),
-        blockMovingInfos, coordinatorNode);
-    int count = 0;
+    List<Block> assignedBlockIds = new ArrayList<Block>();
     for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
-      count = count + blkMovingInfo.getSources().length;
+      // Check that a target has been chosen for this block storage movement
+      if (blkMovingInfo.getTarget() != null) {
+        // assign block storage movement task to the target node
+        ((DatanodeDescriptor) blkMovingInfo.getTarget())
+            .addBlocksToMoveStorage(blkMovingInfo);
+        LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
+        assignedBlockIds.add(blkMovingInfo.getBlock());
+        blockCount++;
+      }
     }
-    blockCount = blockCount + count;
-    return status;
+    return new BlocksMovingAnalysis(status, assignedBlockIds);
   }
 
   /**
@@ -468,41 +494,6 @@ public class StoragePolicySatisfier implements Runnable {
     return foundMatchingTargetNodesForBlock;
   }
 
-  private void assignBlockMovingInfosToCoordinatorDn(long blockCollectionID,
-      List<BlockMovingInfo> blockMovingInfos,
-      DatanodeDescriptor coordinatorNode) {
-
-    if (blockMovingInfos.size() < 1) {
-      // TODO: Major: handle this case. I think we need retry cases to
-      // be implemented. Idea is, if some files are not getting storage movement
-      // chances, then we can just retry limited number of times and exit.
-      return;
-    }
-
-    // For now, first datanode will be chosen as the co-ordinator. Later
-    // this can be optimized if needed.
-    coordinatorNode = (DatanodeDescriptor) blockMovingInfos.get(0)
-        .getSources()[0];
-
-    boolean needBlockStorageMovement = false;
-    for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
-      // Check for atleast one block storage movement has been chosen
-      if (blkMovingInfo.getTargets().length > 0){
-        needBlockStorageMovement = true;
-        break;
-      }
-    }
-    if (!needBlockStorageMovement) {
-      // Simply return as there is no targets selected for scheduling the block
-      // movement.
-      return;
-    }
-
-    // 'BlockCollectionId' is used as the tracking ID. All the blocks under this
-    // blockCollectionID will be added to this datanode.
-    coordinatorNode.addBlocksToMoveStorage(blockCollectionID, blockMovingInfos);
-  }
-
   /**
    * Find the good target node for each source node for which block storages was
    * misplaced.
@@ -526,10 +517,6 @@ public class StoragePolicySatisfier implements Runnable {
       List<StorageType> expected,
       StorageTypeNodeMap locsForExpectedStorageTypes) {
     boolean foundMatchingTargetNodesForBlock = true;
-    List<DatanodeInfo> sourceNodes = new ArrayList<>();
-    List<StorageType> sourceStorageTypes = new ArrayList<>();
-    List<DatanodeInfo> targetNodes = new ArrayList<>();
-    List<StorageType> targetStorageTypes = new ArrayList<>();
     List<DatanodeDescriptor> excludeNodes = new ArrayList<>();
 
     // Looping over all the source node locations and choose the target
@@ -544,10 +531,15 @@ public class StoragePolicySatisfier implements Runnable {
         StorageTypeNodePair chosenTarget = chooseTargetTypeInSameNode(
             blockInfo, existingTypeNodePair.dn, expected);
         if (chosenTarget != null) {
-          sourceNodes.add(existingTypeNodePair.dn);
-          sourceStorageTypes.add(existingTypeNodePair.storageType);
-          targetNodes.add(chosenTarget.dn);
-          targetStorageTypes.add(chosenTarget.storageType);
+          if (blockInfo.isStriped()) {
+            buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
+                existingTypeNodePair.storageType, chosenTarget.dn,
+                chosenTarget.storageType, blockMovingInfos);
+          } else {
+            buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
+                existingTypeNodePair.storageType, chosenTarget.dn,
+                chosenTarget.storageType, blockMovingInfos);
+          }
           expected.remove(chosenTarget.storageType);
           // TODO: We can increment scheduled block count for this node?
         }
@@ -563,7 +555,7 @@ public class StoragePolicySatisfier implements Runnable {
       StorageTypeNodePair chosenTarget = null;
       // Chosen the target storage within same datanode. So just skipping this
       // source node.
-      if (sourceNodes.contains(existingTypeNodePair.dn)) {
+      if (checkIfAlreadyChosen(blockMovingInfos, existingTypeNodePair.dn)) {
         continue;
       }
       if (chosenTarget == null && blockManager.getDatanodeManager()
@@ -586,10 +578,16 @@ public class StoragePolicySatisfier implements Runnable {
                 Matcher.ANY_OTHER, locsForExpectedStorageTypes, excludeNodes);
       }
       if (null != chosenTarget) {
-        sourceNodes.add(existingTypeNodePair.dn);
-        sourceStorageTypes.add(existingTypeNodePair.storageType);
-        targetNodes.add(chosenTarget.dn);
-        targetStorageTypes.add(chosenTarget.storageType);
+        if (blockInfo.isStriped()) {
+          buildStripedBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
+              existingTypeNodePair.storageType, chosenTarget.dn,
+              chosenTarget.storageType, blockMovingInfos);
+        } else {
+          buildContinuousBlockMovingInfos(blockInfo, existingTypeNodePair.dn,
+              existingTypeNodePair.storageType, chosenTarget.dn,
+              chosenTarget.storageType, blockMovingInfos);
+        }
+
         expected.remove(chosenTarget.storageType);
         excludeNodes.add(chosenTarget.dn);
         // TODO: We can increment scheduled block count for this node?
@@ -605,47 +603,33 @@ public class StoragePolicySatisfier implements Runnable {
       foundMatchingTargetNodesForBlock = false;
     }
 
-    blockMovingInfos.addAll(getBlockMovingInfos(blockInfo, sourceNodes,
-        sourceStorageTypes, targetNodes, targetStorageTypes));
     return foundMatchingTargetNodesForBlock;
   }
 
-  private List<BlockMovingInfo> getBlockMovingInfos(BlockInfo blockInfo,
-      List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
-      List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes) {
-    List<BlockMovingInfo> blkMovingInfos = new ArrayList<>();
-    // No source-target node pair exists.
-    if (sourceNodes.size() <= 0) {
-      return blkMovingInfos;
-    }
-
-    if (blockInfo.isStriped()) {
-      buildStripedBlockMovingInfos(blockInfo, sourceNodes, sourceStorageTypes,
-          targetNodes, targetStorageTypes, blkMovingInfos);
-    } else {
-      buildContinuousBlockMovingInfos(blockInfo, sourceNodes,
-          sourceStorageTypes, targetNodes, targetStorageTypes, blkMovingInfos);
+  private boolean checkIfAlreadyChosen(List<BlockMovingInfo> blockMovingInfos,
+      DatanodeDescriptor dn) {
+    for (BlockMovingInfo blockMovingInfo : blockMovingInfos) {
+      if (blockMovingInfo.getSource().equals(dn)) {
+        return true;
+      }
     }
-    return blkMovingInfos;
+    return false;
   }
 
   private void buildContinuousBlockMovingInfos(BlockInfo blockInfo,
-      List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
-      List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes,
+      DatanodeInfo sourceNode, StorageType sourceStorageType,
+      DatanodeInfo targetNode, StorageType targetStorageType,
       List<BlockMovingInfo> blkMovingInfos) {
     Block blk = new Block(blockInfo.getBlockId(), blockInfo.getNumBytes(),
         blockInfo.getGenerationStamp());
-    BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk,
-        sourceNodes.toArray(new DatanodeInfo[sourceNodes.size()]),
-        targetNodes.toArray(new DatanodeInfo[targetNodes.size()]),
-        sourceStorageTypes.toArray(new StorageType[sourceStorageTypes.size()]),
-        targetStorageTypes.toArray(new StorageType[targetStorageTypes.size()]));
+    BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
+        targetNode, sourceStorageType, targetStorageType);
     blkMovingInfos.add(blkMovingInfo);
   }
 
   private void buildStripedBlockMovingInfos(BlockInfo blockInfo,
-      List<DatanodeInfo> sourceNodes, List<StorageType> sourceStorageTypes,
-      List<DatanodeInfo> targetNodes, List<StorageType> targetStorageTypes,
+      DatanodeInfo sourceNode, StorageType sourceStorageType,
+      DatanodeInfo targetNode, StorageType targetStorageType,
       List<BlockMovingInfo> blkMovingInfos) {
     // For a striped block, it needs to construct internal block at the given
     // index of a block group. Here it is iterating over all the block indices
@@ -655,30 +639,17 @@ public class StoragePolicySatisfier implements Runnable {
     for (StorageAndBlockIndex si : sBlockInfo.getStorageAndIndexInfos()) {
       if (si.getBlockIndex() >= 0) {
         DatanodeDescriptor dn = si.getStorage().getDatanodeDescriptor();
-        DatanodeInfo[] srcNode = new DatanodeInfo[1];
-        StorageType[] srcStorageType = new StorageType[1];
-        DatanodeInfo[] targetNode = new DatanodeInfo[1];
-        StorageType[] targetStorageType = new StorageType[1];
-        for (int i = 0; i < sourceNodes.size(); i++) {
-          DatanodeInfo node = sourceNodes.get(i);
-          if (node.equals(dn)) {
-            srcNode[0] = node;
-            srcStorageType[0] = sourceStorageTypes.get(i);
-            targetNode[0] = targetNodes.get(i);
-            targetStorageType[0] = targetStorageTypes.get(i);
-
-            // construct internal block
-            long blockId = blockInfo.getBlockId() + si.getBlockIndex();
-            long numBytes = StripedBlockUtil.getInternalBlockLength(
-                sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(),
-                sBlockInfo.getDataBlockNum(), si.getBlockIndex());
-            Block blk = new Block(blockId, numBytes,
-                blockInfo.getGenerationStamp());
-            BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, srcNode,
-                targetNode, srcStorageType, targetStorageType);
-            blkMovingInfos.add(blkMovingInfo);
-            break; // found matching source-target nodes
-          }
+        if (sourceNode.equals(dn)) {
+          // construct internal block
+          long blockId = blockInfo.getBlockId() + si.getBlockIndex();
+          long numBytes = StripedBlockUtil.getInternalBlockLength(
+              sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(),
+              sBlockInfo.getDataBlockNum(), si.getBlockIndex());
+          Block blk = new Block(blockId, numBytes,
+              blockInfo.getGenerationStamp());
+          BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, sourceNode,
+              targetNode, sourceStorageType, targetStorageType);
+          blkMovingInfos.add(blkMovingInfo);
         }
       }
     }
@@ -817,18 +788,18 @@ public class StoragePolicySatisfier implements Runnable {
   }
 
   /**
-   * Receives the movement results of collection of blocks associated to a
-   * trackId.
+   * Receives the report of blocks whose storage movement attempt finished.
    *
-   * @param blksMovementResults
-   *          movement status of the set of blocks associated to a trackId.
+   * @param moveAttemptFinishedBlks
+   *          set of storage movement attempt finished blocks.
    */
-  void handleBlocksStorageMovementResults(
-      BlocksStorageMovementResult[] blksMovementResults) {
-    if (blksMovementResults.length <= 0) {
+  void handleStorageMovementAttemptFinishedBlks(
+      BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) {
+    if (moveAttemptFinishedBlks.getBlocks().length <= 0) {
       return;
     }
-    storageMovementsMonitor.addResults(blksMovementResults);
+    storageMovementsMonitor
+        .addReportedMovedBlocks(moveAttemptFinishedBlks.getBlocks());
   }
 
   @VisibleForTesting
@@ -906,4 +877,52 @@ public class StoragePolicySatisfier implements Runnable {
       return (startId != trackId);
     }
   }
+
+  /**
+   * This class contains the blocks of an attempted item and its last
+   * attempted or reported time stamp. This is used by
+   * {@link BlockStorageMovementAttemptedItems#storageMovementAttemptedItems}.
+   */
+  final static class AttemptedItemInfo extends ItemInfo {
+    private long lastAttemptedOrReportedTime;
+    private final List<Block> blocks;
+
+    /**
+     * AttemptedItemInfo constructor.
+     *
+     * @param rootId
+     *          rootId for trackId
+     * @param trackId
+     *          trackId for file.
+     * @param lastAttemptedOrReportedTime
+     *          last attempted or reported time
+     */
+    AttemptedItemInfo(long rootId, long trackId,
+        long lastAttemptedOrReportedTime,
+        List<Block> blocks) {
+      super(rootId, trackId);
+      this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime;
+      this.blocks = blocks;
+    }
+
+    /**
+     * @return last attempted or reported time stamp.
+     */
+    long getLastAttemptedOrReportedTime() {
+      return lastAttemptedOrReportedTime;
+    }
+
+    /**
+     * Update lastAttemptedOrReportedTime, so that the expiration time will be
+     * postponed to future.
+     */
+    void touchLastReportedTimeStamp() {
+      this.lastAttemptedOrReportedTime = monotonicNow();
+    }
+
+    List<Block> getBlocks() {
+      return this.blocks;
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
index 5dcf4e7..e90317d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.protocol;
 
-import java.util.Arrays;
 import java.util.Collection;
 
 import org.apache.hadoop.fs.StorageType;
@@ -29,22 +28,15 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  * given set of blocks to specified target DataNodes to fulfill the block
  * storage policy.
  *
- * Upon receiving this command, this DataNode coordinates all the block movement
- * by passing the details to
+ * Upon receiving this command, this DataNode passes the array of block
+ * movement details to
  * {@link org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker}
- * service. After the block movement this DataNode sends response back to the
- * NameNode about the movement status.
- *
- * The coordinator datanode will use 'trackId' identifier to coordinate the
- * block movement of the given set of blocks. TrackId is a unique identifier
- * that represents a group of blocks. Namenode will generate this unique value
- * and send it to the coordinator datanode along with the
- * BlockStorageMovementCommand. Datanode will monitor the completion of the
- * block movements that grouped under this trackId and notifies Namenode about
- * the completion status.
+ * service. Later, StoragePolicySatisfyWorker will schedule block movement tasks
+ * for these blocks and monitor the completion of each task. After the block
+ * movement attempt is finished (with success or failure), this DataNode will
+ * send a response back to the NameNode about the attempt finished details.
  */
 public class BlockStorageMovementCommand extends DatanodeCommand {
-  private final long trackID;
   private final String blockPoolId;
   private final Collection<BlockMovingInfo> blockMovingTasks;
 
@@ -53,30 +45,17 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
    *
    * @param action
    *          protocol specific action
-   * @param trackID
-   *          unique identifier to monitor the given set of block movements
-   * @param blockPoolId
-   *          block pool ID
    * @param blockMovingInfos
    *          block to storage info that will be used for movement
    */
-  public BlockStorageMovementCommand(int action, long trackID,
-      String blockPoolId, Collection<BlockMovingInfo> blockMovingInfos) {
+  public BlockStorageMovementCommand(int action, String blockPoolId,
+      Collection<BlockMovingInfo> blockMovingInfos) {
     super(action);
-    this.trackID = trackID;
     this.blockPoolId = blockPoolId;
     this.blockMovingTasks = blockMovingInfos;
   }
 
   /**
-   * Returns trackID, which will be used to monitor the block movement assigned
-   * to this coordinator datanode.
-   */
-  public long getTrackID() {
-    return trackID;
-  }
-
-  /**
    * Returns block pool ID.
    */
   public String getBlockPoolId() {
@@ -95,33 +74,29 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
    */
   public static class BlockMovingInfo {
     private Block blk;
-    private DatanodeInfo[] sourceNodes;
-    private DatanodeInfo[] targetNodes;
-    private StorageType[] sourceStorageTypes;
-    private StorageType[] targetStorageTypes;
+    private DatanodeInfo sourceNode;
+    private DatanodeInfo targetNode;
+    private StorageType sourceStorageType;
+    private StorageType targetStorageType;
 
     /**
      * Block to storage info constructor.
      *
      * @param block
-     *          block
-     * @param sourceDnInfos
-     *          node that can be the sources of a block move
-     * @param targetDnInfos
-     *          target datanode info
-     * @param srcStorageTypes
+     *          block info
+     * @param sourceDnInfo
+     *          node that can be the source of a block move
+     * @param srcStorageType
      *          type of source storage media
-     * @param targetStorageTypes
-     *          type of destin storage media
      */
-    public BlockMovingInfo(Block block,
-        DatanodeInfo[] sourceDnInfos, DatanodeInfo[] targetDnInfos,
-        StorageType[] srcStorageTypes, StorageType[] targetStorageTypes) {
+    public BlockMovingInfo(Block block, DatanodeInfo sourceDnInfo,
+        DatanodeInfo targetDnInfo, StorageType srcStorageType,
+        StorageType targetStorageType) {
       this.blk = block;
-      this.sourceNodes = sourceDnInfos;
-      this.targetNodes = targetDnInfos;
-      this.sourceStorageTypes = srcStorageTypes;
-      this.targetStorageTypes = targetStorageTypes;
+      this.sourceNode = sourceDnInfo;
+      this.targetNode = targetDnInfo;
+      this.sourceStorageType = srcStorageType;
+      this.targetStorageType = targetStorageType;
     }
 
     public void addBlock(Block block) {
@@ -129,35 +104,33 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
     }
 
     public Block getBlock() {
-      return this.blk;
+      return blk;
     }
 
-    public DatanodeInfo[] getSources() {
-      return sourceNodes;
+    public DatanodeInfo getSource() {
+      return sourceNode;
     }
 
-    public DatanodeInfo[] getTargets() {
-      return targetNodes;
+    public DatanodeInfo getTarget() {
+      return targetNode;
     }
 
-    public StorageType[] getTargetStorageTypes() {
-      return targetStorageTypes;
+    public StorageType getTargetStorageType() {
+      return targetStorageType;
     }
 
-    public StorageType[] getSourceStorageTypes() {
-      return sourceStorageTypes;
+    public StorageType getSourceStorageType() {
+      return sourceStorageType;
     }
 
     @Override
     public String toString() {
       return new StringBuilder().append("BlockMovingInfo(\n  ")
           .append("Moving block: ").append(blk).append(" From: ")
-          .append(Arrays.asList(sourceNodes)).append(" To: [")
-          .append(Arrays.asList(targetNodes)).append("\n  ")
-          .append(" sourceStorageTypes: ")
-          .append(Arrays.toString(sourceStorageTypes))
-          .append(" targetStorageTypes: ")
-          .append(Arrays.toString(targetStorageTypes)).append(")").toString();
+          .append(sourceNode).append(" To: [").append(targetNode).append("\n  ")
+          .append(" sourceStorageType: ").append(sourceStorageType)
+          .append(" targetStorageType: ").append(targetStorageType).append(")")
+          .toString();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMoveAttemptFinished.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMoveAttemptFinished.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMoveAttemptFinished.java
new file mode 100644
index 0000000..c837e013
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMoveAttemptFinished.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * This class represents the blocks for which storage movements have been
+ * attempted by datanodes. The movementFinishedBlocks array contains all the
+ * blocks whose movement was attempted, and each attempt could have finished
+ * with either success or failure.
+ */
+public class BlocksStorageMoveAttemptFinished {
+
+  private final Block[] movementFinishedBlocks;
+
+  public BlocksStorageMoveAttemptFinished(Block[] moveAttemptFinishedBlocks) {
+    this.movementFinishedBlocks = moveAttemptFinishedBlocks;
+  }
+
+  public Block[] getBlocks() {
+    return movementFinishedBlocks;
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder().append("BlocksStorageMovementFinished(\n  ")
+        .append("  blockID: ").append(Arrays.toString(movementFinishedBlocks))
+        .append(")").toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
deleted file mode 100644
index 7f749ec4..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksStorageMovementResult.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.protocol;
-
-/**
- * This class represents, movement status of a set of blocks associated to a
- * track Id.
- */
-public class BlocksStorageMovementResult {
-
-  private final long trackId;
-  private final Status status;
-
-  /**
-   * SUCCESS - If all the blocks associated to track id has moved successfully
-   * or maximum possible movements done.
-   *
-   * <p>
-   * FAILURE - If any of its(trackId) blocks movement failed and requires to
-   * retry these failed blocks movements. Example selected target node is no
-   * more running or no space. So, retrying by selecting new target node might
-   * work.
-   *
-   * <p>
-   * IN_PROGRESS - If all or some of the blocks associated to track id are
-   * still moving.
-   */
-  public enum Status {
-    SUCCESS, FAILURE, IN_PROGRESS;
-  }
-
-  /**
-   * BlocksStorageMovementResult constructor.
-   *
-   * @param trackId
-   *          tracking identifier
-   * @param status
-   *          block movement status
-   */
-  public BlocksStorageMovementResult(long trackId, Status status) {
-    this.trackId = trackId;
-    this.status = status;
-  }
-
-  public long getTrackId() {
-    return trackId;
-  }
-
-  public Status getStatus() {
-    return status;
-  }
-
-  @Override
-  public String toString() {
-    return new StringBuilder().append("BlocksStorageMovementResult(\n  ")
-        .append("track id: ").append(trackId).append("  status: ")
-        .append(status).append(")").toString();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 5e1f148..fcc2df1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -112,8 +112,7 @@ public interface DatanodeProtocol {
    * @param slowPeers Details of peer DataNodes that were detected as being
    *                  slow to respond to packet writes. Empty report if no
    *                  slow peers were detected by the DataNode.
-   * @param blksMovementResults array of movement status of a set of blocks
-   *                            associated to a trackId.
+   * @param storageMovFinishedBlks array of movement attempt finished blocks
    * @throws IOException on error
    */
   @Idempotent
@@ -128,7 +127,7 @@ public interface DatanodeProtocol {
                                        boolean requestFullBlockReportLease,
                                        @Nonnull SlowPeerReports slowPeers,
                                        @Nonnull SlowDiskReports slowDisks,
-                                       BlocksStorageMovementResult[] blksMovementResults)
+                                       BlocksStorageMoveAttemptFinished storageMovFinishedBlks)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 080f7fa..7c35494 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -162,9 +162,8 @@ message BlockECReconstructionCommandProto {
  * Block storage movement command
  */
 message BlockStorageMovementCommandProto {
-  required uint64 trackID = 1;
-  required string blockPoolId = 2;
-  repeated BlockStorageMovementProto blockStorageMovement = 3;
+  required string blockPoolId = 1;
+  repeated BlockMovingInfoProto blockMovingInfo = 2;
 }
 
 /**
@@ -177,25 +176,20 @@ message DropSPSWorkCommandProto {
 /**
  * Block storage movement information
  */
-message BlockStorageMovementProto {
+message BlockMovingInfoProto {
   required BlockProto block = 1;
-  required DatanodeInfosProto sourceDnInfos = 2;
-  required DatanodeInfosProto targetDnInfos = 3;
-  required StorageTypesProto sourceStorageTypes = 4;
-  required StorageTypesProto targetStorageTypes = 5;
+  required DatanodeInfoProto sourceDnInfo = 2;
+  required DatanodeInfoProto targetDnInfo = 3;
+  required StorageTypeProto sourceStorageType = 4;
+  required StorageTypeProto targetStorageType = 5;
 }
 
 /**
- * Movement status of the set of blocks associated to a trackId.
+ * Blocks for which storage movements have been attempted and finished
+ * with either success or failure.
  */
-message BlocksStorageMovementResultProto {
-  enum Status {
-    SUCCESS = 1; // block movement succeeded
-    FAILURE = 2; // block movement failed and needs to retry
-    IN_PROGRESS = 3; // block movement is still in progress
-  }
-  required uint64 trackID = 1;
-  required Status status = 2;
+message BlocksStorageMoveAttemptFinishedProto {
+  repeated BlockProto blocks = 1;
 }
 
 /**
@@ -255,7 +249,7 @@ message HeartbeatRequestProto {
   optional bool requestFullBlockReportLease = 9 [ default = false ];
   repeated SlowPeerReportProto slowPeers = 10;
   repeated SlowDiskReportProto slowDisks = 11;
-  repeated BlocksStorageMovementResultProto blksMovementResults = 12;
+  optional BlocksStorageMoveAttemptFinishedProto storageMoveAttemptFinishedBlks = 12;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 2653f11..d8739be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4315,24 +4315,35 @@
 
 <property>
   <name>dfs.storage.policy.satisfier.recheck.timeout.millis</name>
-  <value>300000</value>
+  <value>60000</value>
   <description>
     Blocks storage movements monitor re-check interval in milliseconds.
     This check will verify whether any blocks storage movement results arrived from DN
     and also verify if any of file blocks movements not at all reported to DN
     since dfs.storage.policy.satisfier.self.retry.timeout.
-    The default value is 5 * 60 * 1000 (5 mins)
+    The default value is 1 * 60 * 1000 (1 min)
   </description>
 </property>
 
 <property>
   <name>dfs.storage.policy.satisfier.self.retry.timeout.millis</name>
-  <value>1800000</value>
+  <value>300000</value>
   <description>
-    If any of file related block movements not at all reported by coordinator datanode,
+    If any file-related block movements are not reported at all by the datanode,
     then after this timeout(in milliseconds), the item will be added back to movement needed list
     at namenode which will be retried for block movements.
-    The default value is 30 * 60 * 1000 (30 mins)
+    The default value is 5 * 60 * 1000 (5 mins)
+  </description>
+</property>
+
+<property>
+  <name>dfs.storage.policy.satisfier.low.max-streams.preference</name>
+  <value>false</value>
+  <description>
+    If true, block movement tasks will share an equal ratio of the number of highest-priority
+    replication streams (dfs.namenode.replication.max-streams) with pending replica and
+    erasure-coded reconstruction tasks. If false, block movement tasks will only use
+    the delta number of replication streams. The default value is false.
   </description>
 </property>
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
index da61842..f69a3ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
@@ -104,7 +104,7 @@ Following 2 options will allow users to move the blocks based on new policy set.
 ### <u>S</u>torage <u>P</u>olicy <u>S</u>atisfier (SPS)
 
 When user changes the storage policy on a file/directory, user can call `HdfsAdmin` API `satisfyStoragePolicy()` to move the blocks as per the new policy set.
-The SPS daemon thread runs along with namenode and periodically scans for the storage mismatches between new policy set and the physical blocks placed. This will only track the files/directories for which user invoked satisfyStoragePolicy. If SPS identifies some blocks to be moved for a file, then it will schedule block movement tasks to datanodes. A Coordinator DataNode(C-DN) will track all block movements associated to a file and notify to namenode about movement success/failure. If there are any failures in movement, the SPS will re-attempt by sending new block movement task.
+The SPS daemon thread runs along with namenode and periodically scans for the storage mismatches between new policy set and the physical blocks placed. This will only track the files/directories for which user invoked satisfyStoragePolicy. If SPS identifies some blocks to be moved for a file, then it will schedule block movement tasks to datanodes. If there are any failures in movement, the SPS will re-attempt by sending new block movement tasks.
 
 SPS can be enabled and disabled dynamically without restarting the Namenode.
 
@@ -127,10 +127,10 @@ Detailed design documentation can be found at [Storage Policy Satisfier(SPS) (HD
    enabled and vice versa.
 
 *   **dfs.storage.policy.satisfier.recheck.timeout.millis** - A timeout to re-check the processed block storage movement
-   command results from Co-ordinator Datanode.
+   command results from Datanodes.
 
 *   **dfs.storage.policy.satisfier.self.retry.timeout.millis** - A timeout to retry if no block movement results reported from
-   Co-ordinator Datanode in this configured timeout.
+   Datanode in this configured timeout.
 
 ### Mover - A New Data Migration Tool
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index 2d58732..fc6214e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
@@ -117,7 +117,8 @@ public class TestNameNodePrunesMissingStorages {
       cluster.stopDataNode(0);
       cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
           0, null, true, SlowPeerReports.EMPTY_REPORT,
-          SlowDiskReports.EMPTY_REPORT, new BlocksStorageMovementResult[0]);
+          SlowDiskReports.EMPTY_REPORT,
+          new BlocksStorageMoveAttemptFinished(null));
 
       // Check that the missing storage was pruned.
       assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
index 33cf391..cee0410 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -142,7 +142,7 @@ public class InternalDataNodeTestUtils {
             Mockito.anyBoolean(),
             Mockito.any(SlowPeerReports.class),
             Mockito.any(SlowDiskReports.class),
-            Mockito.any(BlocksStorageMovementResult[].class))).thenReturn(
+            Mockito.any(BlocksStorageMoveAttemptFinished.class))).thenReturn(
         new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
             HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
             .nextLong() | 1L));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 524b8b9..4c75ff8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -160,7 +160,7 @@ public class TestBPOfferService {
           Mockito.anyBoolean(),
           Mockito.any(SlowPeerReports.class),
           Mockito.any(SlowDiskReports.class),
-          Mockito.any(BlocksStorageMovementResult[].class));
+          Mockito.any(BlocksStorageMoveAttemptFinished.class));
     mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
     datanodeCommands[nnIdx] = new DatanodeCommand[0];
     return mock;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 02e8dc7..5df5997 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -83,7 +83,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -223,7 +223,7 @@ public class TestBlockRecovery {
             Mockito.anyBoolean(),
             Mockito.any(SlowPeerReports.class),
             Mockito.any(SlowDiskReports.class),
-            Mockito.any(BlocksStorageMovementResult[].class)))
+            Mockito.any(BlocksStorageMoveAttemptFinished.class)))
         .thenReturn(new HeartbeatResponse(
             new DatanodeCommand[0],
             new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
index b15b530..0dd15c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java
@@ -50,7 +50,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
@@ -174,7 +174,7 @@ public class TestDataNodeLifeline {
             anyBoolean(),
             any(SlowPeerReports.class),
             any(SlowDiskReports.class),
-            any(BlocksStorageMovementResult[].class));
+            any(BlocksStorageMoveAttemptFinished.class));
 
     // Intercept lifeline to trigger latch count-down on each call.
     doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@@ -240,7 +240,7 @@ public class TestDataNodeLifeline {
             anyBoolean(),
             any(SlowPeerReports.class),
             any(SlowDiskReports.class),
-            any(BlocksStorageMovementResult[].class));
+            any(BlocksStorageMoveAttemptFinished.class));
 
     // While waiting on the latch for the expected number of heartbeat messages,
     // poll DataNode tracking information.  We expect that the DataNode always

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index d7ac3f9..d47da69 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -224,7 +224,7 @@ public class TestDatanodeProtocolRetryPolicy {
            Mockito.anyBoolean(),
            Mockito.any(SlowPeerReports.class),
            Mockito.any(SlowDiskReports.class),
-           Mockito.any(BlocksStorageMovementResult[].class));
+           Mockito.any(BlocksStorageMoveAttemptFinished.class));
 
     dn = new DataNode(conf, locations, null, null) {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2666d51f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index b9f21a0..3732b2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -66,7 +66,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.Page
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
+import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -210,7 +210,7 @@ public class TestFsDatasetCache {
           anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
           anyBoolean(), any(SlowPeerReports.class),
           any(SlowDiskReports.class),
-          (BlocksStorageMovementResult[]) any());
+          any(BlocksStorageMoveAttemptFinished.class));
     } finally {
       lock.writeLock().unlock();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message