hadoop-common-commits mailing list archives

From z..@apache.org
Subject [01/12] hadoop git commit: HDFS-7782. Erasure coding: pread from files in striped layout. Contributed by Zhe Zhang and Jing Zhao
Date Mon, 13 Apr 2015 21:08:32 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 21d0ce4cc -> 6e202bac1 (forced update)


HDFS-7782. Erasure coding: pread from files in striped layout. Contributed by Zhe Zhang and Jing Zhao


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ae8f0c12
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ae8f0c12
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ae8f0c12

Branch: refs/heads/HDFS-7285
Commit: ae8f0c12db2db9212e7e50a4219d24a3c27110e3
Parents: 38fa860
Author: Zhe Zhang <zhz@apache.org>
Authored: Tue Apr 7 11:20:13 2015 -0700
Committer: Zhe Zhang <zhz@apache.org>
Committed: Mon Apr 13 14:08:12 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  55 +++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   8 +-
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  79 +++-
 .../hadoop/hdfs/DFSStripedInputStream.java      | 367 +++++++++++++++++++
 .../hadoop/hdfs/protocol/HdfsConstants.java     |   2 +-
 .../hadoop/hdfs/protocol/LocatedBlock.java      |   4 +
 .../hdfs/protocol/LocatedStripedBlock.java      |   5 +
 .../blockmanagement/BlockInfoStriped.java       |   6 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  92 ++++-
 .../apache/hadoop/hdfs/TestReadStripedFile.java | 304 +++++++++++++++
 .../namenode/TestRecoverStripedBlocks.java      |  88 +----
 11 files changed, 897 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index cd06e09..2ef1d36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -236,6 +236,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private static final DFSHedgedReadMetrics HEDGED_READ_METRIC =
       new DFSHedgedReadMetrics();
   private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
+  private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
   private final Sampler<?> traceSampler;
 
   public DfsClientConf getConf() {
@@ -376,6 +377,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     if (numThreads > 0) {
       this.initThreadsNumForHedgedReads(numThreads);
     }
+    numThreads = conf.getInt(
+        DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE,
+        DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE);
+    if (numThreads <= 0) {
+      LOG.warn("The value of "
+          + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE
+          + " must be greater than 0. The current setting is " + numThreads
+          + ". Reset it to the default value "
+          + DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE);
+      numThreads =
+          DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE;
+    }
+    this.initThreadsNumForStripedReads(numThreads);
     this.saslClient = new SaslDataTransferClient(
       conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
       TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
@@ -3148,6 +3162,43 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  /**
+   * Create thread pool for parallel reading in striped layout,
+   * STRIPED_READ_THREAD_POOL, if it does not already exist.
+   * @param num Number of threads for striped reads thread pool.
+   */
+  private void initThreadsNumForStripedReads(int num) {
+    assert num > 0;
+    if (STRIPED_READ_THREAD_POOL != null) {
+      return;
+    }
+    synchronized (DFSClient.class) {
+      if (STRIPED_READ_THREAD_POOL == null) {
+        STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
+            TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+            new Daemon.DaemonFactory() {
+          private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+          @Override
+          public Thread newThread(Runnable r) {
+            Thread t = super.newThread(r);
+            t.setName("stripedRead-" + threadIndex.getAndIncrement());
+            return t;
+          }
+        }, new ThreadPoolExecutor.CallerRunsPolicy() {
+          @Override
+          public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
+            LOG.info("Execution for striped reading rejected, "
+                + "Executing in current thread");
+            // will run in the current thread
+            super.rejectedExecution(runnable, e);
+          }
+        });
+        STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
+      }
+    }
+  }
+
   long getHedgedReadTimeout() {
     return this.hedgedReadThresholdMillis;
   }
@@ -3161,6 +3212,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     return HEDGED_READ_THREAD_POOL;
   }
 
+  ThreadPoolExecutor getStripedReadsThreadPool() {
+    return STRIPED_READ_THREAD_POOL;
+  }
+
   boolean isHedgedReadsEnabled() {
     return (HEDGED_READ_THREAD_POOL != null) &&
       HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0;
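
A note on the pool above: it pairs a SynchronousQueue with CallerRunsPolicy, so once all of the striped-read worker threads are busy, an extra read task simply executes in the submitting thread instead of being dropped, which is why the overridden rejectedExecution only logs and delegates to super. A minimal standalone sketch of that spill-over behavior (pool sizes and class name below are illustrative, not part of the patch):

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallerRunsDemo {
  public static void main(String[] args) throws InterruptedException {
    // Same shape as STRIPED_READ_THREAD_POOL: no task buffering, spill to caller.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(),
        new ThreadPoolExecutor.CallerRunsPolicy());
    final AtomicInteger ranInCaller = new AtomicInteger();
    for (int i = 0; i < 4; i++) {
      final int id = i;
      pool.execute(() -> {
        if (Thread.currentThread().getName().equals("main")) {
          ranInCaller.incrementAndGet();     // task spilled over to the caller
        }
        try { Thread.sleep(200); } catch (InterruptedException ignored) { }
        System.out.println("task " + id + " ran on " + Thread.currentThread().getName());
      });
    }
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println("tasks run in the submitting thread: " + ranInCaller.get());
  }
}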

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 6185604..35e279d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -645,7 +645,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.namenode.reject-unresolved-dn-topology-mapping";
   public static final boolean DFS_REJECT_UNRESOLVED_DN_TOPOLOGY_MAPPING_DEFAULT =
       false;
-  
+
+  public static final String DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE =
+      "dfs.client.striped.read.threadpool.size";
+  // With default 3+2 schema, each normal read could span 3 DNs. So this
+  // default value accommodates 6 read streams
+  public static final int DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE = 18;
+
   // Slow io warning log threshold settings for dfsclient and datanode.
   public static final String DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY =
     "dfs.datanode.slow.io.warning.threshold.ms";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index dd0f6fe..e94c41a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -43,6 +43,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ByteBufferReadable;
@@ -93,7 +94,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
   private long hedgedReadOpsLoopNumForTesting = 0;
-  private final DFSClient dfsClient;
+  protected final DFSClient dfsClient;
   private AtomicBoolean closed = new AtomicBoolean(false);
   private final String src;
   private final boolean verifyChecksum;
@@ -440,7 +441,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @return located block
    * @throws IOException
    */
-  private LocatedBlock getBlockAt(long offset) throws IOException {
+  protected LocatedBlock getBlockAt(long offset) throws IOException {
     synchronized(infoLock) {
       assert (locatedBlocks != null) : "locatedBlocks is null";
 
@@ -705,7 +706,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * Wraps different possible read implementations so that readBuffer can be
    * strategy-agnostic.
    */
-  private interface ReaderStrategy {
+  interface ReaderStrategy {
     public int doRead(BlockReader blockReader, int off, int len)
         throws ChecksumException, IOException;
   }
@@ -1048,7 +1049,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     return errMsgr.toString();
   }
 
-  private void fetchBlockByteRange(long blockStartOffset, long start, long end,
+  protected void fetchBlockByteRange(long blockStartOffset, long start, long end,
       byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
@@ -1090,13 +1091,42 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     };
   }
 
+  /**
+   * Used when reading contiguous blocks
+   */
   private void actualGetFromOneDataNode(final DNAddrPair datanode,
       long blockStartOffset, final long start, final long end, byte[] buf,
       int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
+    final int length = (int) (end - start + 1);
+    actualGetFromOneDataNode(datanode, block, start, end, buf,
+        new int[]{offset}, new int[]{length}, corruptedBlockMap);
+  }
+
+  /**
+   * Read data from one DataNode.
+   * @param datanode the datanode from which to read data
+   * @param block the block to read
+   * @param startInBlk the startInBlk offset of the block
+   * @param endInBlk the endInBlk offset of the block
+   * @param buf the given byte array into which the data is read
+   * @param offsets the data may be read into multiple segments of the buf
+   *                (when reading a striped block). this array indicates the
+   *                offset of each buf segment.
+   * @param lengths the length of each buf segment
+   * @param corruptedBlockMap map recording list of datanodes with corrupted
+   *                          block replica
+   */
+  void actualGetFromOneDataNode(final DNAddrPair datanode,
+      LocatedBlock block, final long startInBlk, final long endInBlk,
+      byte[] buf, int[] offsets, int[] lengths,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
     DFSClientFaultInjector.get().startFetchFromDatanode();
     int refetchToken = 1; // only need to get a new access token once
     int refetchEncryptionKey = 1; // only need to get a new encryption key once
+    final int len = (int) (endInBlk - startInBlk + 1);
+    checkReadPortions(offsets, lengths, len);
 
     while (true) {
       // cached block locations may have been updated by chooseDataNode()
@@ -1117,7 +1147,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       try {
         DFSClientFaultInjector.get().fetchFromDatanodeException();
         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
-        int len = (int) (end - start + 1);
         reader = new BlockReaderFactory(dfsClient.getConf()).
             setInetSocketAddress(targetAddr).
             setRemotePeerFactory(dfsClient).
@@ -1126,7 +1155,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
             setFileName(src).
             setBlock(block.getBlock()).
             setBlockToken(blockToken).
-            setStartOffset(start).
+            setStartOffset(startInBlk).
             setVerifyChecksum(verifyChecksum).
             setClientName(dfsClient.clientName).
             setLength(len).
@@ -1136,12 +1165,14 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
             setUserGroupInformation(dfsClient.ugi).
             setConfiguration(dfsClient.getConfiguration()).
             build();
-        int nread = reader.readAll(buf, offset, len);
-        updateReadStatistics(readStatistics, nread, reader);
+        for (int i = 0; i < offsets.length; i++) {
+          int nread = reader.readAll(buf, offsets[i], lengths[i]);
+          updateReadStatistics(readStatistics, nread, reader);
 
-        if (nread != len) {
-          throw new IOException("truncated return from reader.read(): " +
-                                "excpected " + len + ", got " + nread);
+          if (nread != len) {
+            throw new IOException("truncated return from reader.read(): " +
+                "expected " + len + ", got " + nread);
+          }
         }
         DFSClientFaultInjector.get().readFromDatanodeDelay();
         return;
@@ -1187,6 +1218,25 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /**
+   * This method verifies that the read portions are valid and do not overlap
+   * with each other.
+   */
+  private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
+    Preconditions.checkArgument(offsets.length == lengths.length &&
+        offsets.length > 0);
+    int sum = 0;
+    for (int i = 0; i < lengths.length; i++) {
+      if (i > 0) {
+        int gap = offsets[i] - offsets[i - 1];
+        // make sure read portions do not overlap with each other
+        Preconditions.checkArgument(gap >= lengths[i - 1]);
+      }
+      sum += lengths[i];
+    }
+    Preconditions.checkArgument(sum == totalLen);
+  }
+
+  /**
    * Like {@link #fetchBlockByteRange(LocatedBlock, long, long, byte[],
    * int, Map)} except we start up a second, parallel, 'hedged' read
    * if the first read is taking longer than configured amount of
@@ -1407,10 +1457,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       long targetStart = position - blk.getStartOffset();
       long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
       try {
-        if (dfsClient.isHedgedReadsEnabled()) {
+        if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {
           hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart,
-              targetStart + bytesToRead - 1, buffer, offset,
-              corruptedBlockMap);
+              targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
         } else {
           fetchBlockByteRange(blk.getStartOffset(), targetStart,
               targetStart + bytesToRead - 1, buffer, offset,
@@ -1606,7 +1655,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /** Utility class to encapsulate data node info and its address. */
-  private static final class DNAddrPair {
+  static final class DNAddrPair {
     final DatanodeInfo info;
     final InetSocketAddress addr;
     final StorageType storageType;
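
The offsets/lengths arrays introduced above let a single DataNode connection deliver one contiguous block range into several disjoint regions of the caller's buffer, which is exactly what a striped pread needs. A tiny standalone illustration of that pattern using a plain in-memory stream (all values are made up):

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Arrays;

public class MultiSegmentRead {
  public static void main(String[] args) throws IOException {
    // 8 bytes of "block" data read as two segments into disjoint regions of
    // the destination buffer, mirroring the offsets/lengths loop in
    // actualGetFromOneDataNode.
    byte[] blockData = {1, 2, 3, 4, 5, 6, 7, 8};
    DataInputStream reader = new DataInputStream(new ByteArrayInputStream(blockData));

    byte[] buf = new byte[16];
    int[] offsets = {0, 12};   // where each segment lands in buf
    int[] lengths = {4, 4};    // how many bytes each segment holds
    for (int i = 0; i < offsets.length; i++) {
      reader.readFully(buf, offsets[i], lengths[i]);
    }
    // buf[0..3] holds 1..4 and buf[12..15] holds 5..8; the middle is untouched.
    System.out.println(Arrays.toString(buf));
  }
}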

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
new file mode 100644
index 0000000..077b0f8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+
+/******************************************************************************
+ * DFSStripedInputStream reads from striped block groups, illustrated below:
+ *
+ * | <- Striped Block Group -> |
+ *  blk_0      blk_1       blk_2   <- A striped block group has
+ *    |          |           |          {@link #groupSize} blocks
+ *    v          v           v
+ * +------+   +------+   +------+
+ * |cell_0|   |cell_1|   |cell_2|  <- The logical read order should be
+ * +------+   +------+   +------+       cell_0, cell_1, ...
+ * |cell_3|   |cell_4|   |cell_5|
+ * +------+   +------+   +------+
+ * |cell_6|   |cell_7|   |cell_8|
+ * +------+   +------+   +------+
+ * |cell_9|
+ * +------+  <- A cell contains {@link #cellSize} bytes of data
+ *
+ * Three styles of read will eventually be supported:
+ *   1. Stateful read: TODO: HDFS-8033
+ *   2. pread without decode support
+ *     This is implemented by calculating the portion of read from each block and
+ *     issuing requests to each DataNode in parallel.
+ *   3. pread with decode support: TODO: will be supported after HDFS-7678
+ *****************************************************************************/
+public class DFSStripedInputStream extends DFSInputStream {
+  /**
+   * This method plans the read portion from each block in the stripe
+   * @param groupSize The size / width of the striping group
+   * @param cellSize The size of each striping cell
+   * @param startInBlk Starting offset in the striped block
+   * @param len Length of the read request
+   * @param bufOffset  Initial offset in the result buffer
+   * @return array of {@link ReadPortion}, each representing the portion of I/O
+   *         for an individual block in the group
+   */
+  @VisibleForTesting
+  static ReadPortion[] planReadPortions(final int groupSize,
+      final int cellSize, final long startInBlk, final int len, int bufOffset) {
+    ReadPortion[] results = new ReadPortion[groupSize];
+    for (int i = 0; i < groupSize; i++) {
+      results[i] = new ReadPortion();
+    }
+
+    // cellIdxInBlk is the index of the cell in the block
+    // E.g., cell_3 is the 2nd cell in blk_0
+    int cellIdxInBlk = (int) (startInBlk / (cellSize * groupSize));
+
+    // blkIdxInGroup is the index of the block in the striped block group
+    // E.g., blk_2 is the 3rd block in the group
+    final int blkIdxInGroup = (int) (startInBlk / cellSize % groupSize);
+    results[blkIdxInGroup].startOffsetInBlock = cellSize * cellIdxInBlk +
+        startInBlk % cellSize;
+    boolean crossStripe = false;
+    for (int i = 1; i < groupSize; i++) {
+      if (blkIdxInGroup + i >= groupSize && !crossStripe) {
+        cellIdxInBlk++;
+        crossStripe = true;
+      }
+      results[(blkIdxInGroup + i) % groupSize].startOffsetInBlock =
+          cellSize * cellIdxInBlk;
+    }
+
+    int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len);
+    results[blkIdxInGroup].offsetsInBuf.add(bufOffset);
+    results[blkIdxInGroup].lengths.add(firstCellLen);
+    results[blkIdxInGroup].readLength += firstCellLen;
+
+    int i = (blkIdxInGroup + 1) % groupSize;
+    for (int done = firstCellLen; done < len; done += cellSize) {
+      ReadPortion rp = results[i];
+      rp.offsetsInBuf.add(done + bufOffset);
+      final int readLen = Math.min(len - done, cellSize);
+      rp.lengths.add(readLen);
+      rp.readLength += readLen;
+      i = (i + 1) % groupSize;
+    }
+    return results;
+  }
+
+  /**
+   * This method parses a striped block group into individual blocks.
+   *
+   * @param bg The striped block group
+   * @param dataBlkNum the number of data blocks
+   * @return An array containing the blocks in the group
+   */
+  @VisibleForTesting
+  static LocatedBlock[] parseStripedBlockGroup(LocatedStripedBlock bg,
+      int dataBlkNum, int cellSize) {
+    int locatedBGSize = bg.getBlockIndices().length;
+    // TODO not considering missing blocks for now, only identify data blocks
+    LocatedBlock[] lbs = new LocatedBlock[dataBlkNum];
+    for (short i = 0; i < locatedBGSize; i++) {
+      final int idx = bg.getBlockIndices()[i];
+      if (idx < dataBlkNum && lbs[idx] == null) {
+        lbs[idx] = constructInternalBlock(bg, i, cellSize, idx);
+      }
+    }
+    return lbs;
+  }
+
+  private static LocatedBlock constructInternalBlock(LocatedStripedBlock bg,
+      int idxInReturnedLocs, int cellSize, int idxInBlockGroup) {
+    final ExtendedBlock blk = new ExtendedBlock(bg.getBlock());
+    blk.setBlockId(bg.getBlock().getBlockId() + idxInBlockGroup);
+    // TODO: fix the numBytes computation
+
+    return new LocatedBlock(blk,
+        new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
+        new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
+        new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
+        bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(),
+        null);
+  }
+
+
+  private int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
+
+  DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum)
+      throws IOException {
+    super(dfsClient, src, verifyChecksum);
+    DFSClient.LOG.debug("Creating a striped input stream for file " + src);
+  }
+
+  @Override
+  public synchronized int read(final ByteBuffer buf) throws IOException {
+    throw new UnsupportedActionException("Stateful read is not supported");
+  }
+
+  @Override
+  public synchronized int read(final byte buf[], int off, int len)
+      throws IOException {
+    throw new UnsupportedActionException("Stateful read is not supported");
+  }
+
+  /**
+   * | <--------- LocatedStripedBlock (ID = 0) ---------> |
+   * LocatedBlock (0) | LocatedBlock (1) | LocatedBlock (2)
+   *                      ^
+   *                    offset
+   * On a striped file, the super method {@link DFSInputStream#getBlockAt}
+   * treats a striped block group as a single {@link LocatedBlock} object,
+   * which includes target in its range. This method adds the logic of:
+   *   1. Analyzing the index of required block based on offset
+   *   2. Parsing the block group to obtain the block location on that index
+   */
+  @Override
+  protected LocatedBlock getBlockAt(long blkStartOffset) throws IOException {
+    LocatedBlock lb = super.getBlockAt(blkStartOffset);
+    assert lb instanceof LocatedStripedBlock : "NameNode should return a " +
+        "LocatedStripedBlock for a striped file";
+
+    int idx = (int) (((blkStartOffset - lb.getStartOffset()) / cellSize)
+        % groupSize);
+    // If indexing information is returned, iterate through the index array
+    // to find the entry for position idx in the group
+    LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
+    int i = 0;
+    for (; i < lsb.getBlockIndices().length; i++) {
+      if (lsb.getBlockIndices()[i] == idx) {
+        break;
+      }
+    }
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("getBlockAt for striped blocks, offset="
+          + blkStartOffset + ". Obtained block " + lb + ", idx=" + idx);
+    }
+    return constructInternalBlock(lsb, i, cellSize, idx);
+  }
+
+  private LocatedBlock getBlockGroupAt(long offset) throws IOException {
+    return super.getBlockAt(offset);
+  }
+
+  /**
+   * Real implementation of pread.
+   */
+  @Override
+  protected void fetchBlockByteRange(LocatedBlock block, long start,
+      long end, byte[] buf, int offset,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    Map<Future<Void>, Integer> futures = new HashMap<>();
+    CompletionService<Void> stripedReadsService =
+        new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
+    int len = (int) (end - start + 1);
+
+    // Refresh the striped block group
+    block = getBlockGroupAt(block.getStartOffset());
+    assert block instanceof LocatedStripedBlock : "NameNode" +
+        " should return a LocatedStripedBlock for a striped file";
+    LocatedStripedBlock blockGroup = (LocatedStripedBlock) block;
+
+    // Planning the portion of I/O for each shard
+    ReadPortion[] readPortions = planReadPortions(groupSize, cellSize, start,
+        len, offset);
+
+    // Parse group to get chosen DN location
+    LocatedBlock[] blks = parseStripedBlockGroup(blockGroup, groupSize, cellSize);
+
+    for (short i = 0; i < groupSize; i++) {
+      ReadPortion rp = readPortions[i];
+      if (rp.readLength <= 0) {
+        continue;
+      }
+      DatanodeInfo loc = blks[i].getLocations()[0];
+      StorageType type = blks[i].getStorageTypes()[0];
+      DNAddrPair dnAddr = new DNAddrPair(loc, NetUtils.createSocketAddr(
+          loc.getXferAddr(dfsClient.getConf().connectToDnViaHostname)), type);
+      Callable<Void> readCallable = getFromOneDataNode(dnAddr, blks[i],
+          rp.startOffsetInBlock, rp.startOffsetInBlock + rp.readLength - 1, buf,
+          rp.getOffsets(), rp.getLengths(), corruptedBlockMap, i);
+      Future<Void> getFromDNRequest = stripedReadsService.submit(readCallable);
+      DFSClient.LOG.debug("Submitting striped read request for " + blks[i]);
+      futures.put(getFromDNRequest, (int) i);
+    }
+    while (!futures.isEmpty()) {
+      try {
+        waitNextCompletion(stripedReadsService, futures);
+      } catch (InterruptedException ie) {
+        // Ignore and retry
+      }
+    }
+  }
+
+  private Callable<Void> getFromOneDataNode(final DNAddrPair datanode,
+      final LocatedBlock block, final long start, final long end,
+      final byte[] buf, final int[] offsets, final int[] lengths,
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+      final int hedgedReadId) {
+    final Span parentSpan = Trace.currentSpan();
+    return new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        TraceScope scope =
+            Trace.startSpan("Parallel reading " + hedgedReadId, parentSpan);
+        try {
+          actualGetFromOneDataNode(datanode, block, start,
+              end, buf, offsets, lengths, corruptedBlockMap);
+        } finally {
+          scope.close();
+        }
+        return null;
+      }
+    };
+  }
+
+  private void waitNextCompletion(CompletionService<Void> stripedReadsService,
+      Map<Future<Void>, Integer> futures) throws InterruptedException {
+    if (futures.isEmpty()) {
+      throw new InterruptedException("Futures already empty");
+    }
+    Future<Void> future = null;
+    try {
+      future = stripedReadsService.take();
+      future.get();
+      futures.remove(future);
+    } catch (ExecutionException | CancellationException e) {
+      // already logged in the Callable
+      futures.remove(future);
+    }
+    throw new InterruptedException("let's retry");
+  }
+
+  public void setCellSize(int cellSize) {
+    this.cellSize = cellSize;
+  }
+
+  /**
+   * This class represents the portion of I/O associated with each block in the
+   * striped block group.
+   */
+  static class ReadPortion {
+    /**
+     * startOffsetInBlock
+     *     |
+     *     v
+     *     |<-lengths[0]->|<-  lengths[1]  ->|<-lengths[2]->|
+     * +------------------+------------------+----------------+
+     * |      cell_0      |      cell_3      |     cell_6     |  <- blk_0
+     * +------------------+------------------+----------------+
+     *   _/                \_______________________
+     *  |                                          |
+     *  v offsetsInBuf[0]                          v offsetsInBuf[1]
+     * +------------------------------------------------------+
+     * |  cell_0     |      cell_1 and cell_2      |cell_3 ...|   <- buf
+     * |  (partial)  |    (from blk_1 and blk_2)   |          |
+     * +------------------------------------------------------+
+     */
+    private long startOffsetInBlock = 0;
+    private long readLength = 0;
+    private final List<Integer> offsetsInBuf = new ArrayList<>();
+    private final List<Integer> lengths = new ArrayList<>();
+
+    int[] getOffsets() {
+      int[] offsets = new int[offsetsInBuf.size()];
+      for (int i = 0; i < offsets.length; i++) {
+        offsets[i] = offsetsInBuf.get(i);
+      }
+      return offsets;
+    }
+
+    int[] getLengths() {
+      int[] lens = new int[this.lengths.size()];
+      for (int i = 0; i < lens.length; i++) {
+        lens[i] = this.lengths.get(i);
+      }
+      return lens;
+    }
+
+    long getReadLength() {
+      return readLength;
+    }
+
+    long getStartOffsetInBlock() {
+      return startOffsetInBlock;
+    }
+  }
+}
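
To make the index arithmetic in planReadPortions concrete, here is a small standalone sketch reproducing its first few formulas for one example offset (cell size and group size are shrunk for readability; the real cell size in this patch is 128 KB):

public class PlanReadPortionsExample {
  public static void main(String[] args) {
    // Illustrative numbers only.
    final int cellSize = 64, groupSize = 3;
    final long startInBlk = 130;
    final int len = 100;

    // Index of the cell within its block (cells 0..groupSize-1 form stripe 0, etc.)
    int cellIdxInBlk = (int) (startInBlk / (cellSize * groupSize));             // 0
    // Index of the block inside the striped group where the read starts
    int blkIdxInGroup = (int) (startInBlk / cellSize % groupSize);              // 2
    // Offset of the first requested byte within that block
    long startOffsetInBlock = cellSize * cellIdxInBlk + startInBlk % cellSize;  // 2
    // Bytes served by the first (partial) cell
    int firstCellLen = Math.min(cellSize - (int) (startInBlk % cellSize), len); // 62

    System.out.println("read starts in blk_" + blkIdxInGroup + " at offset "
        + startOffsetInBlock + "; the first cell serves " + firstCellLen
        + " bytes and the remaining " + (len - firstCellLen)
        + " bytes come from the following cells");
  }
}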

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index b2882af..a888aa4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -186,5 +186,5 @@ public class HdfsConstants {
   public static final byte MAX_BLOCKS_IN_GROUP = 16;
 
   // The chunk size for striped block which is used by erasure coding
-  public static final int BLOCK_STRIPED_CHUNK_SIZE = 64 * 1024;
+  public static final int BLOCK_STRIPED_CELL_SIZE = 128 * 1024;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
index a38e8f2..4ba8193 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
@@ -212,5 +212,9 @@ public class LocatedBlock {
         + "; locs=" + Arrays.asList(locs)
         + "}";
   }
+
+  public boolean isStriped() {
+    return false;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
index 97e3a69..98614db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
@@ -65,4 +65,9 @@ public class LocatedStripedBlock extends LocatedBlock {
   public int[] getBlockIndices() {
     return this.blockIndices;
   }
+
+  @Override
+  public boolean isStriped() {
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index 4a85efb..20b0c5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
 
 /**
  * Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
@@ -203,8 +203,8 @@ public class BlockInfoStriped extends BlockInfo {
     // In case striped blocks, total usage by this striped blocks should
     // be the total of data blocks and parity blocks because
     // `getNumBytes` is the total of actual data block size.
-    return ((getNumBytes() - 1) / (dataBlockNum * BLOCK_STRIPED_CHUNK_SIZE) + 1)
-        * BLOCK_STRIPED_CHUNK_SIZE * parityBlockNum + getNumBytes();
+    return ((getNumBytes() - 1) / (dataBlockNum * BLOCK_STRIPED_CELL_SIZE) + 1)
+        * BLOCK_STRIPED_CELL_SIZE * parityBlockNum + getNumBytes();
   }
 
   @Override
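
spaceConsumed above rounds the block group's data size up to whole stripes and then charges one parity cell per stripe for each parity block. A short worked example of the same formula (assumes a 3 data + 2 parity layout and the 128 KB cell size from this patch; the 1 MB data size is arbitrary):

public class StripedSpaceConsumed {
  public static void main(String[] args) {
    final int cellSize = 128 * 1024;     // BLOCK_STRIPED_CELL_SIZE
    final int dataBlockNum = 3;          // assumed 3+2 schema
    final int parityBlockNum = 2;
    final long numBytes = 1024 * 1024;   // 1 MB of actual data

    // Round up to full stripes, then add one cell per stripe per parity block.
    long stripes = (numBytes - 1) / (dataBlockNum * cellSize) + 1;   // 3
    long consumed = stripes * cellSize * parityBlockNum + numBytes;  // 1835008
    System.out.println(stripes + " stripes -> " + consumed + " bytes consumed");
  }
}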

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index dc91c37..e3a85cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -64,6 +64,12 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
@@ -100,6 +106,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -107,7 +114,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
@@ -122,8 +128,10 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeLayoutVersion;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
@@ -131,7 +139,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
-import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
@@ -151,12 +158,8 @@ import org.apache.log4j.Level;
 import org.junit.Assume;
 import org.mockito.internal.util.reflection.Whitebox;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
 
 /** Utilities for HDFS tests */
 public class DFSTestUtil {
@@ -1807,4 +1810,77 @@ public class DFSTestUtil {
     reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
     return reports;
   }
+
+  public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
+      int numBlocks, int numStripesPerBlk) throws Exception {
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    dfs.mkdirs(dir);
+    dfs.getClient().createErasureCodingZone(dir.toString());
+
+    FSDataOutputStream out = null;
+    try {
+      out = dfs.create(file, (short) 1); // create an empty file
+
+      FSNamesystem ns = cluster.getNamesystem();
+      FSDirectory fsdir = ns.getFSDirectory();
+      INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
+
+      ExtendedBlock previous = null;
+      for (int i = 0; i < numBlocks; i++) {
+        Block newBlock = createBlock(cluster.getDataNodes(), dfs, ns,
+            file.toString(), fileNode, dfs.getClient().getClientName(),
+            previous, numStripesPerBlk);
+        previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
+      }
+
+      dfs.getClient().namenode.complete(file.toString(),
+          dfs.getClient().getClientName(), previous, fileNode.getId());
+    } finally {
+      IOUtils.cleanup(null, out);
+    }
+  }
+
+  static Block createBlock(List<DataNode> dataNodes, DistributedFileSystem fs,
+      FSNamesystem ns, String file, INodeFile fileNode, String clientName,
+      ExtendedBlock previous, int numStripes) throws Exception {
+    fs.getClient().namenode.addBlock(file, clientName, previous, null,
+        fileNode.getId(), null);
+
+    final BlockInfo lastBlock = fileNode.getLastBlock();
+    final int groupSize = fileNode.getBlockReplication();
+    // 1. RECEIVING_BLOCK IBR
+    int i = 0;
+    for (DataNode dn : dataNodes) {
+      if (i < groupSize) {
+        final Block block = new Block(lastBlock.getBlockId() + i++, 0,
+            lastBlock.getGenerationStamp());
+        DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+        StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+            .makeReportForReceivedBlock(block,
+                ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
+        for (StorageReceivedDeletedBlocks report : reports) {
+          ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+        }
+      }
+    }
+
+    // 2. RECEIVED_BLOCK IBR
+    i = 0;
+    for (DataNode dn : dataNodes) {
+      if (i < groupSize) {
+        final Block block = new Block(lastBlock.getBlockId() + i++,
+            numStripes * BLOCK_STRIPED_CELL_SIZE, lastBlock.getGenerationStamp());
+        DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+        StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+            .makeReportForReceivedBlock(block,
+                ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+        for (StorageReceivedDeletedBlocks report : reports) {
+          ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+        }
+      }
+    }
+
+    lastBlock.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS);
+    return lastBlock;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
new file mode 100644
index 0000000..0032bdd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFile.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import static org.apache.hadoop.hdfs.DFSStripedInputStream.ReadPortion;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestReadStripedFile {
+
+  public static final Log LOG = LogFactory.getLog(TestReadStripedFile.class);
+
+  private MiniDFSCluster cluster;
+  private Configuration conf = new Configuration();
+  private DistributedFileSystem fs;
+  private final Path dirPath = new Path("/striped");
+  private Path filePath = new Path(dirPath, "file");
+  private final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS;
+  private final short TOTAL_SIZE = HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS;
+  private final int CELLSIZE = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final int NUM_STRIPE_PER_BLOCK = 2;
+  private final int BLOCKSIZE = 2 * GROUP_SIZE * CELLSIZE;
+
+  @Before
+  public void setup() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
+    SimulatedFSDataset.setFactory(conf);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(TOTAL_SIZE)
+        .build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private void testPlanReadPortions(int startInBlk, int length,
+      int bufferOffset, int[] readLengths, int[] offsetsInBlock,
+      int[][] bufferOffsets, int[][] bufferLengths) {
+    ReadPortion[] results = DFSStripedInputStream.planReadPortions(GROUP_SIZE,
+        CELLSIZE, startInBlk, length, bufferOffset);
+    assertEquals(GROUP_SIZE, results.length);
+
+    for (int i = 0; i < GROUP_SIZE; i++) {
+      assertEquals(readLengths[i], results[i].getReadLength());
+      assertEquals(offsetsInBlock[i], results[i].getStartOffsetInBlock());
+      final int[] bOffsets = results[i].getOffsets();
+      assertArrayEquals(bufferOffsets[i], bOffsets);
+      final int[] bLengths = results[i].getLengths();
+      assertArrayEquals(bufferLengths[i], bLengths);
+    }
+  }
+
+  /**
+   * Test {@link DFSStripedInputStream#planReadPortions}
+   */
+  @Test
+  public void testPlanReadPortions() {
+    /**
+     * start block offset is 0, read cellSize - 10
+     */
+    testPlanReadPortions(0, CELLSIZE - 10, 0,
+        new int[]{CELLSIZE - 10, 0, 0}, new int[]{0, 0, 0},
+        new int[][]{new int[]{0}, new int[]{}, new int[]{}},
+        new int[][]{new int[]{CELLSIZE - 10}, new int[]{}, new int[]{}});
+
+    /**
+     * start block offset is 0, read 3 * cellSize
+     */
+    testPlanReadPortions(0, GROUP_SIZE * CELLSIZE, 0,
+        new int[]{CELLSIZE, CELLSIZE, CELLSIZE}, new int[]{0, 0, 0},
+        new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{CELLSIZE * 2}},
+        new int[][]{new int[]{CELLSIZE}, new int[]{CELLSIZE}, new int[]{CELLSIZE}});
+
+    /**
+     * start block offset is 0, read cellSize + 10
+     */
+    testPlanReadPortions(0, CELLSIZE + 10, 0,
+        new int[]{CELLSIZE, 10, 0}, new int[]{0, 0, 0},
+        new int[][]{new int[]{0}, new int[]{CELLSIZE}, new int[]{}},
+        new int[][]{new int[]{CELLSIZE}, new int[]{10}, new int[]{}});
+
+    /**
+     * start block offset is 0, read 5 * cellSize + 10, buffer start offset is 100
+     */
+    testPlanReadPortions(0, 5 * CELLSIZE + 10, 100,
+        new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE + 10}, new int[]{0, 0, 0},
+        new int[][]{new int[]{100, 100 + CELLSIZE * GROUP_SIZE},
+            new int[]{100 + CELLSIZE, 100 + CELLSIZE * 4},
+            new int[]{100 + CELLSIZE * 2, 100 + CELLSIZE * 5}},
+        new int[][]{new int[]{CELLSIZE, CELLSIZE},
+            new int[]{CELLSIZE, CELLSIZE},
+            new int[]{CELLSIZE, 10}});
+
+    /**
+     * start block offset is 2, read 3 * cellSize
+     */
+    testPlanReadPortions(2, GROUP_SIZE * CELLSIZE, 100,
+        new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
+        new int[]{2, 0, 0},
+        new int[][]{new int[]{100, 100 + GROUP_SIZE * CELLSIZE - 2},
+            new int[]{100 + CELLSIZE - 2},
+            new int[]{100 + CELLSIZE * 2 - 2}},
+        new int[][]{new int[]{CELLSIZE - 2, 2},
+            new int[]{CELLSIZE},
+            new int[]{CELLSIZE}});
+
+    /**
+     * start block offset is 2, read 3 * cellSize + 10
+     */
+    testPlanReadPortions(2, GROUP_SIZE * CELLSIZE + 10, 0,
+        new int[]{CELLSIZE + 10, CELLSIZE, CELLSIZE},
+        new int[]{2, 0, 0},
+        new int[][]{new int[]{0, GROUP_SIZE * CELLSIZE - 2},
+            new int[]{CELLSIZE - 2},
+            new int[]{CELLSIZE * 2 - 2}},
+        new int[][]{new int[]{CELLSIZE - 2, 12},
+            new int[]{CELLSIZE},
+            new int[]{CELLSIZE}});
+
+    /**
+     * start block offset is cellSize * 2 - 1, read 5 * cellSize + 10
+     */
+    testPlanReadPortions(CELLSIZE * 2 - 1, 5 * CELLSIZE + 10, 0,
+        new int[]{CELLSIZE * 2, CELLSIZE + 10, CELLSIZE * 2},
+        new int[]{CELLSIZE, CELLSIZE - 1, 0},
+        new int[][]{new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1},
+            new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1},
+            new int[]{1, 3 * CELLSIZE + 1}},
+        new int[][]{new int[]{CELLSIZE, CELLSIZE},
+            new int[]{1, CELLSIZE, 9},
+            new int[]{CELLSIZE, CELLSIZE}});
+
+    /**
+     * start block offset is cellSize * 6 - 1, read 7 * cellSize + 10
+     */
+    testPlanReadPortions(CELLSIZE * 6 - 1, 7 * CELLSIZE + 10, 0,
+        new int[]{CELLSIZE * 3, CELLSIZE * 2 + 9, CELLSIZE * 2 + 1},
+        new int[]{CELLSIZE * 2, CELLSIZE * 2, CELLSIZE * 2 - 1},
+        new int[][]{new int[]{1, 3 * CELLSIZE + 1, 6 * CELLSIZE + 1},
+            new int[]{CELLSIZE + 1, 4 * CELLSIZE + 1, 7 * CELLSIZE + 1},
+            new int[]{0, 2 * CELLSIZE + 1, 5 * CELLSIZE + 1}},
+        new int[][]{new int[]{CELLSIZE, CELLSIZE, CELLSIZE},
+            new int[]{CELLSIZE, CELLSIZE, 9},
+            new int[]{1, CELLSIZE, CELLSIZE}});
+  }
+
+  private LocatedStripedBlock createDummyLocatedBlock() {
+    final long blockGroupID = -1048576;
+    DatanodeInfo[] locs = new DatanodeInfo[TOTAL_SIZE];
+    String[] storageIDs = new String[TOTAL_SIZE];
+    StorageType[] storageTypes = new StorageType[TOTAL_SIZE];
+    int[] indices = new int[TOTAL_SIZE];
+    for (int i = 0; i < TOTAL_SIZE; i++) {
+      locs[i] = new DatanodeInfo(cluster.getDataNodes().get(i).getDatanodeId());
+      storageIDs[i] = cluster.getDataNodes().get(i).getDatanodeUuid();
+      storageTypes[i] = StorageType.DISK;
+      indices[i] = (i + 2) % GROUP_SIZE;
+    }
+    return new LocatedStripedBlock(new ExtendedBlock("pool", blockGroupID),
+        locs, storageIDs, storageTypes, indices, 0, false, null);
+  }
+
+  @Test
+  public void testParseDummyStripedBlock() {
+    LocatedStripedBlock lsb = createDummyLocatedBlock();
+    LocatedBlock[] blocks = DFSStripedInputStream.parseStripedBlockGroup(
+        lsb, GROUP_SIZE, CELLSIZE);
+    assertEquals(GROUP_SIZE, blocks.length);
+    for (int j = 0; j < GROUP_SIZE; j++) {
+      assertFalse(blocks[j].isStriped());
+      assertEquals(j,
+          BlockIdManager.getBlockIndex(blocks[j].getBlock().getLocalBlock()));
+      assertEquals(j * CELLSIZE, blocks[j].getStartOffset());
+    }
+  }
+
+  @Test
+  public void testParseStripedBlock() throws Exception {
+    final int numBlocks = 4;
+    DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
+        NUM_STRIPE_PER_BLOCK);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, BLOCKSIZE * numBlocks);
+
+    assertEquals(4, lbs.locatedBlockCount());
+    List<LocatedBlock> lbList = lbs.getLocatedBlocks();
+    for (LocatedBlock lb : lbList) {
+      assertTrue(lb.isStriped());
+    }
+
+    for (int i = 0; i < numBlocks; i++) {
+      LocatedStripedBlock lsb = (LocatedStripedBlock) (lbs.get(i));
+      LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb,
+          GROUP_SIZE, CELLSIZE);
+      assertEquals(GROUP_SIZE, blks.length);
+      for (int j = 0; j < GROUP_SIZE; j++) {
+        assertFalse(blks[j].isStriped());
+        assertEquals(j,
+            BlockIdManager.getBlockIndex(blks[j].getBlock().getLocalBlock()));
+        assertEquals(i * BLOCKSIZE + j * CELLSIZE, blks[j].getStartOffset());
+      }
+    }
+  }
+
+  /**
+   * Test {@link DFSStripedInputStream#getBlockAt(long)}
+   */
+  @Test
+  public void testGetBlock() throws Exception {
+    final int numBlocks = 4;
+    DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
+        NUM_STRIPE_PER_BLOCK);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, BLOCKSIZE * numBlocks);
+    final DFSStripedInputStream in =
+        new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
+
+    List<LocatedBlock> lbList = lbs.getLocatedBlocks();
+    for (LocatedBlock aLbList : lbList) {
+      LocatedStripedBlock lsb = (LocatedStripedBlock) aLbList;
+      LocatedBlock[] blks = DFSStripedInputStream.parseStripedBlockGroup(lsb,
+          GROUP_SIZE, CELLSIZE);
+      for (int j = 0; j < GROUP_SIZE; j++) {
+        LocatedBlock refreshed = in.getBlockAt(blks[j].getStartOffset());
+        assertEquals(blks[j].getBlock(), refreshed.getBlock());
+        assertEquals(blks[j].getStartOffset(), refreshed.getStartOffset());
+        assertArrayEquals(blks[j].getLocations(), refreshed.getLocations());
+      }
+    }
+  }
+
+  @Test
+  public void testPread() throws Exception {
+    final int numBlocks = 4;
+    DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks,
+        NUM_STRIPE_PER_BLOCK);
+    LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+        filePath.toString(), 0, BLOCKSIZE);
+
+    assert lbs.get(0) instanceof LocatedStripedBlock;
+    LocatedStripedBlock bg = (LocatedStripedBlock)(lbs.get(0));
+    for (int i = 0; i < GROUP_SIZE; i++) {
+      Block blk = new Block(bg.getBlock().getBlockId() + i, BLOCKSIZE,
+          bg.getBlock().getGenerationStamp());
+      blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+      cluster.injectBlocks(i, Arrays.asList(blk),
+          bg.getBlock().getBlockPoolId());
+    }
+    DFSStripedInputStream in =
+        new DFSStripedInputStream(fs.getClient(), filePath.toString(), false);
+    in.setCellSize(CELLSIZE);
+    int readSize = BLOCKSIZE;
+    byte[] readBuffer = new byte[readSize];
+    int ret = in.read(0, readBuffer, 0, readSize);
+
+    assertEquals(readSize, ret);
+    // TODO: verify read results with patterned data from HDFS-8117
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae8f0c12/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
index d965ae7..b2ff6c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java
@@ -18,15 +18,11 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
@@ -36,19 +32,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockECRecoveryInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
-import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
-import org.apache.hadoop.io.IOUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.UUID;
 
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -84,83 +75,10 @@ public class TestRecoverStripedBlocks {
     }
   }
 
-  public static void createECFile(MiniDFSCluster cluster, Path file, Path dir,
-      int numBlocks) throws Exception {
-    DistributedFileSystem dfs = cluster.getFileSystem();
-    dfs.mkdirs(dir);
-    dfs.getClient().getNamenode().createErasureCodingZone(dir.toString());
-
-    FSDataOutputStream out = null;
-    try {
-      out = dfs.create(file, (short) 1); // create an empty file
-
-      FSNamesystem ns = cluster.getNamesystem();
-      FSDirectory fsdir = ns.getFSDirectory();
-      INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
-
-      ExtendedBlock previous = null;
-      for (int i = 0; i < numBlocks; i++) {
-        Block newBlock = createBlock(cluster.getDataNodes(), ns,
-            file.toString(), fileNode, dfs.getClient().getClientName(),
-            previous);
-        previous = new ExtendedBlock(ns.getBlockPoolId(), newBlock);
-      }
-
-      ns.completeFile(file.toString(), dfs.getClient().getClientName(),
-          previous, fileNode.getId());
-    } finally {
-      IOUtils.cleanup(null, out);
-    }
-  }
-
-  static Block createBlock(List<DataNode> dataNodes, FSNamesystem ns,
-      String file, INodeFile fileNode, String clientName,
-      ExtendedBlock previous) throws Exception {
-    ns.getAdditionalBlock(file, fileNode.getId(), clientName, previous, null,
-        null);
-
-    final BlockInfo lastBlock = fileNode.getLastBlock();
-    final int groupSize = fileNode.getBlockReplication();
-    // 1. RECEIVING_BLOCK IBR
-    int i = 0;
-    for (DataNode dn : dataNodes) {
-      if (i < groupSize) {
-        final Block block = new Block(lastBlock.getBlockId() + i++, 0,
-            lastBlock.getGenerationStamp());
-        DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
-        StorageReceivedDeletedBlocks[] reports = DFSTestUtil
-            .makeReportForReceivedBlock(block,
-                ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
-        for (StorageReceivedDeletedBlocks report : reports) {
-          ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
-        }
-      }
-    }
-
-    // 2. RECEIVED_BLOCK IBR
-    i = 0;
-    for (DataNode dn : dataNodes) {
-      if (i < groupSize) {
-        final Block block = new Block(lastBlock.getBlockId() + i++,
-            BLOCK_STRIPED_CHUNK_SIZE, lastBlock.getGenerationStamp());
-        DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
-        StorageReceivedDeletedBlocks[] reports = DFSTestUtil
-            .makeReportForReceivedBlock(block,
-                ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
-        for (StorageReceivedDeletedBlocks report : reports) {
-          ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
-        }
-      }
-    }
-
-    lastBlock.setNumBytes(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS);
-    return lastBlock;
-  }
-
   @Test
   public void testMissingStripedBlock() throws Exception {
     final int numBlocks = 4;
-    createECFile(cluster, filePath, dirPath, numBlocks);
+    DFSTestUtil.createECFile(cluster, filePath, dirPath, numBlocks, 1);
 
     // make sure the file is complete in NN
     final INodeFile fileNode = cluster.getNamesystem().getFSDirectory()
@@ -172,7 +90,7 @@ public class TestRecoverStripedBlocks {
     for (BlockInfo blk : blocks) {
       assertTrue(blk.isStriped());
       assertTrue(blk.isComplete());
-      assertEquals(BLOCK_STRIPED_CHUNK_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
+      assertEquals(BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS, blk.getNumBytes());
       final BlockInfoStriped sb = (BlockInfoStriped) blk;
       assertEquals(GROUP_SIZE, sb.numNodes());
     }

