hadoop-common-commits mailing list archives

From z..@apache.org
Subject [21/36] hadoop git commit: HDFS-7285. Erasure Coding Support inside HDFS.
Date Fri, 14 Aug 2015 17:55:33 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
new file mode 100644
index 0000000..4dc94a0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -0,0 +1,947 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.util;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSStripedOutputStream;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * When accessing a file in striped layout, operations on logical byte ranges
+ * in the file need to be mapped to physical byte ranges on block files stored
+ * on DataNodes. This utility class facilitates this mapping by defining and
+ * exposing a number of striping-related concepts. The most basic ones are
+ * illustrated in the following diagram. Unless otherwise specified, all
+ * range-related calculations are inclusive (the end offset of the previous
+ * range should be 1 byte lower than the start offset of the next one).
+ *
+ *  | <----  Block Group ----> |   <- Block Group: logical unit composing
+ *  |                          |        striped HDFS files.
+ *  blk_0      blk_1       blk_2   <- Internal Blocks: each internal block
+ *    |          |           |          represents a physically stored local
+ *    v          v           v          block file
+ * +------+   +------+   +------+
+ * |cell_0|   |cell_1|   |cell_2|  <- {@link StripingCell} represents the
+ * +------+   +------+   +------+       logical order that a Block Group should
+ * |cell_3|   |cell_4|   |cell_5|       be accessed: cell_0, cell_1, ...
+ * +------+   +------+   +------+
+ * |cell_6|   |cell_7|   |cell_8|
+ * +------+   +------+   +------+
+ * |cell_9|
+ * +------+  <- A cell contains cellSize bytes of data
+ */
+@InterfaceAudience.Private
+public class StripedBlockUtil {
+
+  /**
+   * This method parses a striped block group into individual blocks.
+   *
+   * @param bg The striped block group
+   * @param cellSize The size of a striping cell
+   * @param dataBlkNum The number of data blocks
+   * @param parityBlkNum The number of parity blocks
+   * @return An array containing the blocks in the group
+   */
+  public static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
+      int cellSize, int dataBlkNum, int parityBlkNum) {
+    int locatedBGSize = bg.getBlockIndices().length;
+    LocatedBlock[] lbs = new LocatedBlock[dataBlkNum + parityBlkNum];
+    for (short i = 0; i < locatedBGSize; i++) {
+      final int idx = bg.getBlockIndices()[i];
+      // for now we do not use redundant replicas of an internal block
+      if (idx < (dataBlkNum + parityBlkNum) && lbs[idx] == null) {
+        lbs[idx] = constructInternalBlock(bg, i, cellSize,
+            dataBlkNum, idx);
+      }
+    }
+    return lbs;
+  }
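
// A hedged usage sketch (illustration only, not from the patch; RS(6,3) with
// 64 KiB cells is an assumed layout): a LocatedStripedBlock reported by the
// NameNode can be split into its (up to 9) internal blocks like this:
//
//   LocatedBlock[] internals = parseStripedBlockGroup(bg, 64 * 1024, 6, 3);
//   // internals[0..5] are data blocks, internals[6..8] parity blocks; an
//   // entry may be null if that internal block has no reported location.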
+
+  /**
+   * This method creates an internal block at the given index of a block group.
+   *
+   * @param idxInReturnedLocs The index in the stored locations in the
+   *                          {@link LocatedStripedBlock} object
+   * @param idxInBlockGroup The logical index in the striped block group
+   * @return The constructed internal block
+   */
+  public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
+      int idxInReturnedLocs, int cellSize, int dataBlkNum,
+      int idxInBlockGroup) {
+    final ExtendedBlock blk = constructInternalBlock(
+        bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup);
+    final LocatedBlock locatedBlock;
+    if (idxInReturnedLocs < bg.getLocations().length) {
+      locatedBlock = new LocatedBlock(blk,
+          new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
+          new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
+          new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
+          bg.getStartOffset(), bg.isCorrupt(), null);
+    } else {
+      locatedBlock = new LocatedBlock(blk, null, null, null,
+          bg.getStartOffset(), bg.isCorrupt(), null);
+    }
+    Token<BlockTokenIdentifier>[] blockTokens = bg.getBlockTokens();
+    if (idxInReturnedLocs < blockTokens.length) {
+      locatedBlock.setBlockToken(blockTokens[idxInReturnedLocs]);
+    }
+    return locatedBlock;
+  }
+
+  /**
+   * This method creates an internal {@link ExtendedBlock} at the given index
+   * of a block group.
+   */
+  public static ExtendedBlock constructInternalBlock(ExtendedBlock blockGroup,
+      int cellSize, int dataBlkNum, int idxInBlockGroup) {
+    ExtendedBlock block = new ExtendedBlock(blockGroup);
+    block.setBlockId(blockGroup.getBlockId() + idxInBlockGroup);
+    block.setNumBytes(getInternalBlockLength(blockGroup.getNumBytes(),
+        cellSize, dataBlkNum, idxInBlockGroup));
+    return block;
+  }
+
+  /**
+   * Get the size of an internal block at the given index of a block group
+   *
+   * @param dataSize Size of the block group only counting data blocks
+   * @param cellSize The size of a striping cell
+   * @param numDataBlocks The number of data blocks
+   * @param i The logical index in the striped block group
+   * @return The size of the internal block at the specified index
+   */
+  public static long getInternalBlockLength(long dataSize,
+      int cellSize, int numDataBlocks, int i) {
+    Preconditions.checkArgument(dataSize >= 0);
+    Preconditions.checkArgument(cellSize > 0);
+    Preconditions.checkArgument(numDataBlocks > 0);
+    Preconditions.checkArgument(i >= 0);
+    // Size of each stripe (only counting data blocks)
+    final int stripeSize = cellSize * numDataBlocks;
+    // If block group ends at stripe boundary, each internal block has an equal
+    // share of the group
+    final int lastStripeDataLen = (int)(dataSize % stripeSize);
+    if (lastStripeDataLen == 0) {
+      return dataSize / numDataBlocks;
+    }
+
+    final int numStripes = (int) ((dataSize - 1) / stripeSize + 1);
+    return (numStripes - 1L)*cellSize
+        + lastCellSize(lastStripeDataLen, cellSize, numDataBlocks, i);
+  }
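
// Worked example (illustrative toy numbers, not from the patch): with
// dataSize = 1000, cellSize = 100 and numDataBlocks = 3, we get
// stripeSize = 300, lastStripeDataLen = 100 and numStripes = 4, so:
//
//   getInternalBlockLength(1000, 100, 3, 0) == 400  // 3 full cells + 100
//   getInternalBlockLength(1000, 100, 3, 1) == 300  // no data in last stripe
//   getInternalBlockLength(1000, 100, 3, 2) == 300
//   getInternalBlockLength(1000, 100, 3, 3) == 400  // parity == block 0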
+
+  private static int lastCellSize(int size, int cellSize, int numDataBlocks,
+      int i) {
+    if (i < numDataBlocks) {
+      // parity block size (i.e. i >= numDataBlocks) is the same as 
+      // the first data block size (i.e. i = 0).
+      size -= i*cellSize;
+      if (size < 0) {
+        size = 0;
+      }
+    }
+    return size > cellSize ? cellSize : size;
+  }
+
+  /**
+   * Given a byte's offset in an internal block, calculate the offset in
+   * the block group
+   */
+  public static long offsetInBlkToOffsetInBG(int cellSize, int dataBlkNum,
+      long offsetInBlk, int idxInBlockGroup) {
+    int cellIdxInBlk = (int) (offsetInBlk / cellSize);
+    return cellIdxInBlk * cellSize * dataBlkNum // n full stripes before offset
+        + idxInBlockGroup * cellSize // m full cells before offset
+        + offsetInBlk % cellSize; // partial cell
+  }
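
// Worked example (illustrative numbers): with cellSize = 100 and
// dataBlkNum = 3, byte 250 of internal block 1 sits 50 bytes into that
// block's third cell, which is cell_7 of the group, so:
//
//   offsetInBlkToOffsetInBG(100, 3, 250, 1)
//       == 2 * 100 * 3 + 1 * 100 + 50 == 750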
+
+  /**
+   * Get the next completed striped read task.
+   *
+   * @return {@link StripingChunkReadResult} indicating the state of the read
+   *          task, and the block index of the task. If the method times out
+   *          without getting any completed read task, -1 is returned as the
+   *          block index.
+   * @throws InterruptedException
+   */
+  public static StripingChunkReadResult getNextCompletedStripedRead(
+      CompletionService<Void> readService, Map<Future<Void>, Integer> futures,
+      final long threshold) throws InterruptedException {
+    Preconditions.checkArgument(!futures.isEmpty());
+    Future<Void> future = null;
+    try {
+      if (threshold > 0) {
+        future = readService.poll(threshold, TimeUnit.MILLISECONDS);
+      } else {
+        future = readService.take();
+      }
+      if (future != null) {
+        future.get();
+        return new StripingChunkReadResult(futures.remove(future),
+            StripingChunkReadResult.SUCCESSFUL);
+      } else {
+        return new StripingChunkReadResult(StripingChunkReadResult.TIMEOUT);
+      }
+    } catch (ExecutionException e) {
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("ExecutionException " + e);
+      }
+      return new StripingChunkReadResult(futures.remove(future),
+          StripingChunkReadResult.FAILED);
+    } catch (CancellationException e) {
+      return new StripingChunkReadResult(futures.remove(future),
+          StripingChunkReadResult.CANCELLED);
+    }
+  }
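
// A hedged usage sketch (names assumed, not from the patch): callers submit
// per-chunk reads to the CompletionService, remember each future's chunk
// index, then drain completions until the stripe is satisfied:
//
//   Map<Future<Void>, Integer> futures = new HashMap<>();
//   futures.put(readService.submit(chunkReadTask), chunkIndex);
//   StripingChunkReadResult r =
//       getNextCompletedStripedRead(readService, futures, 5000);
//   if (r.state == StripingChunkReadResult.SUCCESSFUL) { /* chunk fetched */ }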
+
+  /**
+   * Get the total space consumed by the striped blocks, i.e., the sum of the
+   * sizes of the data blocks and the parity blocks
+   *
+   * @param numDataBlkBytes
+   *          Size of the block group only counting data blocks
+   * @param dataBlkNum
+   *          The number of data blocks
+   * @param parityBlkNum
+   *          The number of parity blocks
+   * @param cellSize
+   *          The size of a striping cell
+   * @return The total usage of data blocks and parity blocks
+   */
+  public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
+      int dataBlkNum, int parityBlkNum, int cellSize) {
+    int parityIndex = dataBlkNum + 1;
+    long numParityBlkBytes = getInternalBlockLength(numDataBlkBytes, cellSize,
+        dataBlkNum, parityIndex) * parityBlkNum;
+    return numDataBlkBytes + numParityBlkBytes;
+  }
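
// Worked example (illustrative toy numbers): with numDataBlkBytes = 1000,
// dataBlkNum = 3, parityBlkNum = 2 and cellSize = 100, each parity block is
// getInternalBlockLength(1000, 100, 3, 4) == 400 bytes long, so the total is
// 1000 + 2 * 400 == 1800 bytes. Any index >= dataBlkNum yields the same
// parity block length here.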
+
+  /**
+   * Initialize the decoding input buffers based on the chunk states in an
+   * {@link AlignedStripe}. For each chunk that was not initially requested,
+   * schedule a new fetch request with the decoding input buffer as transfer
+   * destination.
+   */
+  public static byte[][] initDecodeInputs(AlignedStripe alignedStripe,
+      int dataBlkNum, int parityBlkNum) {
+    byte[][] decodeInputs =
+        new byte[dataBlkNum + parityBlkNum][(int) alignedStripe.getSpanInBlock()];
+    // read the full data aligned stripe
+    for (int i = 0; i < dataBlkNum; i++) {
+      if (alignedStripe.chunks[i] == null) {
+        final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
+        alignedStripe.chunks[i] = new StripingChunk(decodeInputs[decodeIndex]);
+        alignedStripe.chunks[i].addByteArraySlice(0,
+            (int) alignedStripe.getSpanInBlock());
+      }
+    }
+    return decodeInputs;
+  }
+
+  /**
+   * Some fetched {@link StripingChunk}s might be stored in the original
+   * application buffer instead of in the prepared decode input buffers. Others
+   * are beyond the range of the internal blocks and should correspond to
+   * all-zero bytes. When all pending requests have returned, this method
+   * should be called to finalize the decode input buffers.
+   */
+  public static void finalizeDecodeInputs(final byte[][] decodeInputs,
+      int dataBlkNum, int parityBlkNum, AlignedStripe alignedStripe) {
+    for (int i = 0; i < alignedStripe.chunks.length; i++) {
+      final StripingChunk chunk = alignedStripe.chunks[i];
+      final int decodeIndex = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
+      if (chunk != null && chunk.state == StripingChunk.FETCHED) {
+        chunk.copyTo(decodeInputs[decodeIndex]);
+      } else if (chunk != null && chunk.state == StripingChunk.ALLZERO) {
+        Arrays.fill(decodeInputs[decodeIndex], (byte) 0);
+      } else {
+        decodeInputs[decodeIndex] = null;
+      }
+    }
+  }
+
+  /**
+   * Currently, decoding requires that parity chunks come before data chunks.
+   * These indices are the opposite of how the blocks are stored in the NN. In
+   * the future we may improve the decoder so that the index order matches the
+   * NN's.
+   *
+   * @param index The index to convert
+   * @param dataBlkNum The number of data blocks
+   * @param parityBlkNum The number of parity blocks
+   * @return converted index
+   */
+  public static int convertIndex4Decode(int index, int dataBlkNum,
+      int parityBlkNum) {
+    return index < dataBlkNum ? index + parityBlkNum : index - dataBlkNum;
+  }
+
+  public static int convertDecodeIndexBack(int index, int dataBlkNum,
+      int parityBlkNum) {
+    return index < parityBlkNum ? index + dataBlkNum : index - parityBlkNum;
+  }
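
// Worked example: for an assumed RS(6,3) layout (dataBlkNum = 6,
// parityBlkNum = 3), data indices 0..5 map to decode indices 3..8 and parity
// indices 6..8 map to 0..2; convertDecodeIndexBack inverts the mapping:
//
//   convertIndex4Decode(0, 6, 3) == 3;    convertIndex4Decode(6, 6, 3) == 0;
//   convertDecodeIndexBack(3, 6, 3) == 0; convertDecodeIndexBack(0, 6, 3) == 6;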
+
+  /**
+   * Decode based on the given input buffers and schema.
+   */
+  public static void decodeAndFillBuffer(final byte[][] decodeInputs,
+      AlignedStripe alignedStripe, int dataBlkNum, int parityBlkNum,
+      RawErasureDecoder decoder) {
+    // Step 1: prepare indices and output buffers for missing data units
+    int[] decodeIndices = new int[parityBlkNum];
+    int pos = 0;
+    for (int i = 0; i < dataBlkNum; i++) {
+      if (alignedStripe.chunks[i] != null &&
+          alignedStripe.chunks[i].state == StripingChunk.MISSING){
+        decodeIndices[pos++] = convertIndex4Decode(i, dataBlkNum, parityBlkNum);
+      }
+    }
+    decodeIndices = Arrays.copyOf(decodeIndices, pos);
+    byte[][] decodeOutputs =
+        new byte[decodeIndices.length][(int) alignedStripe.getSpanInBlock()];
+
+    // Step 2: decode into prepared output buffers
+    decoder.decode(decodeInputs, decodeIndices, decodeOutputs);
+
+    // Step 3: fill original application buffer with decoded data
+    for (int i = 0; i < decodeIndices.length; i++) {
+      int missingBlkIdx = convertDecodeIndexBack(decodeIndices[i],
+          dataBlkNum, parityBlkNum);
+      StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
+      if (chunk.state == StripingChunk.MISSING) {
+        chunk.copyFrom(decodeOutputs[i]);
+      }
+    }
+  }
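
// A hedged sketch of the overall decode flow (illustration only; "stripe" and
// "decoder" are assumed to come from the caller):
//
//   byte[][] inputs = initDecodeInputs(stripe, dataBlkNum, parityBlkNum);
//   // ... schedule and await reads for the newly REQUESTED chunks ...
//   finalizeDecodeInputs(inputs, dataBlkNum, parityBlkNum, stripe);
//   decodeAndFillBuffer(inputs, stripe, dataBlkNum, parityBlkNum, decoder);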
+
+  /**
+   * Similar functionality to {@link #divideByteRangeIntoStripes}, but used by
+   * stateful read with a ByteBuffer as the target buffer. In addition, the
+   * read range is within a single stripe, so the calculation logic is simpler.
+   */
+  public static AlignedStripe[] divideOneStripe(ECSchema ecSchema,
+      int cellSize, LocatedStripedBlock blockGroup, long rangeStartInBlockGroup,
+      long rangeEndInBlockGroup, ByteBuffer buf) {
+    final int dataBlkNum = ecSchema.getNumDataUnits();
+    // Step 1: map the byte range to StripingCells
+    StripingCell[] cells = getStripingCellsOfByteRange(ecSchema, cellSize,
+        blockGroup, rangeStartInBlockGroup, rangeEndInBlockGroup);
+
+    // Step 2: get the unmerged ranges on each internal block
+    VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema, cellSize,
+        cells);
+
+    // Step 3: merge into stripes
+    AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges);
+
+    // Step 4: calculate each chunk's position in destination buffer. Since the
+    // whole read range is within a single stripe, the logic is simpler here.
+    int bufOffset = (int) (rangeStartInBlockGroup % (cellSize * dataBlkNum));
+    for (StripingCell cell : cells) {
+      long cellStart = cell.idxInInternalBlk * cellSize + cell.offset;
+      long cellEnd = cellStart + cell.size - 1;
+      for (AlignedStripe s : stripes) {
+        long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
+        long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
+        long overlapEnd = Math.min(cellEnd, stripeEnd);
+        int overLapLen = (int) (overlapEnd - overlapStart + 1);
+        if (overLapLen > 0) {
+          Preconditions.checkState(s.chunks[cell.idxInStripe] == null);
+          final int pos = (int) (bufOffset + overlapStart - cellStart);
+          buf.position(pos);
+          buf.limit(pos + overLapLen);
+          s.chunks[cell.idxInStripe] = new StripingChunk(buf.slice());
+        }
+      }
+      bufOffset += cell.size;
+    }
+
+    // Step 5: prepare ALLZERO blocks
+    prepareAllZeroChunks(blockGroup, stripes, cellSize, dataBlkNum);
+    return stripes;
+  }
+
+  /**
+   * This method divides a requested byte range into an array of inclusive
+   * {@link AlignedStripe}.
+   * @param ecSchema The codec schema for the file, which carries the numbers
+   *                 of data / parity blocks
+   * @param cellSize Cell size of stripe
+   * @param blockGroup The striped block group
+   * @param rangeStartInBlockGroup The byte range's start offset in block group
+   * @param rangeEndInBlockGroup The byte range's end offset in block group
+   * @param buf Destination buffer of the read operation for the byte range
+   * @param offsetInBuf Start offset into the destination buffer
+   *
+   * At most 5 stripes will be generated from each logical range, as
+   * demonstrated in the header of {@link AlignedStripe}.
+   */
+  public static AlignedStripe[] divideByteRangeIntoStripes(ECSchema ecSchema,
+      int cellSize, LocatedStripedBlock blockGroup,
+      long rangeStartInBlockGroup, long rangeEndInBlockGroup, byte[] buf,
+      int offsetInBuf) {
+
+    // Step 0: analyze range and calculate basic parameters
+    final int dataBlkNum = ecSchema.getNumDataUnits();
+
+    // Step 1: map the byte range to StripingCells
+    StripingCell[] cells = getStripingCellsOfByteRange(ecSchema, cellSize,
+        blockGroup, rangeStartInBlockGroup, rangeEndInBlockGroup);
+
+    // Step 2: get the unmerged ranges on each internal block
+    VerticalRange[] ranges = getRangesForInternalBlocks(ecSchema, cellSize,
+        cells);
+
+    // Step 3: merge into at most 5 stripes
+    AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecSchema, ranges);
+
+    // Step 4: calculate each chunk's position in destination buffer
+    calculateChunkPositionsInBuf(cellSize, stripes, cells, buf, offsetInBuf);
+
+    // Step 5: prepare ALLZERO blocks
+    prepareAllZeroChunks(blockGroup, stripes, cellSize, dataBlkNum);
+
+    return stripes;
+  }
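
// A hedged end-to-end sketch (illustration only): to pread the inclusive
// range [start, end] of a block group into buf at offset off, a caller could
// divide the range, fetch every non-null REQUESTED chunk from the internal
// blocks returned by parseStripedBlockGroup, and decode any stripe that ends
// up with MISSING chunks:
//
//   AlignedStripe[] stripes = divideByteRangeIntoStripes(
//       schema, cellSize, blockGroup, start, end, buf, off);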
+
+  /**
+   * Map the logical byte range to a set of inclusive {@link StripingCell}
+   * instances, each representing the overlap of the byte range with a cell
+   * used by {@link DFSStripedOutputStream} in encoding.
+   */
+  @VisibleForTesting
+  private static StripingCell[] getStripingCellsOfByteRange(ECSchema ecSchema,
+      int cellSize, LocatedStripedBlock blockGroup,
+      long rangeStartInBlockGroup, long rangeEndInBlockGroup) {
+    Preconditions.checkArgument(
+        rangeStartInBlockGroup <= rangeEndInBlockGroup &&
+            rangeEndInBlockGroup < blockGroup.getBlockSize());
+    long len = rangeEndInBlockGroup - rangeStartInBlockGroup + 1;
+    int firstCellIdxInBG = (int) (rangeStartInBlockGroup / cellSize);
+    int lastCellIdxInBG = (int) (rangeEndInBlockGroup / cellSize);
+    int numCells = lastCellIdxInBG - firstCellIdxInBG + 1;
+    StripingCell[] cells = new StripingCell[numCells];
+
+    final int firstCellOffset = (int) (rangeStartInBlockGroup % cellSize);
+    final int firstCellSize =
+        (int) Math.min(cellSize - (rangeStartInBlockGroup % cellSize), len);
+    cells[0] = new StripingCell(ecSchema, firstCellSize, firstCellIdxInBG,
+        firstCellOffset);
+    if (lastCellIdxInBG != firstCellIdxInBG) {
+      final int lastCellSize = (int) (rangeEndInBlockGroup % cellSize) + 1;
+      cells[numCells - 1] = new StripingCell(ecSchema, lastCellSize,
+          lastCellIdxInBG, 0);
+    }
+
+    for (int i = 1; i < numCells - 1; i++) {
+      cells[i] = new StripingCell(ecSchema, cellSize, i + firstCellIdxInBG, 0);
+    }
+
+    return cells;
+  }
+
+  /**
+   * Given a logical byte range, mapped to each {@link StripingCell}, calculate
+   * the physical byte range (inclusive) on each stored internal block.
+   */
+  @VisibleForTesting
+  private static VerticalRange[] getRangesForInternalBlocks(ECSchema ecSchema,
+      int cellSize, StripingCell[] cells) {
+    int dataBlkNum = ecSchema.getNumDataUnits();
+    int parityBlkNum = ecSchema.getNumParityUnits();
+
+    VerticalRange[] ranges = new VerticalRange[dataBlkNum + parityBlkNum];
+
+    long earliestStart = Long.MAX_VALUE;
+    long latestEnd = -1;
+    for (StripingCell cell : cells) {
+      // iterate through all cells and update the list of StripeRanges
+      if (ranges[cell.idxInStripe] == null) {
+        ranges[cell.idxInStripe] = new VerticalRange(
+            cell.idxInInternalBlk * cellSize + cell.offset, cell.size);
+      } else {
+        ranges[cell.idxInStripe].spanInBlock += cell.size;
+      }
+      VerticalRange range = ranges[cell.idxInStripe];
+      if (range.offsetInBlock < earliestStart) {
+        earliestStart = range.offsetInBlock;
+      }
+      if (range.offsetInBlock + range.spanInBlock - 1 > latestEnd) {
+        latestEnd = range.offsetInBlock + range.spanInBlock - 1;
+      }
+    }
+
+    // Each parity block should be fetched over the maximum range of all data blocks
+    for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum; i++) {
+      ranges[i] = new VerticalRange(earliestStart,
+          latestEnd - earliestStart + 1);
+    }
+
+    return ranges;
+  }
+
+  /**
+   * Merge byte ranges on each internal block into a set of inclusive
+   * {@link AlignedStripe} instances.
+   */
+  private static AlignedStripe[] mergeRangesForInternalBlocks(
+      ECSchema ecSchema, VerticalRange[] ranges) {
+    int dataBlkNum = ecSchema.getNumDataUnits();
+    int parityBlkNum = ecSchema.getNumParityUnits();
+    List<AlignedStripe> stripes = new ArrayList<>();
+    SortedSet<Long> stripePoints = new TreeSet<>();
+    for (VerticalRange r : ranges) {
+      if (r != null) {
+        stripePoints.add(r.offsetInBlock);
+        stripePoints.add(r.offsetInBlock + r.spanInBlock);
+      }
+    }
+
+    long prev = -1;
+    for (long point : stripePoints) {
+      if (prev >= 0) {
+        stripes.add(new AlignedStripe(prev, point - prev,
+            dataBlkNum + parityBlkNum));
+      }
+      prev = point;
+    }
+    return stripes.toArray(new AlignedStripe[stripes.size()]);
+  }
+
+  private static void calculateChunkPositionsInBuf(int cellSize,
+      AlignedStripe[] stripes, StripingCell[] cells, byte[] buf,
+      int offsetInBuf) {
+    /**
+     *     | <--------------- AlignedStripe --------------->|
+     *
+     *     |<- length_0 ->|<--  length_1  -->|<- length_2 ->|
+     * +------------------+------------------+----------------+
+     * |    cell_0_0_0    |    cell_3_1_0    |   cell_6_2_0   |  <- blk_0
+     * +------------------+------------------+----------------+
+     *   _/                \_______________________
+     *  |                                          |
+     *  v offset_0                                 v offset_1
+     * +----------------------------------------------------------+
+     * |  cell_0_0_0 |  cell_1_0_1 and cell_2_0_2  |cell_3_1_0 ...|   <- buf
+     * |  (partial)  |    (from blk_1 and blk_2)   |              |
+     * +----------------------------------------------------------+
+     *
+     * Cell indexing convention defined in {@link StripingCell}
+     */
+    int done = 0;
+    for (StripingCell cell : cells) {
+      long cellStart = cell.idxInInternalBlk * cellSize + cell.offset;
+      long cellEnd = cellStart + cell.size - 1;
+      for (AlignedStripe s : stripes) {
+        long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
+        long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
+        long overlapEnd = Math.min(cellEnd, stripeEnd);
+        int overLapLen = (int) (overlapEnd - overlapStart + 1);
+        if (overLapLen <= 0) {
+          continue;
+        }
+        if (s.chunks[cell.idxInStripe] == null) {
+          s.chunks[cell.idxInStripe] = new StripingChunk(buf);
+        }
+        s.chunks[cell.idxInStripe].addByteArraySlice(
+            (int)(offsetInBuf + done + overlapStart - cellStart), overLapLen);
+      }
+      done += cell.size;
+    }
+  }
+
+  /**
+   * If a {@link StripingChunk} maps to a byte range beyond an internal block's
+   * size, the chunk should be treated as zero bytes in decoding.
+   */
+  private static void prepareAllZeroChunks(LocatedStripedBlock blockGroup,
+      AlignedStripe[] stripes, int cellSize, int dataBlkNum) {
+    for (AlignedStripe s : stripes) {
+      for (int i = 0; i < dataBlkNum; i++) {
+        long internalBlkLen = getInternalBlockLength(blockGroup.getBlockSize(),
+            cellSize, dataBlkNum, i);
+        if (internalBlkLen <= s.getOffsetInBlock()) {
+          Preconditions.checkState(s.chunks[i] == null);
+          s.chunks[i] = new StripingChunk(StripingChunk.ALLZERO);
+        }
+      }
+    }
+  }
+
+  /**
+   * Cell is the unit of encoding used in {@link DFSStripedOutputStream}. This
+   * size impacts how a logical offset in the file or block group translates
+   * to physical byte offset in a stored internal block. The StripingCell util
+   * class facilitates this calculation. Each StripingCell is inclusive with
+   * its start and end offsets -- e.g., the end logical offset of cell_0_0_0
+   * should be 1 byte lower than the start logical offset of cell_1_0_1.
+   *
+   *  | <------- Striped Block Group -------> |
+   *    blk_0          blk_1          blk_2
+   *      |              |              |
+   *      v              v              v
+   * +----------+   +----------+   +----------+
+   * |cell_0_0_0|   |cell_1_0_1|   |cell_2_0_2|
+   * +----------+   +----------+   +----------+
+   * |cell_3_1_0|   |cell_4_1_1|   |cell_5_1_2| <- {@link #idxInBlkGroup} = 5
+   * +----------+   +----------+   +----------+    {@link #idxInInternalBlk} = 1
+   *                                               {@link #idxInStripe} = 2
+ * A StripingCell corresponds to a {@link StripingChunk} whose offset and size
+ * align with the cell used when writing data.
+   * TODO: consider parity cells
+   */
+  @VisibleForTesting
+  static class StripingCell {
+    final ECSchema schema;
+    /** Logical order in a block group, used when doing I/O to a block group */
+    final int idxInBlkGroup;
+    final int idxInInternalBlk;
+    final int idxInStripe;
+    /**
+     * When a logical byte range is mapped to a set of cells, it might
+     * partially overlap with the first and last cells. This field and the
+     * {@link #size} variable represent the start offset and size of the
+     * overlap.
+     */
+    final int offset;
+    final int size;
+
+    StripingCell(ECSchema ecSchema, int cellSize, int idxInBlkGroup,
+        int offset) {
+      this.schema = ecSchema;
+      this.idxInBlkGroup = idxInBlkGroup;
+      this.idxInInternalBlk = idxInBlkGroup / ecSchema.getNumDataUnits();
+      this.idxInStripe = idxInBlkGroup -
+          this.idxInInternalBlk * ecSchema.getNumDataUnits();
+      this.offset = offset;
+      this.size = cellSize;
+    }
+  }
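
// Worked example of the indexing convention above: with 3 data units,
// new StripingCell(schema, cellSize, 5, 0) yields idxInInternalBlk = 5 / 3 == 1
// and idxInStripe = 5 - 1 * 3 == 2, i.e. cell_5_1_2 in the diagram.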
+
+  /**
+   * Given a requested byte range on a striped block group, an AlignedStripe
+   * represents an inclusive {@link VerticalRange} that is aligned with both
+   * the byte range and boundaries of all internal blocks. As illustrated in
+ * the diagram, any given byte range on a block group leads to 1 to 5
+ * AlignedStripes.
+   *
+   * |<-------- Striped Block Group -------->|
+   * blk_0   blk_1   blk_2      blk_3   blk_4
+   *                 +----+  |  +----+  +----+
+   *                 |full|  |  |    |  |    | <- AlignedStripe0:
+   *         +----+  |~~~~|  |  |~~~~|  |~~~~|      1st cell is partial
+   *         |part|  |    |  |  |    |  |    | <- AlignedStripe1: byte range
+   * +----+  +----+  +----+  |  |~~~~|  |~~~~|      doesn't start at 1st block
+   * |full|  |full|  |full|  |  |    |  |    |
+   * |cell|  |cell|  |cell|  |  |    |  |    | <- AlignedStripe2 (full stripe)
+   * |    |  |    |  |    |  |  |    |  |    |
+   * +----+  +----+  +----+  |  |~~~~|  |~~~~|
+   * |full|  |part|          |  |    |  |    | <- AlignedStripe3: byte range
+   * |~~~~|  +----+          |  |~~~~|  |~~~~|      doesn't end at last block
+   * |    |                  |  |    |  |    | <- AlignedStripe4:
+   * +----+                  |  +----+  +----+      last cell is partial
+   *                         |
+   * <---- data blocks ----> | <--- parity --->
+   *
+   * An AlignedStripe is the basic unit of reading from a striped block group,
+   * because within the AlignedStripe, all internal blocks can be processed in
+   * a uniform manner.
+   *
+   * The coverage of an AlignedStripe on an internal block is represented as a
+   * {@link StripingChunk}.
+   *
+   * To simplify the logic of reading a logical byte range from a block group,
+   * a StripingChunk is either completely in the requested byte range or
+   * completely outside the requested byte range.
+   */
+  public static class AlignedStripe {
+    public VerticalRange range;
+    /** status of each chunk in the stripe */
+    public final StripingChunk[] chunks;
+    public int fetchedChunksNum = 0;
+    public int missingChunksNum = 0;
+
+    public AlignedStripe(long offsetInBlock, long length, int width) {
+      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
+      this.range = new VerticalRange(offsetInBlock, length);
+      this.chunks = new StripingChunk[width];
+    }
+
+    public boolean include(long pos) {
+      return range.include(pos);
+    }
+
+    public long getOffsetInBlock() {
+      return range.offsetInBlock;
+    }
+
+    public long getSpanInBlock() {
+      return range.spanInBlock;
+    }
+
+    @Override
+    public String toString() {
+      return "Offset=" + range.offsetInBlock + ", length=" + range.spanInBlock +
+          ", fetchedChunksNum=" + fetchedChunksNum +
+          ", missingChunksNum=" + missingChunksNum;
+    }
+  }
+
+  /**
+   * A simple utility class representing an arbitrary vertical inclusive range
+   * starting at {@link #offsetInBlock} and lasting for {@link #spanInBlock}
+   * bytes in an internal block. Note that VerticalRange doesn't necessarily
+   * align with {@link StripingCell}.
+   *
+   * |<- Striped Block Group ->|
+   *  blk_0
+   *    |
+   *    v
+   * +-----+
+   * |~~~~~| <-- {@link #offsetInBlock}
+   * |     |  ^
+   * |     |  |
+   * |     |  | {@link #spanInBlock}
+   * |     |  v
+   * |~~~~~| ---
+   * |     |
+   * +-----+
+   */
+  public static class VerticalRange {
+    /** start offset in the internal block (inclusive) */
+    public long offsetInBlock;
+    /** length of the stripe range */
+    public long spanInBlock;
+
+    public VerticalRange(long offsetInBlock, long length) {
+      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
+      this.offsetInBlock = offsetInBlock;
+      this.spanInBlock = length;
+    }
+
+    /** whether a position is in the range */
+    public boolean include(long pos) {
+      return pos >= offsetInBlock && pos < offsetInBlock + spanInBlock;
+    }
+  }
+
+  /**
+   * Indicates the coverage of an {@link AlignedStripe} on an internal block,
+   * and the state of the chunk in the context of the read request.
+   *
+   * |<---------------- Striped Block Group --------------->|
+   *   blk_0        blk_1        blk_2          blk_3   blk_4
+   *                           +---------+  |  +----+  +----+
+   *     null         null     |REQUESTED|  |  |null|  |null| <- AlignedStripe0
+   *              +---------+  |---------|  |  |----|  |----|
+   *     null     |REQUESTED|  |REQUESTED|  |  |null|  |null| <- AlignedStripe1
+   * +---------+  +---------+  +---------+  |  +----+  +----+
+   * |REQUESTED|  |REQUESTED|    ALLZERO    |  |null|  |null| <- AlignedStripe2
+   * +---------+  +---------+               |  +----+  +----+
+   * <----------- data blocks ------------> | <--- parity --->
+   */
+  public static class StripingChunk {
+    /** Chunk has been successfully fetched */
+    public static final int FETCHED = 0x01;
+    /** Chunk encountered a failure while being fetched */
+    public static final int MISSING = 0x02;
+    /** Chunk being fetched (fetching task is in-flight) */
+    public static final int PENDING = 0x04;
+    /**
+     * Chunk is requested either by the application or for decoding; a read
+     * task needs to be scheduled
+     */
+    public static final int REQUESTED = 0x08;
+    /**
+     * Internal block is short and has no overlap with the chunk. The chunk
+     * is considered all-zero bytes in codec calculations.
+     */
+    public static final int ALLZERO = 0x0f;
+
+    /**
+     * If a chunk is completely in the requested range, the state transition
+     * is:
+     * REQUESTED (when AlignedStripe created) -> PENDING -> {FETCHED | MISSING}
+     * If a chunk is completely outside the requested range (including parity
+     * chunks), the state transition is:
+     * null (when AlignedStripe created) -> REQUESTED (if needed for decoding
+     * after a read failure) -> PENDING -> {FETCHED | MISSING}
+     */
+    public int state = REQUESTED;
+
+    public final ChunkByteArray byteArray;
+    public final ByteBuffer byteBuffer;
+
+    public StripingChunk(byte[] buf) {
+      this.byteArray = new ChunkByteArray(buf);
+      byteBuffer = null;
+    }
+
+    public StripingChunk(ByteBuffer buf) {
+      this.byteArray = null;
+      this.byteBuffer = buf;
+    }
+
+    public StripingChunk(int state) {
+      this.byteArray = null;
+      this.byteBuffer = null;
+      this.state = state;
+    }
+
+    public void addByteArraySlice(int offset, int length) {
+      assert byteArray != null;
+      byteArray.offsetsInBuf.add(offset);
+      byteArray.lengthsInBuf.add(length);
+    }
+
+    void copyTo(byte[] target) {
+      assert byteArray != null;
+      byteArray.copyTo(target);
+    }
+
+    void copyFrom(byte[] src) {
+      assert byteArray != null;
+      byteArray.copyFrom(src);
+    }
+  }
+
+  public static class ChunkByteArray {
+    private final byte[] buf;
+    private final List<Integer> offsetsInBuf;
+    private final List<Integer> lengthsInBuf;
+
+    ChunkByteArray(byte[] buf) {
+      this.buf = buf;
+      this.offsetsInBuf = new ArrayList<>();
+      this.lengthsInBuf = new ArrayList<>();
+    }
+
+    public int[] getOffsets() {
+      int[] offsets = new int[offsetsInBuf.size()];
+      for (int i = 0; i < offsets.length; i++) {
+        offsets[i] = offsetsInBuf.get(i);
+      }
+      return offsets;
+    }
+
+    public int[] getLengths() {
+      int[] lens = new int[this.lengthsInBuf.size()];
+      for (int i = 0; i < lens.length; i++) {
+        lens[i] = this.lengthsInBuf.get(i);
+      }
+      return lens;
+    }
+
+    public byte[] buf() {
+      return buf;
+    }
+
+    void copyTo(byte[] target) {
+      int posInBuf = 0;
+      for (int i = 0; i < offsetsInBuf.size(); i++) {
+        System.arraycopy(buf, offsetsInBuf.get(i),
+            target, posInBuf, lengthsInBuf.get(i));
+        posInBuf += lengthsInBuf.get(i);
+      }
+    }
+
+    void copyFrom(byte[] src) {
+      int srcPos = 0;
+      for (int j = 0; j < offsetsInBuf.size(); j++) {
+        System.arraycopy(src, srcPos, buf, offsetsInBuf.get(j),
+            lengthsInBuf.get(j));
+        srcPos += lengthsInBuf.get(j);
+      }
+    }
+  }
+
+  /**
+   * This class represents the result of a striped read request. If the task
+   * succeeded or failed, the index of the corresponding chunk is also
+   * returned; a timed-out task returns -1 as the index.
+   */
+  public static class StripingChunkReadResult {
+    public static final int SUCCESSFUL = 0x01;
+    public static final int FAILED = 0x02;
+    public static final int TIMEOUT = 0x04;
+    public static final int CANCELLED = 0x08;
+
+    public final int index;
+    public final int state;
+
+    public StripingChunkReadResult(int state) {
+      Preconditions.checkArgument(state == TIMEOUT,
+          "Only timeout result should return negative index.");
+      this.index = -1;
+      this.state = state;
+    }
+
+    public StripingChunkReadResult(int index, int state) {
+      Preconditions.checkArgument(state != TIMEOUT,
+          "Timeout result should not return an index.");
+      this.index = index;
+      this.state = state;
+    }
+
+    @Override
+    public String toString() {
+      return "(index=" + index + ", state =" + state + ")";
+    }
+  }
+
+  /**
+   * Check whether the information, such as IDs and generation stamps, in
+   * block-i matches that in block-j, where block-i and block-j are in the
+   * same group.
+   */
+  public static void checkBlocks(int j, ExtendedBlock blockj,
+      int i, ExtendedBlock blocki) throws IOException {
+
+    if (!blocki.getBlockPoolId().equals(blockj.getBlockPoolId())) {
+      throw new IOException("Block pool IDs mismatched: block" + j + "="
+          + blockj + ", block" + i + "=" + blocki);
+    }
+    if (blocki.getBlockId() - i != blockj.getBlockId() - j) {
+      throw new IOException("Block IDs mismatched: block" + j + "="
+          + blockj + ", block" + i + "=" + blocki);
+    }
+    if (blocki.getGenerationStamp() != blockj.getGenerationStamp()) {
+      throw new IOException("Generation stamps mismatched: block" + j + "="
+          + blockj + ", block" + i + "=" + blocki);
+    }
+  }
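
// Worked example (illustrative IDs): internal blocks of one group carry
// consecutive IDs (see constructInternalBlock), so if block-0 has ID 1000,
// block-2 must have ID 1002: 1002 - 2 == 1000 - 0. Generation stamps and
// block pool IDs must match exactly.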
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index b87e753..9d24b91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -33,6 +33,7 @@ package hadoop.hdfs.datanode;
 
 import "HAServiceProtocol.proto";
 import "hdfs.proto";
+import "erasurecoding.proto";
 
 /**
  * Information to identify a datanode to a namenode
@@ -58,6 +59,7 @@ message DatanodeCommandProto {
     UnusedUpgradeCommand = 6;
     NullDatanodeCommand = 7;
     BlockIdCommand = 8;
+    BlockECRecoveryCommand = 9;
   }
 
   required Type cmdType = 1;    // Type of the command
@@ -71,6 +73,7 @@ message DatanodeCommandProto {
   optional KeyUpdateCommandProto keyUpdateCmd = 6;
   optional RegisterCommandProto registerCmd = 7;
   optional BlockIdCommandProto blkIdCmd = 8;
+  optional BlockECRecoveryCommandProto blkECRecoveryCmd = 9;
 }
 
 /**
@@ -145,6 +148,13 @@ message RegisterCommandProto {
 }
 
 /**
+ * Block Erasure coding recovery command
+ */
+message BlockECRecoveryCommandProto {
+  repeated BlockECRecoveryInfoProto blockECRecoveryinfo = 1;
+}
+
+/**
  * registration - Information of the datanode registering with the namenode
  */
 message RegisterDatanodeRequestProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
index 3bd1d91..3233f66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/fsimage.proto
@@ -73,6 +73,7 @@ message NameSystemSection {
   optional uint64 lastAllocatedBlockId = 5;
   optional uint64 transactionId = 6;
   optional uint64 rollingUpgradeStartTime = 7;
+  optional uint64 lastAllocatedStripedBlockId = 8;
 }
 
 /**
@@ -139,6 +140,8 @@ message INodeSection {
     optional AclFeatureProto acl = 8;
     optional XAttrFeatureProto xAttrs = 9;
     optional uint32 storagePolicyID = 10;
+    optional bool isStriped = 11;
+    optional uint64 stripingCellSize = 12;
   }
 
   message QuotaByStorageTypeEntryProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 8cb7d5f..fcf2bc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2370,11 +2370,11 @@
   </description>
 </property>
 
-  <property>
-    <name>dfs.datanode.block-pinning.enabled</name>
-    <value>false</value>
-    <description>Whether pin blocks on favored DataNode.</description>
-  </property>
+<property>
+  <name>dfs.datanode.block-pinning.enabled</name>
+  <value>false</value>
+  <description>Whether pin blocks on favored DataNode.</description>
+</property>
 
 <property>
   <name>dfs.client.block.write.locateFollowingBlock.initial.delay.ms</name>
@@ -2412,4 +2412,25 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.datanode.stripedread.threshold.millis</name>
+  <value>5000</value>
+  <description>Datanode striped read threshold, in milliseconds.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.stripedread.threads</name>
+  <value>20</value>
+  <description>Datanode striped read thread pool size.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.stripedread.buffer.size</name>
+  <value>262144</value>
+  <description>Datanode striped read buffer size, in bytes.
+  </description>
+</property>
+
 </configuration>

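For reference, a minimal sketch (illustration only) of setting the new
striped-read keys programmatically; the key strings are exactly as added
above, and the values shown are the defaults:

  Configuration conf = new HdfsConfiguration();
  conf.setInt("dfs.datanode.stripedread.threshold.millis", 5000);
  conf.setInt("dfs.datanode.stripedread.threads", 20);
  conf.setInt("dfs.datanode.stripedread.buffer.size", 262144);
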
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java
new file mode 100644
index 0000000..6c06a8d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/CLITestCmdErasureCoding.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cli;
+
+import org.apache.hadoop.cli.util.CLICommandErasureCodingCli;
+import org.apache.hadoop.cli.util.CLICommandTypes;
+import org.apache.hadoop.cli.util.CLITestCmd;
+import org.apache.hadoop.cli.util.CommandExecutor;
+import org.apache.hadoop.cli.util.ErasureCodingCliCmdExecutor;
+import org.apache.hadoop.hdfs.tools.erasurecode.ECCli;
+
+public class CLITestCmdErasureCoding extends CLITestCmd {
+  public CLITestCmdErasureCoding(String str, CLICommandTypes type) {
+    super(str, type);
+  }
+
+  @Override
+  public CommandExecutor getExecutor(String tag) throws IllegalArgumentException {
+    if (getType() instanceof CLICommandErasureCodingCli)
+      return new ErasureCodingCliCmdExecutor(tag, new ECCli());
+    return super.getExecutor(tag);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java
new file mode 100644
index 0000000..5f01ea2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/TestErasureCodingCLI.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.cli;
+
+import org.apache.hadoop.cli.util.CLICommand;
+import org.apache.hadoop.cli.util.CLICommandErasureCodingCli;
+import org.apache.hadoop.cli.util.CommandExecutor.Result;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.xml.sax.SAXException;
+
+public class TestErasureCodingCLI extends CLITestHelper {
+  private final int NUM_OF_DATANODES = 3;
+  private MiniDFSCluster dfsCluster = null;
+  private FileSystem fs = null;
+  private String namenode = null;
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    dfsCluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(NUM_OF_DATANODES).build();
+    dfsCluster.waitClusterUp();
+    namenode = conf.get(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "file:///");
+
+    username = System.getProperty("user.name");
+
+    fs = dfsCluster.getFileSystem();
+  }
+
+  @Override
+  protected String getTestFile() {
+    return "testErasureCodingConf.xml";
+  }
+
+  @After
+  @Override
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.close();
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+    }
+    Thread.sleep(2000);
+    super.tearDown();
+  }
+
+  @Override
+  protected String expandCommand(final String cmd) {
+    String expCmd = cmd;
+    expCmd = expCmd.replaceAll("NAMENODE", namenode);
+    expCmd = expCmd.replaceAll("#LF#", System.getProperty("line.separator"));
+    expCmd = super.expandCommand(expCmd);
+    return expCmd;
+  }
+
+  @Override
+  protected TestConfigFileParser getConfigParser() {
+    return new TestErasureCodingAdmin();
+  }
+
+  private class TestErasureCodingAdmin extends
+      CLITestHelper.TestConfigFileParser {
+    @Override
+    public void endElement(String uri, String localName, String qName)
+        throws SAXException {
+      if (qName.equals("ec-admin-command")) {
+        if (testCommands != null) {
+          testCommands.add(new CLITestCmdErasureCoding(charString,
+              new CLICommandErasureCodingCli()));
+        } else if (cleanupCommands != null) {
+          cleanupCommands.add(new CLITestCmdErasureCoding(charString,
+              new CLICommandErasureCodingCli()));
+        }
+      } else {
+        super.endElement(uri, localName, qName);
+      }
+    }
+  }
+
+  @Override
+  protected Result execute(CLICommand cmd) throws Exception {
+    return cmd.getExecutor(namenode).executeCommand(cmd.getCmd());
+  }
+
+  @Test
+  @Override
+  public void testAll() {
+    super.testAll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CLICommandErasureCodingCli.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CLICommandErasureCodingCli.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CLICommandErasureCodingCli.java
new file mode 100644
index 0000000..aafcd9f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/CLICommandErasureCodingCli.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cli.util;
+
+public class CLICommandErasureCodingCli implements CLICommandTypes {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/ErasureCodingCliCmdExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/ErasureCodingCliCmdExecutor.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/ErasureCodingCliCmdExecutor.java
new file mode 100644
index 0000000..e993313
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cli/util/ErasureCodingCliCmdExecutor.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.cli.util;
+
+import org.apache.hadoop.hdfs.tools.erasurecode.ECCli;
+import org.apache.hadoop.util.ToolRunner;
+
+public class ErasureCodingCliCmdExecutor extends CommandExecutor {
+  protected String namenode = null;
+  protected ECCli admin = null;
+
+  public ErasureCodingCliCmdExecutor(String namenode, ECCli admin) {
+    this.namenode = namenode;
+    this.admin = admin;
+  }
+
+  @Override
+  protected void execute(final String cmd) throws Exception {
+    String[] args = getCommandAsArgs(cmd, "NAMENODE", this.namenode);
+    ToolRunner.run(admin, args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
index 88b7f37..829cf03 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
@@ -165,20 +165,19 @@ public class BlockReaderTestUtil {
    */
   public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
       throws IOException {
-    return getBlockReader(cluster, testBlock, offset, lenToRead);
+    return getBlockReader(cluster.getFileSystem(), testBlock, offset, lenToRead);
   }
 
   /**
    * Get a BlockReader for the given block.
    */
-  public static BlockReader getBlockReader(MiniDFSCluster cluster,
-      LocatedBlock testBlock, int offset, int lenToRead) throws IOException {
+  public static BlockReader getBlockReader(final DistributedFileSystem fs,
+      LocatedBlock testBlock, int offset, long lenToRead) throws IOException {
     InetSocketAddress targetAddr = null;
     ExtendedBlock block = testBlock.getBlock();
     DatanodeInfo[] nodes = testBlock.getLocations();
     targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr());
 
-    final DistributedFileSystem fs = cluster.getFileSystem();
     return new BlockReaderFactory(fs.getClient().getConf()).
       setInetSocketAddress(targetAddr).
       setBlock(block).

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index a742757..3f0d6df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -66,6 +66,12 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
@@ -105,6 +111,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -125,14 +132,19 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -153,12 +165,8 @@ import org.junit.Assume;
 import org.mockito.internal.util.reflection.Whitebox;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
 
 /** Utilities for HDFS tests */
 public class DFSTestUtil {
@@ -808,15 +816,21 @@ public class DFSTestUtil {
     return os.toByteArray();
   }
 
-  /* Write the given string to the given file */
-  public static void writeFile(FileSystem fs, Path p, String s) 
+  /* Write the given bytes to the given file */
+  public static void writeFile(FileSystem fs, Path p, byte[] bytes)
       throws IOException {
     if (fs.exists(p)) {
       fs.delete(p, true);
     }
-    InputStream is = new ByteArrayInputStream(s.getBytes());
+    InputStream is = new ByteArrayInputStream(bytes);
     FSDataOutputStream os = fs.create(p);
-    IOUtils.copyBytes(is, os, s.length(), true);
+    IOUtils.copyBytes(is, os, bytes.length, true);
+  }
+
+  /* Write the given string to the given file */
+  public static void writeFile(FileSystem fs, Path p, String s)
+      throws IOException {
+    writeFile(fs, p, s.getBytes());
   }
 
   /* Append the given string to the given file */
@@ -1835,7 +1849,7 @@ public class DFSTestUtil {
     dn.setLastUpdate(Time.now() + offset);
     dn.setLastUpdateMonotonic(Time.monotonicNow() + offset);
   }
-
+  
   /**
    * This method takes a set of block locations and fills the provided buffer
    * with expected bytes based on simulated content from
@@ -1859,4 +1873,150 @@ public class DFSTestUtil {
     }
   }
 
+  public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
+      Block block, BlockStatus blockStatus, DatanodeStorage storage) {
+    ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1];
+    receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, blockStatus, null);
+    StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1];
+    reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
+    return reports;
+  }
+
+  /**
+   * Creates the metadata of a file in striped layout. This method only
+   * manipulates the NameNode state without injecting data into DataNodes.
+   * Disable periodic heartbeats before using it.
+   * @param file Path of the file to create
+   * @param dir Parent path of the file
+   * @param numBlocks Number of striped block groups to add to the file
+   * @param numStripesPerBlk Number of striped cells in each block
+   * @param toMkdir Whether to create {@code dir} and set an erasure coding zone on it
+   */
+  public static void createStripedFile(MiniDFSCluster cluster, Path file, Path dir,
+      int numBlocks, int numStripesPerBlk, boolean toMkdir) throws Exception {
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    // If the outer test already created the EC zone, pass toMkdir=false
+    // and dir may be left as null.
+    if (toMkdir) {
+      assert dir != null;
+      dfs.mkdirs(dir);
+      try {
+        dfs.getClient().createErasureCodingZone(dir.toString(), null, 0);
+      } catch (IOException e) {
+        if (!e.getMessage().contains("non-empty directory")) {
+          throw e;
+        }
+      }
+    }
+
+    FSDataOutputStream out = null;
+    try {
+      out = dfs.create(file, (short) 1); // create an empty file
+
+      FSNamesystem ns = cluster.getNamesystem();
+      FSDirectory fsdir = ns.getFSDirectory();
+      INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
+
+      ExtendedBlock previous = null;
+      for (int i = 0; i < numBlocks; i++) {
+        Block newBlock = addStripedBlockToFile(cluster.getDataNodes(), dfs, ns,
+            file.toString(), fileNode, dfs.getClient().getClientName(),
+            previous, numStripesPerBlk);
+        previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
+      }
+
+      dfs.getClient().namenode.complete(file.toString(),
+          dfs.getClient().getClientName(), previous, fileNode.getId());
+    } finally {
+      IOUtils.cleanup(null, out);
+    }
+  }
+
+  /**
+   * Adds a striped block group to a file. This method only manipulates the
+   * NameNode state of the file and the block, without injecting data into
+   * DataNodes; it does, however, mimic the incremental block reports the
+   * DataNodes would send. Disable periodic heartbeats before using it.
+   * @param dataNodes List of DataNodes to host the striped block group
+   * @param previous Previous block in the file
+   * @param numStripes Number of stripes in each block group
+   * @return The added block group
+   */
+  public static Block addStripedBlockToFile(List<DataNode> dataNodes,
+      DistributedFileSystem fs, FSNamesystem ns, String file, INodeFile fileNode,
+      String clientName, ExtendedBlock previous, int numStripes)
+      throws Exception {
+    fs.getClient().namenode.addBlock(file, clientName, previous, null,
+        fileNode.getId(), null);
+
+    final BlockInfo lastBlock = fileNode.getLastBlock();
+    final int groupSize = fileNode.getPreferredBlockReplication();
+    assert dataNodes.size() >= groupSize;
+    // 1. RECEIVING_BLOCK IBR
+    for (int i = 0; i < groupSize; i++) {
+      DataNode dn = dataNodes.get(i);
+      final Block block = new Block(lastBlock.getBlockId() + i, 0,
+          lastBlock.getGenerationStamp());
+      DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+      StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+          .makeReportForReceivedBlock(block,
+              ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
+      for (StorageReceivedDeletedBlocks report : reports) {
+        ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+      }
+    }
+
+    // 2. RECEIVED_BLOCK IBR
+    for (int i = 0; i < groupSize; i++) {
+      DataNode dn = dataNodes.get(i);
+      final Block block = new Block(lastBlock.getBlockId() + i,
+          numStripes * BLOCK_STRIPED_CELL_SIZE, lastBlock.getGenerationStamp());
+      DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+      StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+          .makeReportForReceivedBlock(block,
+              ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+      for (StorageReceivedDeletedBlocks report : reports) {
+        ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+      }
+    }
+
+    lastBlock.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS);
+    return lastBlock;
+  }
+
+  /**
+   * Because DFSStripedOutputStream does not currently support hflush/hsync,
+   * tests can use this method to flush all buffered data to the DataNodes.
+   */
+  public static ExtendedBlock flushInternal(DFSStripedOutputStream out)
+      throws IOException {
+    out.flushInternal();
+    return out.getBlock();
+  }
+
+  /**
+   * Verify that the internal blocks of a striped block group are on
+   * different nodes, and that every internal block exists.
+   */
+  public static void verifyLocatedStripedBlocks(LocatedBlocks lbs,
+       int groupSize) {
+    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+      assert lb instanceof LocatedStripedBlock;
+      HashSet<DatanodeInfo> locs = new HashSet<>();
+      for (DatanodeInfo datanodeInfo : lb.getLocations()) {
+        locs.add(datanodeInfo);
+      }
+      assertEquals(groupSize, lb.getLocations().length);
+      assertEquals(groupSize, locs.size());
+
+      // verify that every internal block exists
+      int[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices();
+      assertEquals(groupSize, blockIndices.length);
+      HashSet<Integer> found = new HashSet<>();
+      for (int index : blockIndices) {
+        assert index >= 0;
+        found.add(index);
+      }
+      assertEquals(groupSize, found.size());
+    }
+  }
 }
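
As a usage sketch (not part of the patch), a test could fabricate striped-file metadata with the helpers above and then check block placement; the paths and counts are illustrative, and periodic heartbeats are assumed to be disabled as the Javadoc requires:

    // Sketch only: NameNode-side metadata for a 2-group striped file.
    Path dir = new Path("/striped");
    Path file = new Path(dir, "file0");
    DFSTestUtil.createStripedFile(cluster, file, dir,
        2 /* block groups */, 4 /* stripes per block */,
        true /* mkdir + create EC zone */);

    LocatedBlocks lbs = cluster.getFileSystem().getClient()
        .getLocatedBlocks(file.toString(), 0, Long.MAX_VALUE);
    DFSTestUtil.verifyLocatedStripedBlocks(lbs,
        HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS);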

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 7052321..65e26df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -2113,8 +2113,6 @@ public class MiniDFSCluster {
     int node = -1;
     for (int i = 0; i < dataNodes.size(); i++) {
       DataNode dn = dataNodes.get(i).datanode;
-      LOG.info("DN name=" + dnName + " found DN=" + dn +
-          " with name=" + dn.getDisplayName());
       if (dnName.equals(dn.getDatanodeId().getXferAddr())) {
         node = i;
         break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
new file mode 100644
index 0000000..ca4b2aa
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
+import org.junit.Assert;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class StripedFileTestUtil {
+  public static final Log LOG = LogFactory.getLog(StripedFileTestUtil.class);
+
+  static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+
+  static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  static final int stripesPerBlock = 4;
+  static final int blockSize = cellSize * stripesPerBlock;
+  static final int numDNs = dataBlocks + parityBlocks + 2;
+
+  static final Random random = new Random();
+
+  static byte[] generateBytes(int cnt) {
+    byte[] bytes = new byte[cnt];
+    for (int i = 0; i < cnt; i++) {
+      bytes[i] = getByte(i);
+    }
+    return bytes;
+  }
+
+  static int readAll(FSDataInputStream in, byte[] buf) throws IOException {
+    int readLen = 0;
+    int ret;
+    // Check remaining space before reading so a full buffer ends the loop
+    // instead of spinning on zero-length reads.
+    while (readLen < buf.length &&
+        (ret = in.read(buf, readLen, buf.length - readLen)) >= 0) {
+      readLen += ret;
+    }
+    return readLen;
+  }
+
+  static byte getByte(long pos) {
+    final int mod = 29;
+    return (byte) (pos % mod + 1);
+  }
+
+  static void verifyLength(FileSystem fs, Path srcPath, int fileLength)
+      throws IOException {
+    FileStatus status = fs.getFileStatus(srcPath);
+    Assert.assertEquals("File length should be the same", fileLength, status.getLen());
+  }
+
+  static void verifyPread(FileSystem fs, Path srcPath, int fileLength,
+      byte[] expected, byte[] buf) throws IOException {
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      int[] startOffsets = {0, 1, cellSize - 102, cellSize, cellSize + 102,
+          cellSize * (dataBlocks - 1), cellSize * (dataBlocks - 1) + 102,
+          cellSize * dataBlocks, fileLength - 102, fileLength - 1};
+      for (int startOffset : startOffsets) {
+        startOffset = Math.max(0, Math.min(startOffset, fileLength - 1));
+        int remaining = fileLength - startOffset;
+        int offset = startOffset;
+        final byte[] result = new byte[remaining];
+        while (remaining > 0) {
+          int target = Math.min(remaining, buf.length);
+          in.readFully(offset, buf, 0, target);
+          System.arraycopy(buf, 0, result, offset - startOffset, target);
+          remaining -= target;
+          offset += target;
+        }
+        for (int i = 0; i < fileLength - startOffset; i++) {
+          Assert.assertEquals("Byte at " + (startOffset + i) + " is different, "
+                  + "the startOffset is " + startOffset,
+              expected[startOffset + i], result[i]);
+        }
+      }
+    }
+  }
+
+  static void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
+      byte[] expected, byte[] buf) throws IOException {
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      final byte[] result = new byte[fileLength];
+      int readLen = 0;
+      int ret;
+      while ((ret = in.read(buf, 0, buf.length)) >= 0) {
+        System.arraycopy(buf, 0, result, readLen, ret);
+        readLen += ret;
+      }
+      Assert.assertEquals("The length of file should be the same to write size",
+          fileLength, readLen);
+      Assert.assertArrayEquals(expected, result);
+    }
+  }
+
+  static void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
+      byte[] expected, ByteBuffer buf) throws IOException {
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      ByteBuffer result = ByteBuffer.allocate(fileLength);
+      int readLen = 0;
+      int ret;
+      while ((ret = in.read(buf)) >= 0) {
+        readLen += ret;
+        buf.flip();
+        result.put(buf);
+        buf.clear();
+      }
+      Assert.assertEquals("The length of file should be the same to write size",
+          fileLength, readLen);
+      Assert.assertArrayEquals(expected, result.array());
+    }
+  }
+
+  static void verifySeek(FileSystem fs, Path srcPath, int fileLength)
+      throws IOException {
+    try (FSDataInputStream in = fs.open(srcPath)) {
+      // seek to 1/2 of content
+      int pos = fileLength / 2;
+      assertSeekAndRead(in, pos, fileLength);
+
+      // seek to 1/3 of content
+      pos = fileLength / 3;
+      assertSeekAndRead(in, pos, fileLength);
+
+      // seek to 0 pos
+      pos = 0;
+      assertSeekAndRead(in, pos, fileLength);
+
+      if (fileLength > cellSize) {
+        // seek to cellSize boundary
+        pos = cellSize - 1;
+        assertSeekAndRead(in, pos, fileLength);
+      }
+
+      if (fileLength > cellSize * dataBlocks) {
+        // seek to striped cell group boundary
+        pos = cellSize * dataBlocks - 1;
+        assertSeekAndRead(in, pos, fileLength);
+      }
+
+      if (fileLength > blockSize * dataBlocks) {
+        // seek to striped block group boundary
+        pos = blockSize * dataBlocks - 1;
+        assertSeekAndRead(in, pos, fileLength);
+      }
+
+      if (!(in.getWrappedStream() instanceof ByteRangeInputStream)) {
+        try {
+          in.seek(-1);
+          Assert.fail("Should be failed if seek to negative offset");
+        } catch (EOFException e) {
+          // expected
+        }
+
+        try {
+          in.seek(fileLength + 1);
+          Assert.fail("Should be failed if seek after EOF");
+        } catch (EOFException e) {
+          // expected
+        }
+      }
+    }
+  }
+
+  static void assertSeekAndRead(FSDataInputStream fsdis, int pos,
+      int writeBytes) throws IOException {
+    fsdis.seek(pos);
+    byte[] buf = new byte[writeBytes];
+    int readLen = StripedFileTestUtil.readAll(fsdis, buf);
+    Assert.assertEquals(writeBytes - pos, readLen);
+    for (int i = 0; i < readLen; i++) {
+      Assert.assertEquals("Byte at " + i + " should be the same",
+          StripedFileTestUtil.getByte(pos + i), buf[i]);
+    }
+  }
+
+  static void killDatanode(MiniDFSCluster cluster, DFSStripedOutputStream out,
+      final int dnIndex, final AtomicInteger pos) {
+    final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
+    final DatanodeInfo datanode = getDatanodes(s);
+    LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
+    cluster.stopDataNode(datanode.getXferAddr());
+  }
+
+  static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
+    for (;;) {
+      final DatanodeInfo[] datanodes = streamer.getNodes();
+      if (datanodes != null) {
+        Assert.assertEquals(1, datanodes.length);
+        Assert.assertNotNull(datanodes[0]);
+        return datanodes[0];
+      }
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ignored) {
+        return null;
+      }
+    }
+  }
+
+  /**
+   * Generate n distinct random numbers within the inclusive,
+   * non-negative integer range [min, max].
+   * @param min minimum of the range
+   * @param max maximum of the range
+   * @param n count of numbers to generate
+   * @return an array of n distinct integers, or null if the range cannot supply them
+   */
+  public static int[] randomArray(int min, int max, int n) {
+    if (n > (max - min + 1) || max < min || min < 0 || max < 0) {
+      return null;
+    }
+    int[] result = new int[n];
+    for (int i = 0; i < n; i++) {
+      result[i] = -1;
+    }
+
+    int count = 0;
+    while (count < n) {
+      // +1 keeps max reachable; without it, n == max - min + 1 never finishes
+      int num = (int) (Math.random() * (max - min + 1)) + min;
+      boolean flag = true;
+      for (int j = 0; j < n; j++) {
+        if (num == result[j]) {
+          flag = false;
+          break;
+        }
+      }
+      if (flag) {
+        result[count] = num;
+        count++;
+      }
+    }
+    return result;
+  }
+}
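
A hedged sketch of how these helpers compose in a read-path test; the file name and the extra 123 bytes are illustrative, and since the helpers are package-private the caller is assumed to live in org.apache.hadoop.hdfs:

    // Sketch only: write a known pattern, then exercise each read path.
    FileSystem fs = cluster.getFileSystem();
    Path srcPath = new Path("/ec/file0");
    int fileLength = StripedFileTestUtil.blockSize
        * StripedFileTestUtil.dataBlocks + 123;  // straddles a group boundary
    byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
    DFSTestUtil.writeFile(fs, srcPath, expected);

    byte[] buf = new byte[1024];
    StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
    StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, buf);
    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, buf);
    StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);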

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
index d8aceff..1a767c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
@@ -250,8 +250,8 @@ public class TestBlockReaderFactory {
           LocatedBlock lblock = locatedBlocks.get(0); // first block
           BlockReader blockReader = null;
           try {
-            blockReader = BlockReaderTestUtil.
-                getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+            blockReader = BlockReaderTestUtil.getBlockReader(
+                cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
             Assert.fail("expected getBlockReader to fail the first time.");
           } catch (Throwable t) { 
             Assert.assertTrue("expected to see 'TCP reads were disabled " +
@@ -265,8 +265,8 @@ public class TestBlockReaderFactory {
 
           // Second time should succeed.
           try {
-            blockReader = BlockReaderTestUtil.
-                getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+            blockReader = BlockReaderTestUtil.getBlockReader(
+                cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
           } catch (Throwable t) { 
             LOG.error("error trying to retrieve a block reader " +
                 "the second time.", t);
@@ -474,8 +474,8 @@ public class TestBlockReaderFactory {
           while (true) {
             BlockReader blockReader = null;
             try {
-              blockReader = BlockReaderTestUtil.
-                  getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+              blockReader = BlockReaderTestUtil.getBlockReader(
+                  cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
               sem.release();
               try {
                 blockReader.readAll(buf, 0, TEST_FILE_LEN);
@@ -514,8 +514,8 @@ public class TestBlockReaderFactory {
     // getting a ClosedChannelException.
     BlockReader blockReader = null;
     try {
-      blockReader = BlockReaderTestUtil.
-          getBlockReader(cluster, lblock, 0, TEST_FILE_LEN);
+      blockReader = BlockReaderTestUtil.getBlockReader(
+          cluster.getFileSystem(), lblock, 0, TEST_FILE_LEN);
       blockReader.readFully(buf, 0, TEST_FILE_LEN);
     } finally {
       if (blockReader != null) blockReader.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
index 441ef9c..c68bd28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
@@ -257,12 +257,12 @@ public class TestDFSClientRetries {
     Mockito.doReturn(
             new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
                 (short) 777), "owner", "group", new byte[0], new byte[0],
-                1010, 0, null, (byte) 0)).when(mockNN).getFileInfo(anyString());
+                1010, 0, null, (byte) 0, null, 0)).when(mockNN).getFileInfo(anyString());
     
     Mockito.doReturn(
             new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
                 (short) 777), "owner", "group", new byte[0], new byte[0],
-                1010, 0, null, (byte) 0))
+                1010, 0, null, (byte) 0, null, 0))
         .when(mockNN)
         .create(anyString(), (FsPermission) anyObject(), anyString(),
             (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
@@ -550,7 +550,7 @@ public class TestDFSClientRetries {
       badBlocks.add(badLocatedBlock);
       return new LocatedBlocks(goodBlockList.getFileLength(), false,
                                badBlocks, null, true,
-                               null);
+                               null, null, 0);
     }
   }
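
The two arguments appended in these mocks track the new erasure-coding fields; an annotated reading of the HdfsFileStatus call follows (the parameter names are inferred from the branch, not quoted from it, and the LocatedBlocks constructor appears to gain the same pair after its feInfo argument):

    // Annotated sketch of the updated mock return value.
    new HdfsFileStatus(0, false, 1, 1024, 0, 0,
        new FsPermission((short) 777), "owner", "group",
        new byte[0], new byte[0], 1010, 0,
        null,       // feInfo: no encryption info
        (byte) 0,   // storage policy
        null,       // EC schema (inferred name): not erasure-coded
        0);         // stripe cell size (inferred name): n/a for contiguous files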
   

