hadoop-common-commits mailing list archives

From vinayakum...@apache.org
Subject [06/50] hadoop git commit: HDFS-7621. Erasure Coding: update the Balancer/Mover data migration logic. Contributed by Walter Su.
Date Fri, 14 Aug 2015 10:54:20 GMT
HDFS-7621. Erasure Coding: update the Balancer/Mover data migration logic. Contributed by Walter Su.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7af2a421
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7af2a421
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7af2a421

Branch: refs/heads/HDFS-7285-REBASE
Commit: 7af2a4212a4d7382142d01dcae1f9908ad984be8
Parents: d806987
Author: Zhe Zhang <zhezhang@cloudera.com>
Authored: Wed Jun 3 11:51:58 2015 -0700
Committer: Vinayakumar B <vinayakumarb@apache.org>
Committed: Thu Aug 13 17:04:20 2015 +0530

----------------------------------------------------------------------
 .../src/main/proto/hdfs.proto                   |   3 +
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |  21 ++-
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 144 +++++++++++++------
 .../server/blockmanagement/BlockManager.java    |  26 +++-
 .../apache/hadoop/hdfs/server/mover/Mover.java  |  38 +++--
 .../server/protocol/BlocksWithLocations.java    |  25 ++++
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  15 ++
 .../hadoop/hdfs/protocolPB/TestPBHelper.java    |  51 ++++---
 .../hdfs/server/balancer/TestBalancer.java      |  76 ++++++++++
 .../hadoop/hdfs/server/mover/TestMover.java     | 123 +++++++++++++++-
 11 files changed, 449 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7af2a421/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index f64cf8f..e6db596 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -532,6 +532,9 @@ message BlockWithLocationsProto {
   repeated string datanodeUuids = 2; // Datanodes with replicas of the block
   repeated string storageUuids = 3;  // Storages with replicas of the block
   repeated StorageTypeProto storageTypes = 4;
+
+  optional bytes indices = 5;
+  optional uint32 dataBlockNum = 6;
 }
 
 /**

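The two new fields are proto2 optionals, so the message stays wire-compatible: older senders simply never set them, and a receiver can use field presence to tell a striped block group from a contiguous block. A minimal presence-check sketch against the generated class (the helper name is hypothetical; the hasXxx() accessors are the standard proto2 codegen):

    // Contiguous blocks never set the new fields, so presence alone
    // identifies a striped block group.
    static boolean isStripedBlockGroup(BlockWithLocationsProto proto) {
      return proto.hasIndices() && proto.hasDataBlockNum();
    }
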
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7af2a421/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 278f897..511ebec 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -277,3 +277,6 @@
 
     HDFS-8453. Erasure coding: properly handle start offset for internal blocks
     in a block group. (Zhe Zhang via jing9)
+
+    HDFS-7621. Erasure Coding: update the Balancer/Mover data migration logic.
+    (Walter Su via zhz)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7af2a421/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 0d7d0cc..54e1df9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -211,6 +211,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlo
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -458,22 +459,32 @@ public class PBHelper {
   }
 
   public static BlockWithLocationsProto convert(BlockWithLocations blk) {
-    return BlockWithLocationsProto.newBuilder()
-        .setBlock(convert(blk.getBlock()))
+    BlockWithLocationsProto.Builder builder = BlockWithLocationsProto
+        .newBuilder().setBlock(convert(blk.getBlock()))
         .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
         .addAllStorageUuids(Arrays.asList(blk.getStorageIDs()))
-        .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()))
-        .build();
+        .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()));
+    if (blk instanceof StripedBlockWithLocations) {
+      StripedBlockWithLocations sblk = (StripedBlockWithLocations) blk;
+      builder.setIndices(getByteString(sblk.getIndices()));
+      builder.setDataBlockNum(sblk.getDataBlockNum());
+    }
+    return builder.build();
   }
 
   public static BlockWithLocations convert(BlockWithLocationsProto b) {
     final List<String> datanodeUuids = b.getDatanodeUuidsList();
     final List<String> storageUuids = b.getStorageUuidsList();
     final List<StorageTypeProto> storageTypes = b.getStorageTypesList();
-    return new BlockWithLocations(convert(b.getBlock()),
+    BlockWithLocations blk = new BlockWithLocations(convert(b.getBlock()),
         datanodeUuids.toArray(new String[datanodeUuids.size()]),
         storageUuids.toArray(new String[storageUuids.size()]),
         convertStorageTypes(storageTypes, storageUuids.size()));
+    if (b.hasIndices()) {
+      blk = new StripedBlockWithLocations(blk, b.getIndices().toByteArray(),
+          (short) b.getDataBlockNum());
+    }
+    return blk;
   }
 
   public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7af2a421/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index b4b06ee..689c0ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -65,6 +67,7 @@ import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
@@ -149,18 +152,17 @@ public class Dispatcher {
     private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
 
     /**
-     * Get the block from the map;
-     * if the block is not found, create a new block and put it in the map.
+     * Put the block in the map if it is not already present.
+     * @return the block now stored in the map: the given one if it was
+     *         absent, otherwise the existing entry
      */
-    private DBlock get(Block b) {
-      DBlock block = map.get(b);
-      if (block == null) {
-        block = new DBlock(b);
-        map.put(b, block);
+    private DBlock putIfAbsent(Block blk, DBlock dblk) {
+      if (!map.containsKey(blk)) {
+        map.put(blk, dblk);
+        return dblk;
       }
-      return block;
+      return map.get(blk);
     }
-    
+
     /** Remove all blocks except for the moved blocks. */
     private void removeAllButRetain(MovedBlocks<StorageGroup> movedBlocks) {
       for (Iterator<Block> i = map.keySet().iterator(); i.hasNext();) {
@@ -201,9 +203,9 @@ public class Dispatcher {
     }
   }
 
-  /** This class keeps track of a scheduled block move */
+  /** This class keeps track of a scheduled reportedBlock move */
   public class PendingMove {
-    private DBlock block;
+    private DBlock reportedBlock;
     private Source source;
     private DDatanode proxySource;
     private StorageGroup target;
@@ -215,7 +217,7 @@ public class Dispatcher {
 
     @Override
     public String toString() {
-      final Block b = block != null ? block.getBlock() : null;
+      final Block b = reportedBlock != null ? reportedBlock.getBlock() : null;
       String bStr = b != null ? (b + " with size=" + b.getNumBytes() + " ")
           : " ";
       return bStr + "from " + source.getDisplayName() + " to " + target
@@ -224,8 +226,8 @@ public class Dispatcher {
     }
 
     /**
-     * Choose a block & a proxy source for this pendingMove whose source &
-     * target have already been chosen.
+     * Choose a good block/blockGroup from the source, derive the
+     * reportedBlock from it, and choose a proxy source for that block.
      * 
      * @return true if a block and its proxy are chosen; false otherwise
      */
@@ -249,7 +251,11 @@ public class Dispatcher {
       synchronized (block) {
         synchronized (movedBlocks) {
           if (isGoodBlockCandidate(source, target, targetStorageType, block)) {
-            this.block = block;
+            if (block instanceof DBlockStriped) {
+              reportedBlock = ((DBlockStriped) block).getInternalBlock(source);
+            } else {
+              reportedBlock = block;
+            }
             if (chooseProxySource()) {
               movedBlocks.put(block);
               if (LOG.isDebugEnabled()) {
@@ -276,7 +282,7 @@ public class Dispatcher {
       }
       // if node group is supported, first try add nodes in the same node group
       if (cluster.isNodeGroupAware()) {
-        for (StorageGroup loc : block.getLocations()) {
+        for (StorageGroup loc : reportedBlock.getLocations()) {
           if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN)
               && addTo(loc)) {
             return true;
@@ -284,13 +290,13 @@ public class Dispatcher {
         }
       }
       // check if there is replica which is on the same rack with the target
-      for (StorageGroup loc : block.getLocations()) {
+      for (StorageGroup loc : reportedBlock.getLocations()) {
        if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) {
           return true;
         }
       }
       // find out a non-busy replica
-      for (StorageGroup loc : block.getLocations()) {
+      for (StorageGroup loc : reportedBlock.getLocations()) {
         if (addTo(loc)) {
           return true;
         }
@@ -298,7 +304,7 @@ public class Dispatcher {
       return false;
     }
 
-    /** add to a proxy source for specific block movement */
+    /** add to a proxy source for specific reportedBlock movement */
     private boolean addTo(StorageGroup g) {
       final DDatanode dn = g.getDDatanode();
       if (dn.addPendingBlock(this)) {
@@ -311,6 +317,7 @@ public class Dispatcher {
     /** Dispatch the move to the proxy source & wait for the response. */
     private void dispatch() {
       LOG.info("Start moving " + this);
+      assert !(reportedBlock instanceof DBlockStriped);
 
       Socket sock = new Socket();
       DataOutputStream out = null;
@@ -325,7 +332,7 @@ public class Dispatcher {
         OutputStream unbufOut = sock.getOutputStream();
         InputStream unbufIn = sock.getInputStream();
         ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
-            block.getBlock());
+            reportedBlock.getBlock());
         final KeyManager km = nnc.getKeyManager(); 
         Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
         IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
@@ -339,7 +346,7 @@ public class Dispatcher {
 
         sendRequest(out, eb, accessToken);
         receiveResponse(in);
-        nnc.getBytesMoved().addAndGet(block.getNumBytes());
+        nnc.getBytesMoved().addAndGet(reportedBlock.getNumBytes());
         target.getDDatanode().setHasSuccess();
         LOG.info("Successfully moved " + this);
       } catch (IOException e) {
@@ -368,14 +375,14 @@ public class Dispatcher {
       }
     }
 
-    /** Send a block replace request to the output stream */
+    /** Send a reportedBlock replace request to the output stream */
     private void sendRequest(DataOutputStream out, ExtendedBlock eb,
         Token<BlockTokenIdentifier> accessToken) throws IOException {
       new Sender(out).replaceBlock(eb, target.storageType, accessToken,
           source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
     }
 
-    /** Receive a block copy response from the input stream */
+    /** Receive a reportedBlock copy response from the input stream */
     private void receiveResponse(DataInputStream in) throws IOException {
       BlockOpResponseProto response =
           BlockOpResponseProto.parseFrom(vintPrefixed(in));
@@ -383,13 +390,13 @@ public class Dispatcher {
         // read intermediate responses
         response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
       }
-      String logInfo = "block move is failed";
+      String logInfo = "reportedBlock move is failed";
       DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
     }
 
     /** reset the object */
     private void reset() {
-      block = null;
+      reportedBlock = null;
       source = null;
       proxySource = null;
       target = null;
@@ -401,6 +408,44 @@ public class Dispatcher {
     public DBlock(Block block) {
       super(block);
     }
+
+    public long getNumBytes(StorageGroup storage) {
+      return super.getNumBytes();
+    }
+  }
+
+  public static class DBlockStriped extends DBlock {
+
+    final byte[] indices;
+    final short dataBlockNum;
+
+    public DBlockStriped(Block block, byte[] indices, short dataBlockNum) {
+      super(block);
+      this.indices = indices;
+      this.dataBlockNum = dataBlockNum;
+    }
+
+    public DBlock getInternalBlock(StorageGroup storage) {
+      int idxInLocs = locations.indexOf(storage);
+      if (idxInLocs == -1) {
+        return null;
+      }
+      byte idxInGroup = indices[idxInLocs];
+      long blkId = getBlock().getBlockId() + idxInGroup;
+      long numBytes = getInternalBlockLength(getNumBytes(),
+          HdfsConstants.BLOCK_STRIPED_CELL_SIZE, dataBlockNum, idxInGroup);
+      Block blk = new Block(getBlock());
+      blk.setBlockId(blkId);
+      blk.setNumBytes(numBytes);
+      DBlock dblk = new DBlock(blk);
+      dblk.addLocation(storage);
+      return dblk;
+    }
+
+    @Override
+    public long getNumBytes(StorageGroup storage) {
+      return getInternalBlock(storage).getNumBytes();
+    }
   }
 
   /** The class represents a desired move. */
@@ -476,7 +521,7 @@ public class Dispatcher {
       private PendingMove addPendingMove(DBlock block, final PendingMove pm) {
         if (getDDatanode().addPendingBlock(pm)) {
           if (pm.markMovedIfGoodBlock(block, getStorageType())) {
-            incScheduledSize(pm.block.getNumBytes());
+            incScheduledSize(pm.reportedBlock.getNumBytes());
             return pm;
           } else {
             getDDatanode().removePendingBlock(pm);
@@ -651,7 +696,8 @@ public class Dispatcher {
      */
     private long getBlockList() throws IOException {
       final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
-      final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
+      final BlocksWithLocations newBlksLocs =
+          nnc.getBlocks(getDatanodeInfo(), size);
       if (LOG.isTraceEnabled()) {
         LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
             + StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2)
@@ -659,16 +705,30 @@ public class Dispatcher {
       }
 
       long bytesReceived = 0;
-      for (BlockWithLocations blk : newBlocks.getBlocks()) {
-        bytesReceived += blk.getBlock().getNumBytes();
+      for (BlockWithLocations blkLocs : newBlksLocs.getBlocks()) {
+
+        DBlock block;
+        if (blkLocs instanceof StripedBlockWithLocations) {
+          StripedBlockWithLocations sblkLocs =
+              (StripedBlockWithLocations) blkLocs;
+          // approximate size
+          bytesReceived += sblkLocs.getBlock().getNumBytes() /
+              sblkLocs.getDataBlockNum();
+          block = new DBlockStriped(sblkLocs.getBlock(), sblkLocs.getIndices(),
+              sblkLocs.getDataBlockNum());
+        } else {
+          bytesReceived += blkLocs.getBlock().getNumBytes();
+          block = new DBlock(blkLocs.getBlock());
+        }
+
         synchronized (globalBlocks) {
-          final DBlock block = globalBlocks.get(blk.getBlock());
+          block = globalBlocks.putIfAbsent(blkLocs.getBlock(), block);
           synchronized (block) {
             block.clearLocations();
 
             // update locations
-            final String[] datanodeUuids = blk.getDatanodeUuids();
-            final StorageType[] storageTypes = blk.getStorageTypes();
+            final String[] datanodeUuids = blkLocs.getDatanodeUuids();
+            final StorageType[] storageTypes = blkLocs.getStorageTypes();
             for (int i = 0; i < datanodeUuids.length; i++) {
               final StorageGroup g = storageGroupMap.get(
                   datanodeUuids[i], storageTypes[i]);
@@ -707,6 +767,8 @@ public class Dispatcher {
      * target throttling has been considered. They are chosen only when they
      * have the capacity to support this block move. The block should be
      * dispatched immediately after this method is returned.
+     * If the block is a block group, only the internal block on this source
+     * will be dispatched.
      * 
      * @return a move that's good for the source to dispatch immediately.
      */
@@ -718,7 +780,7 @@ public class Dispatcher {
         if (target.addPendingBlock(pendingBlock)) {
           // target is not busy, so do a tentative block allocation
           if (pendingBlock.chooseBlockAndProxy()) {
-            long blockSize = pendingBlock.block.getNumBytes();
+            long blockSize = pendingBlock.reportedBlock.getNumBytes(this);
             incScheduledSize(-blockSize);
             task.size -= blockSize;
             if (task.size == 0) {
@@ -793,7 +855,7 @@ public class Dispatcher {
             blocksToReceive -= getBlockList();
             continue;
           } catch (IOException e) {
-            LOG.warn("Exception while getting block list", e);
+            LOG.warn("Exception while getting reportedBlock list", e);
             return;
           }
         } else {
@@ -937,7 +999,7 @@ public class Dispatcher {
 
 
   public void executePendingMove(final PendingMove p) {
-    // move the block
+    // move the reportedBlock
     final DDatanode targetDn = p.target.getDDatanode();
     ExecutorService moveExecutor = targetDn.getMoveExecutor();
     if (moveExecutor == null) {
@@ -995,17 +1057,17 @@ public class Dispatcher {
       }
     }
 
-    // wait for all block moving to be done
+    // wait for all reportedBlock moving to be done
     waitForMoveCompletion(targets);
 
     return getBytesMoved() - bytesLastMoved;
   }
 
-  /** The sleeping period before checking if block move is completed again */
+  /** The sleeping period before checking if reportedBlock move is completed again */
   static private long blockMoveWaitTime = 30000L;
 
   /**
-   * Wait for all block move confirmations.
+   * Wait for all reportedBlock move confirmations.
    * @return true if there is failed move execution
    */
   public static boolean waitForMoveCompletion(
@@ -1047,7 +1109,7 @@ public class Dispatcher {
    * Decide if the block is a good candidate to be moved from source to target.
    * A block is a good candidate if
    * 1. the block is not in the process of being moved/has not been moved;
-   * 2. the block does not have a replica on the target;
+   * 2. the block does not have a replica/internalBlock on the target;
    * 3. doing the move does not reduce the number of racks that the block has
    */
   private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
@@ -1064,7 +1126,7 @@ public class Dispatcher {
     }
     final DatanodeInfo targetDatanode = target.getDatanodeInfo();
     if (source.getDatanodeInfo().equals(targetDatanode)) {
-      // the block is moved inside same DN
+      // the reportedBlock is moved inside same DN
       return true;
     }
 
@@ -1152,7 +1214,7 @@ public class Dispatcher {
     movedBlocks.cleanup();
   }
 
-  /** set the sleeping period for block move completion check */
+  /** set the sleeping period for reportedBlock move completion check */
   @VisibleForTesting
   public static void setBlockMoveWaitTime(long time) {
     blockMoveWaitTime = time;

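DBlockStriped.getInternalBlock above is the core of the Dispatcher change: a reported block group is mapped, per storage, to the single internal block that actually lives there, with block id = group id + index and the length derived from the round-robin cell layout. A simplified, self-contained re-derivation of that length for data blocks (the real logic, which also covers parity blocks, is StripedBlockUtil.getInternalBlockLength; the sample numbers are illustrative):

    /** Length of data block idx in a group holding groupBytes of data. */
    static long internalBlockLength(long groupBytes, int cellSize,
        int dataBlkNum, int idx) {
      final long stripeSize = (long) cellSize * dataBlkNum;
      final long fullStripes = groupBytes / stripeSize;
      final long rem = groupBytes % stripeSize;  // bytes in the partial stripe
      long len = fullStripes * cellSize;         // one cell per full stripe
      final long fullCells = rem / cellSize;     // whole cells in the partial stripe
      if (idx < fullCells) {
        len += cellSize;                         // got a whole extra cell
      } else if (idx == fullCells) {
        len += rem % cellSize;                   // got the trailing partial cell
      }
      return len;
    }

    // e.g. 6 data blocks, 64 KiB cells, 5 full stripes plus 100 KiB:
    // idx 0 -> 5*64K + 64K, idx 1 -> 5*64K + 36K, idx 2..5 -> 5*64K.
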
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7af2a421/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 986b437..49e1306 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -3324,9 +3325,10 @@ public class BlockManager implements BlockStatsMXBean {
 
   /**
    * Get all valid locations of the block & add the block to results
-   * return the length of the added block; 0 if the block is not added
+   * @return the length of the added block, or 0 if the block is not added;
+   * for a block group, the approximate internal block size is returned
    */
-  private long addBlock(Block block, List<BlockWithLocations> results) {
+  private long addBlock(BlockInfo block, List<BlockWithLocations> results) {
     final List<DatanodeStorageInfo> locations = getValidLocations(block);
     if(locations.size() == 0) {
       return 0;
@@ -3340,9 +3342,23 @@ public class BlockManager implements BlockStatsMXBean {
         storageIDs[i] = s.getStorageID();
         storageTypes[i] = s.getStorageType();
       }
-      results.add(new BlockWithLocations(block, datanodeUuids, storageIDs,
-          storageTypes));
-      return block.getNumBytes();
+      BlockWithLocations blkWithLocs = new BlockWithLocations(block,
+          datanodeUuids, storageIDs, storageTypes);
+      if(block.isStriped()) {
+        BlockInfoStriped blockStriped = (BlockInfoStriped) block;
+        byte[] indices = new byte[locations.size()];
+        for (int i = 0; i < locations.size(); i++) {
+          indices[i] =
+              (byte) blockStriped.getStorageBlockIndex(locations.get(i));
+        }
+        results.add(new StripedBlockWithLocations(blkWithLocs, indices,
+            blockStriped.getDataBlockNum()));
+        // approximate size
+        return block.getNumBytes() / blockStriped.getDataBlockNum();
+      } else {
+        results.add(blkWithLocs);
+        return block.getNumBytes();
+      }
     }
   }
 

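The divided return value keeps the balancer's byte accounting honest: getBlocks() returns one entry per block group, but each source datanode holds only one internal block of roughly groupBytes / dataBlockNum bytes, so charging the full group size would overcount scheduled work by a factor of dataBlockNum. A worked example with illustrative RS-6-3 numbers:

    long cellSize = 64 * 1024;                       // illustrative cell size
    int dataBlockNum = 6;                            // RS-6-3 data blocks
    long groupBytes = 4 * cellSize * dataBlockNum;   // 4 full stripes = 1536 KiB
    long approxInternal = groupBytes / dataBlockNum; // ~256 KiB per internal block
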
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7af2a421/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index afacebb..29a20e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -45,12 +45,15 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
+
 import java.io.BufferedReader;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -174,8 +177,20 @@ public class Mover {
     }
   }
 
-  DBlock newDBlock(Block block, List<MLocation> locations) {
-    final DBlock db = new DBlock(block);
+  DBlock newDBlock(LocatedBlock lb, List<MLocation> locations,
+                   ECSchema ecSchema) {
+    Block blk = lb.getBlock().getLocalBlock();
+    DBlock db;
+    if (lb.isStriped()) {
+      LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
+      byte[] indices = new byte[lsb.getBlockIndices().length];
+      for (int i = 0; i < indices.length; i++) {
+        indices[i] = (byte) lsb.getBlockIndices()[i];
+      }
+      db = new DBlockStriped(blk, indices, (short) ecSchema.getNumDataUnits());
+    } else {
+      db = new DBlock(blk);
+    }
     for(MLocation ml : locations) {
       StorageGroup source = storages.getSource(ml);
       if (source != null) {
@@ -358,9 +373,10 @@ public class Mover {
         LOG.warn("Failed to get the storage policy of file " + fullPath);
         return;
       }
-      final List<StorageType> types = policy.chooseStorageTypes(
+      List<StorageType> types = policy.chooseStorageTypes(
           status.getReplication());
 
+      final ECSchema ecSchema = status.getECSchema();
       final LocatedBlocks locatedBlocks = status.getBlockLocations();
       final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
       List<LocatedBlock> lbs = locatedBlocks.getLocatedBlocks();
@@ -370,10 +386,13 @@ public class Mover {
           continue;
         }
         LocatedBlock lb = lbs.get(i);
+        if (lb.isStriped()) {
+          types = policy.chooseStorageTypes((short) lb.getLocations().length);
+        }
         final StorageTypeDiff diff = new StorageTypeDiff(types,
             lb.getStorageTypes());
         if (!diff.removeOverlap(true)) {
-          if (scheduleMoves4Block(diff, lb)) {
+          if (scheduleMoves4Block(diff, lb, ecSchema)) {
             result.updateHasRemaining(diff.existing.size() > 1
                 && diff.expected.size() > 1);
             // One block scheduled successfully, set noBlockMoved to false
@@ -385,10 +404,13 @@ public class Mover {
       }
     }
 
-    boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
+    boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb,
+                                ECSchema ecSchema) {
       final List<MLocation> locations = MLocation.toLocations(lb);
-      Collections.shuffle(locations);
-      final DBlock db = newDBlock(lb.getBlock().getLocalBlock(), locations);
+      if (!(lb instanceof LocatedStripedBlock)) {
+        Collections.shuffle(locations);
+      }
+      final DBlock db = newDBlock(lb, locations, ecSchema);
 
       for (final StorageType t : diff.existing) {
         for (final MLocation ml : locations) {
@@ -781,4 +803,4 @@ public class Mover {
       System.exit(-1);
     }
   }
-}
\ No newline at end of file
+}

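Two details in the Mover change are easy to miss. First, the locations of a LocatedStripedBlock are not shuffled, since their order must stay aligned with the block indices. Second, the expected storage types are re-chosen per striped block and sized to the whole group rather than the file's replication factor. A hedged sketch of that sizing difference (lb, policy and status as in the hunks above; the numbers are illustrative):

    // A 3x-replicated file expects 3 storage types per block, but an
    // RS-6-3 block group has 9 internal blocks, each needing its own type.
    short width = lb.isStriped()
        ? (short) lb.getLocations().length  // group width, e.g. 9
        : status.getReplication();          // replication factor, e.g. 3
    List<StorageType> types = policy.chooseStorageTypes(width);
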
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7af2a421/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
index a985dbd..0507faf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.protocol;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
@@ -91,6 +92,30 @@ public class BlocksWithLocations {
     }
   }
 
+  public static class StripedBlockWithLocations extends BlockWithLocations {
+    final byte[] indices;
+    final short dataBlockNum;
+
+    public StripedBlockWithLocations(BlockWithLocations blk, byte[] indices,
+         short dataBlockNum) {
+      super(blk.getBlock(), blk.getDatanodeUuids(), blk.getStorageIDs(),
+          blk.getStorageTypes());
+      Preconditions.checkArgument(
+          blk.getDatanodeUuids().length == indices.length);
+      this.indices = indices;
+      this.dataBlockNum = dataBlockNum;
+
+    }
+
+    public byte[] getIndices() {
+      return indices;
+    }
+
+    public short getDataBlockNum() {
+      return dataBlockNum;
+    }
+  }
+
   private final BlockWithLocations[] blocks;
 
   /** Constructor with one parameter */

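The wrapper is deliberately thin: it decorates an existing BlockWithLocations with one index byte per location plus the data-block count, and the Preconditions check pins the indices array to the locations. A construction sketch using the same illustrative values as the TestPBHelper fixture further below:

    BlockWithLocations blk = new BlockWithLocations(new Block(1, 0, 1),
        new String[] {"dn1", "dn2", "dn3"},   // datanode UUIDs
        new String[] {"s1", "s2", "s3"},      // storage IDs
        new StorageType[] {StorageType.DISK, StorageType.DISK, StorageType.DISK});
    // One index byte per location; a length mismatch trips the check.
    StripedBlockWithLocations sblk =
        new StripedBlockWithLocations(blk, new byte[] {0, 1, 2}, (short) 6);
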
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7af2a421/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index fe9f795..bbbcc60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1989,4 +1989,19 @@ public class DFSTestUtil {
     out.flushInternal();
     return out.getBlock();
   }
+
+  /**
+   * Verify that the internal blocks of a striped block group are on different nodes.
+   */
+  public static void verifyLocatedStripedBlocks(LocatedBlocks lbs,
+       int groupSize) {
+    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+      HashSet<DatanodeInfo> locs = new HashSet<>();
+      for (DatanodeInfo datanodeInfo : lb.getLocations()) {
+        locs.add(datanodeInfo);
+      }
+      assertEquals(groupSize, lb.getLocations().length);
+      assertEquals(groupSize, locs.size());
+    }
+  }
 }

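A usage sketch for the new helper, mirroring what the balancer and mover tests below do after writing an EC file (file name and length are illustrative):

    LocatedBlocks lbs = client.getBlockLocations("/ec/file", 0, fileLen);
    // expect groupSize distinct datanodes per group, e.g. 9 for RS-6-3
    DFSTestUtil.verifyLocatedStripedBlocks(lbs, dataBlocks + parityBlocks);
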
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7af2a421/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index a0b2038..3675e63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -189,40 +190,58 @@ public class TestPBHelper {
     assertEquals(b, b2);
   }
 
-  private static BlockWithLocations getBlockWithLocations(int bid) {
+  private static BlockWithLocations getBlockWithLocations(
+      int bid, boolean isStriped) {
     final String[] datanodeUuids = {"dn1", "dn2", "dn3"};
     final String[] storageIDs = {"s1", "s2", "s3"};
     final StorageType[] storageTypes = {
         StorageType.DISK, StorageType.DISK, StorageType.DISK};
-    return new BlockWithLocations(new Block(bid, 0, 1),
+    final byte[] indices = {0, 1, 2};
+    final short dataBlkNum = 6;
+    BlockWithLocations blkLocs = new BlockWithLocations(new Block(bid, 0, 1),
         datanodeUuids, storageIDs, storageTypes);
+    if (isStriped) {
+      blkLocs = new StripedBlockWithLocations(blkLocs, indices, dataBlkNum);
+    }
+    return blkLocs;
   }
 
   private void compare(BlockWithLocations locs1, BlockWithLocations locs2) {
     assertEquals(locs1.getBlock(), locs2.getBlock());
     assertTrue(Arrays.equals(locs1.getStorageIDs(), locs2.getStorageIDs()));
+    if (locs1 instanceof StripedBlockWithLocations) {
+      assertTrue(Arrays.equals(((StripedBlockWithLocations) locs1).getIndices(),
+          ((StripedBlockWithLocations) locs2).getIndices()));
+    }
   }
 
   @Test
   public void testConvertBlockWithLocations() {
-    BlockWithLocations locs = getBlockWithLocations(1);
-    BlockWithLocationsProto locsProto = PBHelper.convert(locs);
-    BlockWithLocations locs2 = PBHelper.convert(locsProto);
-    compare(locs, locs2);
+    boolean[] testSuite = new boolean[]{false, true};
+    for (int i = 0; i < testSuite.length; i++) {
+      BlockWithLocations locs = getBlockWithLocations(1, testSuite[i]);
+      BlockWithLocationsProto locsProto = PBHelper.convert(locs);
+      BlockWithLocations locs2 = PBHelper.convert(locsProto);
+      compare(locs, locs2);
+    }
   }
 
   @Test
   public void testConvertBlocksWithLocations() {
-    BlockWithLocations[] list = new BlockWithLocations[] {
-        getBlockWithLocations(1), getBlockWithLocations(2) };
-    BlocksWithLocations locs = new BlocksWithLocations(list);
-    BlocksWithLocationsProto locsProto = PBHelper.convert(locs);
-    BlocksWithLocations locs2 = PBHelper.convert(locsProto);
-    BlockWithLocations[] blocks = locs.getBlocks();
-    BlockWithLocations[] blocks2 = locs2.getBlocks();
-    assertEquals(blocks.length, blocks2.length);
-    for (int i = 0; i < blocks.length; i++) {
-      compare(blocks[i], blocks2[i]);
+    boolean[] testSuite = new boolean[]{false, true};
+    for (int i = 0; i < testSuite.length; i++) {
+      BlockWithLocations[] list = new BlockWithLocations[]{
+          getBlockWithLocations(1, testSuite[i]),
+          getBlockWithLocations(2, testSuite[i])};
+      BlocksWithLocations locs = new BlocksWithLocations(list);
+      BlocksWithLocationsProto locsProto = PBHelper.convert(locs);
+      BlocksWithLocations locs2 = PBHelper.convert(locsProto);
+      BlockWithLocations[] blocks = locs.getBlocks();
+      BlockWithLocations[] blocks2 = locs2.getBlocks();
+      assertEquals(blocks.length, blocks2.length);
+      for (int j = 0; j < blocks.length; j++) {
+        compare(blocks[j], blocks2[j]);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7af2a421/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 194aa0f..16e4c2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -144,6 +145,21 @@ public class TestBalancer {
     LazyPersistTestCase.initCacheManipulator();
   }
 
+  int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+  int groupSize = dataBlocks + parityBlocks;
+  private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final static int stripesPerBlock = 4;
+  static int DEFAULT_STRIPE_BLOCK_SIZE = cellSize * stripesPerBlock;
+
+  static void initConfWithStripe(Configuration conf) {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_STRIPE_BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    SimulatedFSDataset.setFactory(conf);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
+  }
+
   /* create a file with a length of <code>fileLen</code> */
   static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
       short replicationFactor, int nnIndex)
@@ -1529,6 +1545,66 @@ public class TestBalancer {
     }
   }
 
+  @Test(timeout = 100000)
+  public void testBalancerWithStripedFile() throws Exception {
+    Configuration conf = new Configuration();
+    initConfWithStripe(conf);
+    int numOfDatanodes = dataBlocks + parityBlocks + 2;
+    int numOfRacks = dataBlocks;
+    long capacity = 20 * DEFAULT_STRIPE_BLOCK_SIZE;
+    long[] capacities = new long[numOfDatanodes];
+    for (int i = 0; i < capacities.length; i++) {
+      capacities[i] = capacity;
+    }
+    String[] racks = new String[numOfDatanodes];
+    for (int i = 0; i < numOfDatanodes; i++) {
+      racks[i] = "/rack" + (i % numOfRacks);
+    }
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numOfDatanodes)
+        .racks(racks)
+        .simulatedCapacities(capacities)
+        .build();
+
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
+          ClientProtocol.class).getProxy();
+      client.createErasureCodingZone("/", null, 0);
+
+      long totalCapacity = sum(capacities);
+
+      // fill the cluster to 30% with data; with parity it will be 45% full.
+      long fileLen = totalCapacity * 3 / 10;
+      long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks;
+      FileSystem fs = cluster.getFileSystem(0);
+      DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, r.nextLong());
+
+      // verify locations of striped blocks
+      LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
+      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+
+      // add one datanode
+      String newRack = "/rack" + (++numOfRacks);
+      cluster.startDataNodes(conf, 1, true, null,
+          new String[]{newRack}, null, new long[]{capacity});
+      totalCapacity += capacity;
+      cluster.triggerHeartbeats();
+
+      // run balancer and validate results
+      Balancer.Parameters p = Balancer.Parameters.DEFAULT;
+      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+      runBalancer(conf, totalUsedSpace, totalCapacity, p, 0);
+
+      // verify locations of striped blocks
+      locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
+      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * @param args
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7af2a421/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index d3d814c..b4e17db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -34,7 +35,12 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
 import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
@@ -99,7 +105,7 @@ public class TestMover {
       final LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
       final List<MLocation> locations = MLocation.toLocations(lb);
       final MLocation ml = locations.get(0);
-      final DBlock db = mover.newDBlock(lb.getBlock().getLocalBlock(), locations);
+      final DBlock db = mover.newDBlock(lb, locations, null);
 
       final List<StorageType> storageTypes = new ArrayList<StorageType>(
           Arrays.asList(StorageType.DEFAULT, StorageType.DEFAULT));
@@ -409,4 +415,119 @@ public class TestMover {
       cluster.shutdown();
     }
   }
+
+  int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+  private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final static int stripesPerBlock = 4;
+  static int DEFAULT_STRIPE_BLOCK_SIZE = cellSize * stripesPerBlock;
+
+  static void initConfWithStripe(Configuration conf) {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_STRIPE_BLOCK_SIZE);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
+    Dispatcher.setBlockMoveWaitTime(3000L);
+  }
+
+  @Test(timeout = 300000)
+  public void testMoverWithStripedFile() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConfWithStripe(conf);
+
+    // start 10 datanodes
+    int numOfDatanodes = 10;
+    int storagesPerDatanode = 2;
+    long capacity = 10 * DEFAULT_STRIPE_BLOCK_SIZE;
+    long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
+    for (int i = 0; i < numOfDatanodes; i++) {
+      for (int j = 0; j < storagesPerDatanode; j++) {
+        capacities[i][j] = capacity;
+      }
+    }
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numOfDatanodes)
+        .storagesPerDatanode(storagesPerDatanode)
+        .storageTypes(new StorageType[][]{
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.DISK},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE},
+            {StorageType.DISK, StorageType.ARCHIVE}})
+        .storageCapacities(capacities)
+        .build();
+
+    try {
+      cluster.waitActive();
+
+      // set "/bar" directory with HOT storage policy.
+      ClientProtocol client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+      String barDir = "/bar";
+      client.mkdirs(barDir, new FsPermission((short) 777), true);
+      client.setStoragePolicy(barDir,
+          HdfsConstants.HOT_STORAGE_POLICY_NAME);
+      // set "/bar" directory with EC zone.
+      client.createErasureCodingZone(barDir, null, 0);
+
+      // write file to barDir
+      final String fooFile = "/bar/foo";
+      long fileLen = 20 * DEFAULT_STRIPE_BLOCK_SIZE;
+      DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
+          fileLen, (short) 3, 0);
+
+      // verify storage types and locations
+      LocatedBlocks locatedBlocks =
+          client.getBlockLocations(fooFile, 0, fileLen);
+      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+        for (StorageType type : lb.getStorageTypes()) {
+          Assert.assertEquals(StorageType.DISK, type);
+        }
+      }
+      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+          dataBlocks + parityBlocks);
+
+      // start 5 more datanodes
+      numOfDatanodes += 5;
+      capacities = new long[5][storagesPerDatanode];
+      for (int i = 0; i < 5; i++) {
+        for (int j = 0; j < storagesPerDatanode; j++) {
+          capacities[i][j] = capacity;
+        }
+      }
+      cluster.startDataNodes(conf, 5,
+          new StorageType[][]{
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE},
+              {StorageType.ARCHIVE, StorageType.ARCHIVE}},
+          true, null, null, null, capacities, null, false, false, false, null);
+      cluster.triggerHeartbeats();
+
+      // move file to ARCHIVE
+      client.setStoragePolicy(barDir, "COLD");
+      // run Mover
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] { "-p", barDir });
+      Assert.assertEquals("Movement to ARCHIVE should be successfull", 0, rc);
+
+      // verify storage types and locations
+      locatedBlocks = client.getBlockLocations(fooFile, 0, fileLen);
+      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+        for (StorageType type : lb.getStorageTypes()) {
+          Assert.assertEquals(StorageType.ARCHIVE, type);
+        }
+      }
+      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
+          dataBlocks + parityBlocks);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }

