hadoop-common-commits mailing list archives

From: z..@apache.org
Subject: [29/36] hadoop git commit: HDFS-7285. Erasure Coding Support inside HDFS.
Date: Fri, 14 Aug 2015 17:55:41 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
new file mode 100644
index 0000000..3612063
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -0,0 +1,939 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.io.ByteBufferPool;
+
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.AlignedStripe;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunk;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.util.DirectBufferPool;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.Collection;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+
+/**
+ * DFSStripedInputStream reads from striped block groups.
+ */
+public class DFSStripedInputStream extends DFSInputStream {
+
+  private static class ReaderRetryPolicy {
+    private int fetchEncryptionKeyTimes = 1;
+    private int fetchTokenTimes = 1;
+
+    void refetchEncryptionKey() {
+      fetchEncryptionKeyTimes--;
+    }
+
+    void refetchToken() {
+      fetchTokenTimes--;
+    }
+
+    boolean shouldRefetchEncryptionKey() {
+      return fetchEncryptionKeyTimes > 0;
+    }
+
+    boolean shouldRefetchToken() {
+      return fetchTokenTimes > 0;
+    }
+  }
+
+  /** Used to indicate the buffered data's range in the block group */
+  private static class StripeRange {
+    /** start offset in the block group (inclusive) */
+    final long offsetInBlock;
+    /** length of the stripe range */
+    final long length;
+
+    StripeRange(long offsetInBlock, long length) {
+      Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0);
+      this.offsetInBlock = offsetInBlock;
+      this.length = length;
+    }
+
+    boolean include(long pos) {
+      return pos >= offsetInBlock && pos < offsetInBlock + length;
+    }
+  }
+
+  private static class BlockReaderInfo {
+    final BlockReader reader;
+    final DatanodeInfo datanode;
+    /**
+     * When initializing block readers, their starting offsets are set to the
+     * same number: the smallest internal block offset among all the readers.
+     * This is because for some internal blocks we may have to read
+     * "backwards" for decoding purposes. We thus use this offset to track
+     * each block reader's position so that we can skip data if necessary.
+     */
+    long blockReaderOffset;
+    LocatedBlock targetBlock;
+    /**
+     * We use this field to indicate whether we should use this reader. If
+     * we hit any issue with this reader, we set this field to true and
+     * avoid using it for the next stripe.
+     */
+    boolean shouldSkip = false;
+
+    BlockReaderInfo(BlockReader reader, LocatedBlock targetBlock,
+        DatanodeInfo dn, long offset) {
+      this.reader = reader;
+      this.targetBlock = targetBlock;
+      this.datanode = dn;
+      this.blockReaderOffset = offset;
+    }
+
+    void setOffset(long offset) {
+      this.blockReaderOffset = offset;
+    }
+
+    void skip() {
+      this.shouldSkip = true;
+    }
+  }
+
+  private static final DirectBufferPool bufferPool = new DirectBufferPool();
+
+  private final BlockReaderInfo[] blockReaders;
+  private final int cellSize;
+  private final short dataBlkNum;
+  private final short parityBlkNum;
+  private final int groupSize;
+  /** the buffer for a complete stripe */
+  private ByteBuffer curStripeBuf;
+  private ByteBuffer parityBuf;
+  private final ECSchema schema;
+  private final RawErasureDecoder decoder;
+
+  /**
+   * indicate the start/end offset of the current buffered stripe in the
+   * block group
+   */
+  private StripeRange curStripeRange;
+  private final CompletionService<Void> readingService;
+
+  DFSStripedInputStream(DFSClient dfsClient, String src,
+      boolean verifyChecksum, ECSchema schema, int cellSize,
+      LocatedBlocks locatedBlocks) throws IOException {
+    super(dfsClient, src, verifyChecksum, locatedBlocks);
+
+    assert schema != null;
+    this.schema = schema;
+    this.cellSize = cellSize;
+    dataBlkNum = (short) schema.getNumDataUnits();
+    parityBlkNum = (short) schema.getNumParityUnits();
+    groupSize = dataBlkNum + parityBlkNum;
+    blockReaders = new BlockReaderInfo[groupSize];
+    curStripeRange = new StripeRange(0, 0);
+    readingService =
+        new ExecutorCompletionService<>(dfsClient.getStripedReadsThreadPool());
+    decoder = CodecUtil.createRSRawDecoder(dfsClient.getConfiguration(),
+        dataBlkNum, parityBlkNum);
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("Creating an striped input stream for file " + src);
+    }
+  }
+
+  private void resetCurStripeBuffer() {
+    if (curStripeBuf == null) {
+      curStripeBuf = bufferPool.getBuffer(cellSize * dataBlkNum);
+    }
+    curStripeBuf.clear();
+    curStripeRange = new StripeRange(0, 0);
+  }
+
+  private ByteBuffer getParityBuffer() {
+    if (parityBuf == null) {
+      parityBuf = bufferPool.getBuffer(cellSize * parityBlkNum);
+    }
+    parityBuf.clear();
+    return parityBuf;
+  }
+
+  /**
+   * When seeking into a new block group, close the current block readers and
+   * update the position. Block readers for the internal blocks in the group
+   * are created lazily when the next stripe is read.
+   */
+  private synchronized void blockSeekTo(long target) throws IOException {
+    if (target >= getFileLength()) {
+      throw new IOException("Attempted to read past end of file");
+    }
+
+    // Will be getting a new BlockReader.
+    closeCurrentBlockReaders();
+
+    // Compute desired striped block group
+    LocatedStripedBlock targetBlockGroup = getBlockGroupAt(target);
+    // Update current position
+    this.pos = target;
+    this.blockEnd = targetBlockGroup.getStartOffset() +
+        targetBlockGroup.getBlockSize() - 1;
+    currentLocatedBlock = targetBlockGroup;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    super.close();
+    if (curStripeBuf != null) {
+      bufferPool.returnBuffer(curStripeBuf);
+      curStripeBuf = null;
+    }
+    if (parityBuf != null) {
+      bufferPool.returnBuffer(parityBuf);
+      parityBuf = null;
+    }
+  }
+
+  /**
+   * Extend the super method by also resetting the stripe buffer and closing
+   * the block readers for all the internal blocks in the group.
+   */
+  @Override
+  protected void closeCurrentBlockReaders() {
+    resetCurStripeBuffer();
+    if (blockReaders == null || blockReaders.length == 0) {
+      return;
+    }
+    for (int i = 0; i < groupSize; i++) {
+      closeReader(blockReaders[i]);
+      blockReaders[i] = null;
+    }
+    blockEnd = -1;
+  }
+
+  private void closeReader(BlockReaderInfo readerInfo) {
+    if (readerInfo != null) {
+      IOUtils.cleanup(DFSClient.LOG, readerInfo.reader);
+      readerInfo.skip();
+    }
+  }
+
+  private long getOffsetInBlockGroup() {
+    return getOffsetInBlockGroup(pos);
+  }
+
+  private long getOffsetInBlockGroup(long pos) {
+    return pos - currentLocatedBlock.getStartOffset();
+  }
+
+  /**
+   * Read a new stripe covering the current position, and store the data in the
+   * {@link #curStripeBuf}.
+   */
+  private void readOneStripe(
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    resetCurStripeBuffer();
+
+    // compute stripe range based on pos
+    final long offsetInBlockGroup = getOffsetInBlockGroup();
+    final long stripeLen = cellSize * dataBlkNum;
+    final int stripeIndex = (int) (offsetInBlockGroup / stripeLen);
+    final int stripeBufOffset = (int) (offsetInBlockGroup % stripeLen);
+    final int stripeLimit = (int) Math.min(currentLocatedBlock.getBlockSize()
+        - (stripeIndex * stripeLen), stripeLen);
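+    // Worked example (values hypothetical): with dataBlkNum = 6 and a 64KB
+    // cell, stripeLen = 384KB; a position 400KB into the block group gives
+    // stripeIndex = 1 and stripeBufOffset = 16KB.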
+    StripeRange stripeRange = new StripeRange(offsetInBlockGroup,
+        stripeLimit - stripeBufOffset);
+
+    LocatedStripedBlock blockGroup = (LocatedStripedBlock) currentLocatedBlock;
+    AlignedStripe[] stripes = StripedBlockUtil.divideOneStripe(schema, cellSize,
+        blockGroup, offsetInBlockGroup,
+        offsetInBlockGroup + stripeRange.length - 1, curStripeBuf);
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
+        blockGroup, cellSize, dataBlkNum, parityBlkNum);
+    // read the whole stripe
+    for (AlignedStripe stripe : stripes) {
+      // Parse group to get chosen DN location
+      StripeReader sreader = new StatefulStripeReader(readingService, stripe,
+          blks, blockReaders, corruptedBlockMap);
+      sreader.readStripe();
+    }
+    curStripeBuf.position(stripeBufOffset);
+    curStripeBuf.limit(stripeLimit);
+    curStripeRange = stripeRange;
+  }
+
+  private Callable<Void> readCells(final BlockReader reader,
+      final DatanodeInfo datanode, final long currentReaderOffset,
+      final long targetReaderOffset, final ByteBufferStrategy[] strategies,
+      final ExtendedBlock currentBlock,
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+    return new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        // reader can be null if getBlockReaderWithRetry failed or
+        // the reader hit an exception before
+        if (reader == null) {
+          throw new IOException("The BlockReader is null. " +
+              "The BlockReader creation failed or the reader hit an exception.");
+        }
+        Preconditions.checkState(currentReaderOffset <= targetReaderOffset);
+        if (currentReaderOffset < targetReaderOffset) {
+          long skipped = reader.skip(targetReaderOffset - currentReaderOffset);
+          Preconditions.checkState(
+              skipped == targetReaderOffset - currentReaderOffset);
+        }
+        int result = 0;
+        for (ByteBufferStrategy strategy : strategies) {
+          result += readToBuffer(reader, datanode, strategy, currentBlock,
+              corruptedBlockMap);
+        }
+        return null;
+      }
+    };
+  }
+
+  private int readToBuffer(BlockReader blockReader,
+      DatanodeInfo currentNode, ByteBufferStrategy strategy,
+      ExtendedBlock currentBlock,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    final int targetLength = strategy.buf.remaining();
+    int length = 0;
+    try {
+      while (length < targetLength) {
+        int ret = strategy.doRead(blockReader, 0, 0);
+        if (ret < 0) {
+          throw new IOException("Unexpected EOS from the reader");
+        }
+        length += ret;
+      }
+      return length;
+    } catch (ChecksumException ce) {
+      DFSClient.LOG.warn("Found Checksum error for "
+          + currentBlock + " from " + currentNode
+          + " at " + ce.getPos());
+      // we want to remember which block replicas we have tried
+      addIntoCorruptedBlockMap(currentBlock, currentNode,
+          corruptedBlockMap);
+      throw ce;
+    } catch (IOException e) {
+      DFSClient.LOG.warn("Exception while reading from "
+          + currentBlock + " of " + src + " from "
+          + currentNode, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Seek to a new arbitrary location
+   */
+  @Override
+  public synchronized void seek(long targetPos) throws IOException {
+    if (targetPos > getFileLength()) {
+      throw new EOFException("Cannot seek after EOF");
+    }
+    if (targetPos < 0) {
+      throw new EOFException("Cannot seek to negative offset");
+    }
+    if (closed.get()) {
+      throw new IOException("Stream is closed!");
+    }
+    if (targetPos <= blockEnd) {
+      final long targetOffsetInBlk = getOffsetInBlockGroup(targetPos);
+      if (curStripeRange.include(targetOffsetInBlk)) {
+        int bufOffset = getStripedBufOffset(targetOffsetInBlk);
+        curStripeBuf.position(bufOffset);
+        pos = targetPos;
+        return;
+      }
+    }
+    pos = targetPos;
+    blockEnd = -1;
+  }
+
+  private int getStripedBufOffset(long offsetInBlockGroup) {
+    final long stripeLen = cellSize * dataBlkNum;
+    // compute the position in the curStripeBuf based on "pos"
+    return (int) (offsetInBlockGroup % stripeLen);
+  }
+
+  @Override
+  public synchronized boolean seekToNewSource(long targetPos)
+      throws IOException {
+    return false;
+  }
+
+  @Override
+  protected synchronized int readWithStrategy(ReaderStrategy strategy,
+      int off, int len) throws IOException {
+    dfsClient.checkOpen();
+    if (closed.get()) {
+      throw new IOException("Stream closed");
+    }
+    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
+        new ConcurrentHashMap<>();
+    if (pos < getFileLength()) {
+      try {
+        if (pos > blockEnd) {
+          blockSeekTo(pos);
+        }
+        int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
+        synchronized (infoLock) {
+          if (locatedBlocks.isLastBlockComplete()) {
+            realLen = (int) Math.min(realLen,
+                locatedBlocks.getFileLength() - pos);
+          }
+        }
+
+        // Number of bytes already read into the buffer
+        int result = 0;
+        while (result < realLen) {
+          if (!curStripeRange.include(getOffsetInBlockGroup())) {
+            readOneStripe(corruptedBlockMap);
+          }
+          int ret = copyToTargetBuf(strategy, off + result, realLen - result);
+          result += ret;
+          pos += ret;
+        }
+        if (dfsClient.stats != null) {
+          dfsClient.stats.incrementBytesRead(result);
+        }
+        return result;
+      } finally {
+        // Check if we need to report corrupt block replicas, whether the
+        // read was successful or a ChecksumException occurred.
+        reportCheckSumFailure(corruptedBlockMap,
+            currentLocatedBlock.getLocations().length);
+      }
+    }
+    return -1;
+  }
+
+  /**
+   * Copy the data from {@link #curStripeBuf} into the given buffer
+   * @param strategy the ReaderStrategy containing the given buffer
+   * @param offset the offset of the given buffer. Used only when strategy is
+   *               a ByteArrayStrategy
+   * @param length target length
+   * @return number of bytes copied
+   */
+  private int copyToTargetBuf(ReaderStrategy strategy, int offset, int length) {
+    final long offsetInBlk = getOffsetInBlockGroup();
+    int bufOffset = getStripedBufOffset(offsetInBlk);
+    curStripeBuf.position(bufOffset);
+    return strategy.copyFrom(curStripeBuf, offset,
+        Math.min(length, curStripeBuf.remaining()));
+  }
+
+  /**
+   * The super method {@link DFSInputStream#refreshLocatedBlock} refreshes
+   * cached LocatedBlock by executing {@link DFSInputStream#getBlockAt} again.
+   * This method extends the logic by first remembering the index of the
+   * internal block, and re-parsing the refreshed block group with the same
+   * index.
+   */
+  @Override
+  protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
+      throws IOException {
+    int idx = BlockIdManager.getBlockIndex(block.getBlock().getLocalBlock());
+    LocatedBlock lb = getBlockGroupAt(block.getStartOffset());
+    // If indexing information is returned, iterate through the index array
+    // to find the entry for position idx in the group
+    LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
+    int i = 0;
+    for (; i < lsb.getBlockIndices().length; i++) {
+      if (lsb.getBlockIndices()[i] == idx) {
+        break;
+      }
+    }
+    if (DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("refreshLocatedBlock for striped blocks, offset="
+          + block.getStartOffset() + ". Obtained block " + lb + ", idx=" + idx);
+    }
+    return StripedBlockUtil.constructInternalBlock(
+        lsb, i, cellSize, dataBlkNum, idx);
+  }
+
+  private LocatedStripedBlock getBlockGroupAt(long offset) throws IOException {
+    LocatedBlock lb = super.getBlockAt(offset);
+    assert lb instanceof LocatedStripedBlock : "NameNode" +
+        " should return a LocatedStripedBlock for a striped file";
+    return (LocatedStripedBlock)lb;
+  }
+
+  /**
+   * Real implementation of pread.
+   */
+  @Override
+  protected void fetchBlockByteRange(LocatedBlock block, long start,
+      long end, byte[] buf, int offset,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
+    // Refresh the striped block group
+    LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset());
+
+    AlignedStripe[] stripes = StripedBlockUtil.divideByteRangeIntoStripes(
+        schema, cellSize, blockGroup, start, end, buf, offset);
+    CompletionService<Void> readService = new ExecutorCompletionService<>(
+        dfsClient.getStripedReadsThreadPool());
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
+        blockGroup, cellSize, dataBlkNum, parityBlkNum);
+    final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize];
+    try {
+      for (AlignedStripe stripe : stripes) {
+        // Parse group to get chosen DN location
+        StripeReader preader = new PositionStripeReader(readService, stripe,
+            blks, preaderInfos, corruptedBlockMap);
+        preader.readStripe();
+      }
+    } finally {
+      for (BlockReaderInfo preaderInfo : preaderInfos) {
+        closeReader(preaderInfo);
+      }
+    }
+  }
+
+  /**
+   * The reader for reading a complete {@link AlignedStripe}. Note that an
+   * {@link AlignedStripe} may cross multiple stripes with cellSize width.
+   */
+  private abstract class StripeReader {
+    final Map<Future<Void>, Integer> futures = new HashMap<>();
+    final AlignedStripe alignedStripe;
+    final CompletionService<Void> service;
+    final LocatedBlock[] targetBlocks;
+    final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap;
+    final BlockReaderInfo[] readerInfos;
+
+    StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe,
+        LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+      this.service = service;
+      this.alignedStripe = alignedStripe;
+      this.targetBlocks = targetBlocks;
+      this.readerInfos = readerInfos;
+      this.corruptedBlockMap = corruptedBlockMap;
+    }
+
+    /** prepare all the data chunks */
+    abstract void prepareDecodeInputs();
+
+    /** prepare the parity chunk and block reader if necessary */
+    abstract boolean prepareParityChunk(int index) throws IOException;
+
+    abstract void decode();
+
+    void updateState4SuccessRead(StripingChunkReadResult result) {
+      Preconditions.checkArgument(
+          result.state == StripingChunkReadResult.SUCCESSFUL);
+      readerInfos[result.index].setOffset(alignedStripe.getOffsetInBlock()
+          + alignedStripe.getSpanInBlock());
+    }
+
+    private void checkMissingBlocks() throws IOException {
+      if (alignedStripe.missingChunksNum > parityBlkNum) {
+        clearFutures(futures.keySet());
+        throw new IOException(alignedStripe.missingChunksNum
+            + " missing blocks, the stripe is: " + alignedStripe);
+      }
+    }
+
+    /**
+     * Decoding is needed, so go through all the data chunks and make sure
+     * read requests have been submitted for all of them.
+     */
+    private void readDataForDecoding() throws IOException {
+      prepareDecodeInputs();
+      for (int i = 0; i < dataBlkNum; i++) {
+        Preconditions.checkNotNull(alignedStripe.chunks[i]);
+        if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
+          if (!readChunk(targetBlocks[i], i)) {
+            alignedStripe.missingChunksNum++;
+          }
+        }
+      }
+      checkMissingBlocks();
+    }
+
+    void readParityChunks(int num) throws IOException {
+      for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
+           i++) {
+        if (alignedStripe.chunks[i] == null) {
+          if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) {
+            j++;
+          } else {
+            alignedStripe.missingChunksNum++;
+          }
+        }
+      }
+      checkMissingBlocks();
+    }
+
+    boolean createBlockReader(LocatedBlock block, int chunkIndex)
+        throws IOException {
+      BlockReader reader = null;
+      final ReaderRetryPolicy retry = new ReaderRetryPolicy();
+      DNAddrPair dnInfo = new DNAddrPair(null, null, null);
+
+      while(true) {
+        try {
+          // the cached block location might have been re-fetched, so always
+          // get it from cache.
+          block = refreshLocatedBlock(block);
+          targetBlocks[chunkIndex] = block;
+
+          // an internal block has only one location; just rule out the dead nodes
+          dnInfo = getBestNodeDNAddrPair(block, null);
+          if (dnInfo == null) {
+            break;
+          }
+          reader = getBlockReader(block, alignedStripe.getOffsetInBlock(),
+              block.getBlockSize() - alignedStripe.getOffsetInBlock(),
+              dnInfo.addr, dnInfo.storageType, dnInfo.info);
+        } catch (IOException e) {
+          if (e instanceof InvalidEncryptionKeyException &&
+              retry.shouldRefetchEncryptionKey()) {
+            DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+                + "encryption key was invalid when connecting to " + dnInfo.addr
+                + " : " + e);
+            dfsClient.clearDataEncryptionKey();
+            retry.refetchEncryptionKey();
+          } else if (retry.shouldRefetchToken() &&
+              tokenRefetchNeeded(e, dnInfo.addr)) {
+            fetchBlockAt(block.getStartOffset());
+            retry.refetchToken();
+          } else {
+            // TODO: handle connection issues
+            DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
+                "block " + block.getBlock(), e);
+            // re-fetch the block in case the block has been moved
+            fetchBlockAt(block.getStartOffset());
+            addToDeadNodes(dnInfo.info);
+          }
+        }
+        if (reader != null) {
+          readerInfos[chunkIndex] = new BlockReaderInfo(reader, block,
+              dnInfo.info, alignedStripe.getOffsetInBlock());
+          return true;
+        }
+      }
+      return false;
+    }
+
+    private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
+      if (chunk.byteBuffer != null) {
+        ByteBufferStrategy strategy = new ByteBufferStrategy(chunk.byteBuffer);
+        return new ByteBufferStrategy[]{strategy};
+      } else {
+        ByteBufferStrategy[] strategies =
+            new ByteBufferStrategy[chunk.byteArray.getOffsets().length];
+        for (int i = 0; i < strategies.length; i++) {
+          ByteBuffer buffer = ByteBuffer.wrap(chunk.byteArray.buf(),
+              chunk.byteArray.getOffsets()[i], chunk.byteArray.getLengths()[i]);
+          strategies[i] = new ByteBufferStrategy(buffer);
+        }
+        return strategies;
+      }
+    }
+
+    boolean readChunk(final LocatedBlock block, int chunkIndex)
+        throws IOException {
+      final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
+      if (block == null) {
+        chunk.state = StripingChunk.MISSING;
+        return false;
+      }
+      if (readerInfos[chunkIndex] == null) {
+        if (!createBlockReader(block, chunkIndex)) {
+          chunk.state = StripingChunk.MISSING;
+          return false;
+        }
+      } else if (readerInfos[chunkIndex].shouldSkip) {
+        chunk.state = StripingChunk.MISSING;
+        return false;
+      }
+
+      chunk.state = StripingChunk.PENDING;
+      Callable<Void> readCallable = readCells(readerInfos[chunkIndex].reader,
+          readerInfos[chunkIndex].datanode,
+          readerInfos[chunkIndex].blockReaderOffset,
+          alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
+          block.getBlock(), corruptedBlockMap);
+
+      Future<Void> request = service.submit(readCallable);
+      futures.put(request, chunkIndex);
+      return true;
+    }
+
+    /** Read the whole stripe. Do decoding if necessary. */
+    void readStripe() throws IOException {
+      for (int i = 0; i < dataBlkNum; i++) {
+        if (alignedStripe.chunks[i] != null &&
+            alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
+          if (!readChunk(targetBlocks[i], i)) {
+            alignedStripe.missingChunksNum++;
+          }
+        }
+      }
+      // If there are missing block locations at this stage, we need to read
+      // the remaining data blocks for decoding, plus one parity block for
+      // each missing chunk.
+      if (alignedStripe.missingChunksNum > 0) {
+        checkMissingBlocks();
+        readDataForDecoding();
+        // read parity chunks
+        readParityChunks(alignedStripe.missingChunksNum);
+      }
+      // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
+
+      // Wait for the read tasks to complete; on a failure, mark the chunk
+      // missing and submit additional data and parity reads for decoding.
+      while (!futures.isEmpty()) {
+        try {
+          StripingChunkReadResult r = StripedBlockUtil
+              .getNextCompletedStripedRead(service, futures, 0);
+          if (DFSClient.LOG.isDebugEnabled()) {
+            DFSClient.LOG.debug("Read task returned: " + r + ", for stripe "
+                + alignedStripe);
+          }
+          StripingChunk returnedChunk = alignedStripe.chunks[r.index];
+          Preconditions.checkNotNull(returnedChunk);
+          Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
+
+          if (r.state == StripingChunkReadResult.SUCCESSFUL) {
+            returnedChunk.state = StripingChunk.FETCHED;
+            alignedStripe.fetchedChunksNum++;
+            updateState4SuccessRead(r);
+            if (alignedStripe.fetchedChunksNum == dataBlkNum) {
+              clearFutures(futures.keySet());
+              break;
+            }
+          } else {
+            returnedChunk.state = StripingChunk.MISSING;
+            // close the corresponding reader
+            closeReader(readerInfos[r.index]);
+
+            final int missing = alignedStripe.missingChunksNum;
+            alignedStripe.missingChunksNum++;
+            checkMissingBlocks();
+
+            readDataForDecoding();
+            readParityChunks(alignedStripe.missingChunksNum - missing);
+          }
+        } catch (InterruptedException ie) {
+          String err = "Read request interrupted";
+          DFSClient.LOG.error(err);
+          clearFutures(futures.keySet());
+          // Don't decode if read interrupted
+          throw new InterruptedIOException(err);
+        }
+      }
+
+      if (alignedStripe.missingChunksNum > 0) {
+        decode();
+      }
+    }
+  }
+
+  class PositionStripeReader extends StripeReader {
+    private byte[][] decodeInputs = null;
+
+    PositionStripeReader(CompletionService<Void> service,
+        AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
+        BlockReaderInfo[] readerInfos,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+      super(service, alignedStripe, targetBlocks, readerInfos,
+          corruptedBlockMap);
+    }
+
+    @Override
+    void prepareDecodeInputs() {
+      if (decodeInputs == null) {
+        decodeInputs = StripedBlockUtil.initDecodeInputs(alignedStripe,
+            dataBlkNum, parityBlkNum);
+      }
+    }
+
+    @Override
+    boolean prepareParityChunk(int index) {
+      Preconditions.checkState(index >= dataBlkNum &&
+          alignedStripe.chunks[index] == null);
+      final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index,
+          dataBlkNum, parityBlkNum);
+      alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]);
+      alignedStripe.chunks[index].addByteArraySlice(0,
+          (int) alignedStripe.getSpanInBlock());
+      return true;
+    }
+
+    @Override
+    void decode() {
+      StripedBlockUtil.finalizeDecodeInputs(decodeInputs, dataBlkNum,
+          parityBlkNum, alignedStripe);
+      StripedBlockUtil.decodeAndFillBuffer(decodeInputs, alignedStripe,
+          dataBlkNum, parityBlkNum, decoder);
+    }
+  }
+
+  class StatefulStripeReader extends StripeReader {
+    ByteBuffer[] decodeInputs;
+
+    StatefulStripeReader(CompletionService<Void> service,
+        AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
+        BlockReaderInfo[] readerInfos,
+        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+      super(service, alignedStripe, targetBlocks, readerInfos,
+          corruptedBlockMap);
+    }
+
+    @Override
+    void prepareDecodeInputs() {
+      if (decodeInputs == null) {
+        decodeInputs = new ByteBuffer[dataBlkNum + parityBlkNum];
+        ByteBuffer cur = curStripeBuf.duplicate();
+        StripedBlockUtil.VerticalRange range = alignedStripe.range;
+        for (int i = 0; i < dataBlkNum; i++) {
+          cur.limit(cur.capacity());
+          int pos = (int) (range.offsetInBlock % cellSize + cellSize * i);
+          cur.position(pos);
+          cur.limit((int) (pos + range.spanInBlock));
+          final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i,
+              dataBlkNum, parityBlkNum);
+          decodeInputs[decodeIndex] = cur.slice();
+          if (alignedStripe.chunks[i] == null) {
+            alignedStripe.chunks[i] = new StripingChunk(
+                decodeInputs[decodeIndex]);
+          }
+        }
+      }
+    }
+
+    @Override
+    boolean prepareParityChunk(int index) throws IOException {
+      Preconditions.checkState(index >= dataBlkNum
+          && alignedStripe.chunks[index] == null);
+      if (blockReaders[index] != null && blockReaders[index].shouldSkip) {
+        alignedStripe.chunks[index] = new StripingChunk(StripingChunk.MISSING);
+        // we have failed the block reader before
+        return false;
+      }
+      final int decodeIndex = StripedBlockUtil.convertIndex4Decode(index,
+          dataBlkNum, parityBlkNum);
+      ByteBuffer buf = getParityBuffer().duplicate();
+      buf.position(cellSize * decodeIndex);
+      buf.limit(cellSize * decodeIndex + (int) alignedStripe.range.spanInBlock);
+      decodeInputs[decodeIndex] = buf.slice();
+      alignedStripe.chunks[index] = new StripingChunk(decodeInputs[decodeIndex]);
+      return true;
+    }
+
+    @Override
+    void decode() {
+      // TODO: avoid copying the data chunks; this depends on HADOOP-12047
+      final int span = (int) alignedStripe.getSpanInBlock();
+      for (int i = 0; i < alignedStripe.chunks.length; i++) {
+        final int decodeIndex = StripedBlockUtil.convertIndex4Decode(i,
+            dataBlkNum, parityBlkNum);
+        if (alignedStripe.chunks[i] != null &&
+            alignedStripe.chunks[i].state == StripingChunk.ALLZERO) {
+          for (int j = 0; j < span; j++) {
+            decodeInputs[decodeIndex].put((byte) 0);
+          }
+          decodeInputs[decodeIndex].flip();
+        } else if (alignedStripe.chunks[i] != null &&
+            alignedStripe.chunks[i].state == StripingChunk.FETCHED) {
+          decodeInputs[decodeIndex].position(0);
+          decodeInputs[decodeIndex].limit(span);
+        }
+      }
+      int[] decodeIndices = new int[parityBlkNum];
+      int pos = 0;
+      for (int i = 0; i < alignedStripe.chunks.length; i++) {
+        if (alignedStripe.chunks[i] != null &&
+            alignedStripe.chunks[i].state == StripingChunk.MISSING) {
+          int decodeIndex = StripedBlockUtil.convertIndex4Decode(i,
+              dataBlkNum, parityBlkNum);
+          if (i < dataBlkNum) {
+            decodeIndices[pos++] = decodeIndex;
+          } else {
+            decodeInputs[decodeIndex] = null;
+          }
+        }
+      }
+      decodeIndices = Arrays.copyOf(decodeIndices, pos);
+
+      final int decodeChunkNum = decodeIndices.length;
+      ByteBuffer[] outputs = new ByteBuffer[decodeChunkNum];
+      for (int i = 0; i < decodeChunkNum; i++) {
+        outputs[i] = decodeInputs[decodeIndices[i]];
+        outputs[i].position(0);
+        outputs[i].limit((int) alignedStripe.range.spanInBlock);
+        decodeInputs[decodeIndices[i]] = null;
+      }
+
+      decoder.decode(decodeInputs, decodeIndices, outputs);
+    }
+  }
+
+  /**
+   * A striped read may need online recovery, so zero-copy read doesn't make
+   * sense and is not supported.
+   */
+  @Override
+  public synchronized ByteBuffer read(ByteBufferPool bufferPool,
+      int maxLength, EnumSet<ReadOption> opts)
+          throws IOException, UnsupportedOperationException {
+    throw new UnsupportedOperationException(
+        "Enhanced byte buffer access is not supported.");
+  }
+
+  @Override
+  public synchronized void releaseBuffer(ByteBuffer buffer) {
+    throw new UnsupportedOperationException(
+        "Enhanced byte buffer access is not supported.");
+  }
+
+  /** A variation of {@link DFSInputStream#cancelAll}. */
+  private void clearFutures(Collection<Future<Void>> futures) {
+    for (Future<Void> future : futures) {
+      future.cancel(false);
+    }
+    futures.clear();
+  }
+}
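
For context, a minimal usage sketch: DFSClient#open hands back a
DFSStripedInputStream for an erasure-coded file, so an application reads
through the ordinary FileSystem API. The path and buffer size below are
illustrative assumptions, not part of this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch only: assumes /ec/file lives under an erasure-coded directory,
    // so open() wraps a DFSStripedInputStream in the FSDataInputStream.
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataInputStream in = fs.open(new Path("/ec/file"))) {
      byte[] buf = new byte[4096];
      int n;
      while ((n = in.read(buf, 0, buf.length)) > 0) {
        // consume n bytes of buf; cells are reassembled internally
      }
    }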

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
new file mode 100644
index 0000000..746b791
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -0,0 +1,653 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.io.erasurecode.CodecUtil;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+import org.apache.htrace.Sampler;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * This class supports writing files in striped layout and erasure-coded
+ * format. Each stripe contains a sequence of cells.
+ */
+@InterfaceAudience.Private
+public class DFSStripedOutputStream extends DFSOutputStream {
+  static class MultipleBlockingQueue<T> {
+    private final List<BlockingQueue<T>> queues;
+
+    MultipleBlockingQueue(int numQueue, int queueSize) {
+      queues = new ArrayList<>(numQueue);
+      for (int i = 0; i < numQueue; i++) {
+        queues.add(new LinkedBlockingQueue<T>(queueSize));
+      }
+    }
+
+    boolean isEmpty() {
+      for(int i = 0; i < queues.size(); i++) {
+        if (!queues.get(i).isEmpty()) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    int numQueues() {
+      return queues.size();
+    }
+
+    void offer(int i, T object) {
+      final boolean b = queues.get(i).offer(object);
+      Preconditions.checkState(b, "Failed to offer " + object
+          + " to queue, i=" + i);
+    }
+
+    T take(int i) throws InterruptedIOException {
+      try {
+        return queues.get(i).take();
+      } catch(InterruptedException ie) {
+        throw DFSUtil.toInterruptedIOException("take interrupted, i=" + i, ie);
+      }
+    }
+
+    T poll(int i) {
+      return queues.get(i).poll();
+    }
+
+    T peek(int i) {
+      return queues.get(i).peek();
+    }
+  }
+
+  /** Coordinate the communication between the streamers. */
+  class Coordinator {
+    private final MultipleBlockingQueue<LocatedBlock> followingBlocks;
+    private final MultipleBlockingQueue<ExtendedBlock> endBlocks;
+
+    private final MultipleBlockingQueue<LocatedBlock> newBlocks;
+    private final MultipleBlockingQueue<ExtendedBlock> updateBlocks;
+
+    Coordinator(final DfsClientConf conf, final int numDataBlocks,
+        final int numAllBlocks) {
+      followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
+      endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1);
+
+      newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
+      updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
+    }
+
+    MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() {
+      return followingBlocks;
+    }
+
+    MultipleBlockingQueue<LocatedBlock> getNewBlocks() {
+      return newBlocks;
+    }
+
+    MultipleBlockingQueue<ExtendedBlock> getUpdateBlocks() {
+      return updateBlocks;
+    }
+
+    StripedDataStreamer getStripedDataStreamer(int i) {
+      return DFSStripedOutputStream.this.getStripedDataStreamer(i);
+    }
+
+    void offerEndBlock(int i, ExtendedBlock block) {
+      endBlocks.offer(i, block);
+    }
+
+    ExtendedBlock takeEndBlock(int i) throws InterruptedIOException {
+      return endBlocks.take(i);
+    }
+
+    boolean hasAllEndBlocks() {
+      for(int i = 0; i < endBlocks.numQueues(); i++) {
+        if (endBlocks.peek(i) == null) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) {
+      ExtendedBlock b = endBlocks.peek(i);
+      if (b == null) {
+        // the streamer has just failed; put the end block and continue
+        b = block;
+        offerEndBlock(i, b);
+      }
+      b.setNumBytes(newBytes);
+    }
+
+    /** @return a block representing the entire block group. */
+    ExtendedBlock getBlockGroup() {
+      final StripedDataStreamer s0 = getStripedDataStreamer(0);
+      final ExtendedBlock b0 = s0.getBlock();
+      if (b0 == null) {
+        return null;
+      }
+
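+      // If streamer 0 has a non-empty block but zero bytes in its current
+      // block, the group has just been completed: sum the reported block
+      // sizes. Otherwise sum the bytes written to the current blocks.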
+      final boolean atBlockGroupBoundary =
+          s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0;
+      final ExtendedBlock block = new ExtendedBlock(b0);
+      long numBytes = b0.getNumBytes();
+      for (int i = 1; i < numDataBlocks; i++) {
+        final StripedDataStreamer si = getStripedDataStreamer(i);
+        final ExtendedBlock bi = si.getBlock();
+        if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) {
+          block.setGenerationStamp(bi.getGenerationStamp());
+        }
+        numBytes += atBlockGroupBoundary ?
+            bi.getNumBytes() : si.getBytesCurBlock();
+      }
+      block.setNumBytes(numBytes);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("getBlockGroup: " + block + ", numBytes=" + block.getNumBytes());
+      }
+      return block;
+    }
+  }
+
+  /** Buffers for writing the data and parity cells of a stripe. */
+  class CellBuffers {
+    private final ByteBuffer[] buffers;
+    private final byte[][] checksumArrays;
+
+    CellBuffers(int numParityBlocks) throws InterruptedException {
+      if (cellSize % bytesPerChecksum != 0) {
+        throw new HadoopIllegalArgumentException("Invalid values: "
+            + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (="
+            + bytesPerChecksum + ") must divide cell size (=" + cellSize + ").");
+      }
+
+      checksumArrays = new byte[numParityBlocks][];
+      final int size = getChecksumSize() * (cellSize / bytesPerChecksum);
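+      // For example (values hypothetical): a 64KB cell with 512 bytes per
+      // checksum and 4-byte CRC32C checksums needs 4 * 128 = 512 bytes.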
+      for (int i = 0; i < checksumArrays.length; i++) {
+        checksumArrays[i] = new byte[size];
+      }
+
+      buffers = new ByteBuffer[numAllBlocks];
+      for (int i = 0; i < buffers.length; i++) {
+        buffers[i] = ByteBuffer.wrap(byteArrayManager.newByteArray(cellSize));
+      }
+    }
+
+    private ByteBuffer[] getBuffers() {
+      return buffers;
+    }
+
+    byte[] getChecksumArray(int i) {
+      return checksumArrays[i - numDataBlocks];
+    }
+
+    private int addTo(int i, byte[] b, int off, int len) {
+      final ByteBuffer buf = buffers[i];
+      final int pos = buf.position() + len;
+      Preconditions.checkState(pos <= cellSize);
+      buf.put(b, off, len);
+      return pos;
+    }
+
+    private void clear() {
+      for (int i = 0; i < numAllBlocks; i++) {
+        buffers[i].clear();
+        if (i >= numDataBlocks) {
+          Arrays.fill(buffers[i].array(), (byte) 0);
+        }
+      }
+    }
+
+    private void release() {
+      for (int i = 0; i < numAllBlocks; i++) {
+        byteArrayManager.release(buffers[i].array());
+      }
+    }
+
+    private void flipDataBuffers() {
+      for (int i = 0; i < numDataBlocks; i++) {
+        buffers[i].flip();
+      }
+    }
+  }
+
+  private final Coordinator coordinator;
+  private final CellBuffers cellBuffers;
+  private final RawErasureEncoder encoder;
+  private final List<StripedDataStreamer> streamers;
+  private final DFSPacket[] currentPackets; // current Packet of each streamer
+
+  /** Size of each striping cell, must be a multiple of bytesPerChecksum */
+  private final int cellSize;
+  private final int numAllBlocks;
+  private final int numDataBlocks;
+
+  @Override
+  ExtendedBlock getBlock() {
+    return coordinator.getBlockGroup();
+  }
+
+  /** Construct a new output stream for creating a file. */
+  DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
+                         EnumSet<CreateFlag> flag, Progressable progress,
+                         DataChecksum checksum, String[] favoredNodes)
+                         throws IOException {
+    super(dfsClient, src, stat, flag, progress, checksum, favoredNodes);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating DFSStripedOutputStream for " + src);
+    }
+
+    final ECSchema schema = stat.getECSchema();
+    final int numParityBlocks = schema.getNumParityUnits();
+    cellSize = stat.getStripeCellSize();
+    numDataBlocks = schema.getNumDataUnits();
+    numAllBlocks = numDataBlocks + numParityBlocks;
+
+    encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
+        numDataBlocks, numParityBlocks);
+
+    coordinator = new Coordinator(dfsClient.getConf(),
+        numDataBlocks, numAllBlocks);
+    try {
+      cellBuffers = new CellBuffers(numParityBlocks);
+    } catch (InterruptedException ie) {
+      throw DFSUtil.toInterruptedIOException(
+          "Failed to create cell buffers", ie);
+    }
+
+    List<StripedDataStreamer> s = new ArrayList<>(numAllBlocks);
+    for (short i = 0; i < numAllBlocks; i++) {
+      StripedDataStreamer streamer = new StripedDataStreamer(stat,
+          dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
+          favoredNodes, i, coordinator);
+      s.add(streamer);
+    }
+    streamers = Collections.unmodifiableList(s);
+    currentPackets = new DFSPacket[streamers.size()];
+    setCurrentStreamer(0);
+  }
+
+  StripedDataStreamer getStripedDataStreamer(int i) {
+    return streamers.get(i);
+  }
+
+  int getCurrentIndex() {
+    return getCurrentStreamer().getIndex();
+  }
+
+  private synchronized StripedDataStreamer getCurrentStreamer() {
+    return (StripedDataStreamer)streamer;
+  }
+
+  private synchronized StripedDataStreamer setCurrentStreamer(int newIdx)
+      throws IOException {
+    // backup currentPacket for current streamer
+    int oldIdx = streamers.indexOf(streamer);
+    if (oldIdx >= 0) {
+      currentPackets[oldIdx] = currentPacket;
+    }
+
+    streamer = streamers.get(newIdx);
+    currentPacket = currentPackets[newIdx];
+    adjustChunkBoundary();
+
+    return getCurrentStreamer();
+  }
+
+  /**
+   * Encode the buffers, i.e. compute parities.
+   *
+   * @param buffers data buffers + parity buffers
+   */
+  private static void encode(RawErasureEncoder encoder, int numData,
+      ByteBuffer[] buffers) {
+    final ByteBuffer[] dataBuffers = new ByteBuffer[numData];
+    final ByteBuffer[] parityBuffers = new ByteBuffer[buffers.length - numData];
+    System.arraycopy(buffers, 0, dataBuffers, 0, dataBuffers.length);
+    System.arraycopy(buffers, numData, parityBuffers, 0, parityBuffers.length);
+
+    encoder.encode(dataBuffers, parityBuffers);
+  }
+
+
+  private void checkStreamers() throws IOException {
+    int count = 0;
+    for(StripedDataStreamer s : streamers) {
+      if (!s.isFailed()) {
+        if (s.getBlock() != null) {
+          s.getErrorState().initExternalError();
+        }
+        count++;
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("checkStreamers: " + streamers);
+      LOG.debug("count=" + count);
+    }
+    if (count < numDataBlocks) {
+      throw new IOException("Failed: the number of remaining blocks = "
+          + count + " < the number of data blocks = " + numDataBlocks);
+    }
+  }
+
+  private void handleStreamerFailure(String err,
+                                     Exception e) throws IOException {
+    LOG.warn("Failed: " + err + ", " + this, e);
+    getCurrentStreamer().setFailed(true);
+    checkStreamers();
+    currentPacket = null;
+  }
+
+  @Override
+  protected synchronized void writeChunk(byte[] bytes, int offset, int len,
+      byte[] checksum, int ckoff, int cklen) throws IOException {
+    final int index = getCurrentIndex();
+    final StripedDataStreamer current = getCurrentStreamer();
+    final int pos = cellBuffers.addTo(index, bytes, offset, len);
+    final boolean cellFull = pos == cellSize;
+
+    final long oldBytes = current.getBytesCurBlock();
+    if (!current.isFailed()) {
+      try {
+        super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
+      } catch(Exception e) {
+        handleStreamerFailure("offset=" + offset + ", length=" + len, e);
+      }
+    }
+
+    if (current.isFailed()) {
+      final long newBytes = oldBytes + len;
+      coordinator.setBytesEndBlock(index, newBytes, current.getBlock());
+      current.setBytesCurBlock(newBytes);
+    }
+
+    // Two extra steps are needed when a striping cell is full:
+    // 1. Forward the current index pointer
+    // 2. Generate parity packets if a full stripe of data cells is present
+    if (cellFull) {
+      int next = index + 1;
+      // When all data cells in a stripe are ready, we need to encode them
+      // and generate parity cells. These cells will be converted to packets
+      // and put into their DataStreamers' queues.
+      if (next == numDataBlocks) {
+        cellBuffers.flipDataBuffers();
+        writeParityCells();
+        next = 0;
+      }
+      setCurrentStreamer(next);
+    }
+  }
+
+  private int stripeDataSize() {
+    return numDataBlocks * cellSize;
+  }
+
+  @Override
+  public void hflush() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void hsync() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void start() {
+    for (StripedDataStreamer streamer : streamers) {
+      streamer.start();
+    }
+  }
+
+  @Override
+  synchronized void abort() throws IOException {
+    if (isClosed()) {
+      return;
+    }
+    for (StripedDataStreamer streamer : streamers) {
+      streamer.getLastException().set(new IOException("Lease timeout of "
+          + (dfsClient.getConf().getHdfsTimeout()/1000) +
+          " seconds expired."));
+    }
+    closeThreads(true);
+    dfsClient.endFileLease(fileId);
+  }
+
+  @Override
+  boolean isClosed() {
+    if (closed) {
+      return true;
+    }
+    for(StripedDataStreamer s : streamers) {
+      if (!s.streamerClosed()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  protected void closeThreads(boolean force) throws IOException {
+    final MultipleIOException.Builder b = new MultipleIOException.Builder();
+    try {
+      for (StripedDataStreamer streamer : streamers) {
+        try {
+          streamer.close(force);
+          streamer.join();
+          streamer.closeSocket();
+        } catch (Exception e) {
+          try {
+            handleStreamerFailure("force=" + force, e);
+          } catch (IOException ioe) {
+            b.add(ioe);
+          }
+        } finally {
+          streamer.setSocketToNull();
+        }
+      }
+    } finally {
+      setClosed();
+    }
+    final IOException ioe = b.build();
+    if (ioe != null) {
+      throw ioe;
+    }
+  }
+
+  /**
+   * Simply add bytesCurBlock together. Note that this result is not accurately
+   * the size of the block group.
+   */
+  private long getCurrentSumBytes() {
+    long sum = 0;
+    for (int i = 0; i < numDataBlocks; i++) {
+      sum += streamers.get(i).getBytesCurBlock();
+    }
+    return sum;
+  }
+
+  private void writeParityCellsForLastStripe() throws IOException {
+    final long currentBlockGroupBytes = getCurrentSumBytes();
+    if (currentBlockGroupBytes % stripeDataSize() == 0) {
+      return;
+    }
+
+    final int firstCellSize =
+        (int)(getStripedDataStreamer(0).getBytesCurBlock() % cellSize);
+    final int parityCellSize = firstCellSize > 0 && firstCellSize < cellSize?
+        firstCellSize : cellSize;
+    final ByteBuffer[] buffers = cellBuffers.getBuffers();
+
+    for (int i = 0; i < numAllBlocks; i++) {
+      // Pad zero bytes to make all cells exactly the size of parityCellSize:
+      // if an internal block is smaller than a parity block, pad its last
+      // cell with zero bytes; also pad zero bytes to all parity cells.
+      final int position = buffers[i].position();
+      assert position <= parityCellSize : "If an internal block is smaller" +
+          " than a parity block, then its last cell should be smaller than" +
+          " the last parity cell";
+      for (int j = 0; j < parityCellSize - position; j++) {
+        buffers[i].put((byte) 0);
+      }
+      buffers[i].flip();
+    }
+
+    writeParityCells();
+  }
+
+  void writeParityCells() throws IOException {
+    final ByteBuffer[] buffers = cellBuffers.getBuffers();
+    // encode the data cells
+    encode(encoder, numDataBlocks, buffers);
+    for (int i = numDataBlocks; i < numAllBlocks; i++) {
+      writeParity(i, buffers[i], cellBuffers.getChecksumArray(i));
+    }
+    cellBuffers.clear();
+  }
+
+  void writeParity(int index, ByteBuffer buffer, byte[] checksumBuf
+      ) throws IOException {
+    final StripedDataStreamer current = setCurrentStreamer(index);
+    final int len = buffer.limit();
+
+    final long oldBytes = current.getBytesCurBlock();
+    if (!current.isFailed()) {
+      try {
+        DataChecksum sum = getDataChecksum();
+        sum.calculateChunkedSums(buffer.array(), 0, len, checksumBuf, 0);
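+        // Write the parity cell chunk by chunk; each bytesPerChecksum-sized
+        // chunk pairs with its checksum at ckOffset in checksumBuf (with
+        // hypothetical 512-byte chunks and 4-byte checksums, i = 1024 maps
+        // to ckOffset = 8).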
+        for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
+          int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
+          int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
+          super.writeChunk(buffer.array(), i, chunkLen, checksumBuf, ckOffset,
+              getChecksumSize());
+        }
+      } catch(Exception e) {
+        handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
+      }
+    }
+
+    if (current.isFailed()) {
+      final long newBytes = oldBytes + len;
+      current.setBytesCurBlock(newBytes);
+    }
+  }
+
+  @Override
+  void setClosed() {
+    super.setClosed();
+    for (int i = 0; i < numAllBlocks; i++) {
+      streamers.get(i).release();
+    }
+    cellBuffers.release();
+  }
+
+  @Override
+  protected synchronized void closeImpl() throws IOException {
+    if (isClosed()) {
+      final MultipleIOException.Builder b = new MultipleIOException.Builder();
+      for(int i = 0; i < streamers.size(); i++) {
+        final StripedDataStreamer si = getStripedDataStreamer(i);
+        try {
+          si.getLastException().check(true);
+        } catch (IOException e) {
+          b.add(e);
+        }
+      }
+      final IOException ioe = b.build();
+      if (ioe != null) {
+        throw ioe;
+      }
+      return;
+    }
+
+    try {
+      // flush from all upper layers
+      try {
+        flushBuffer();
+        // if the last stripe is incomplete, generate and write parity cells
+        writeParityCellsForLastStripe();
+        enqueueAllCurrentPackets();
+      } catch(Exception e) {
+        handleStreamerFailure("closeImpl", e);
+      }
+
+      for (int i = 0; i < numAllBlocks; i++) {
+        final StripedDataStreamer s = setCurrentStreamer(i);
+        if (!s.isFailed()) {
+          try {
+            if (s.getBytesCurBlock() > 0) {
+              setCurrentPacketToEmpty();
+            }
+            // flush all data to Datanode
+            flushInternal();
+          } catch(Exception e) {
+            handleStreamerFailure("closeImpl", e);
+          }
+        }
+      }
+
+      closeThreads(false);
+      final ExtendedBlock lastBlock = coordinator.getBlockGroup();
+      TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
+      try {
+        completeFile(lastBlock);
+      } finally {
+        scope.close();
+      }
+      dfsClient.endFileLease(fileId);
+    } catch (ClosedChannelException ignored) {
+    } finally {
+      setClosed();
+    }
+  }
+
+  private void enqueueAllCurrentPackets() throws IOException {
+    int idx = streamers.indexOf(getCurrentStreamer());
+    for(int i = 0; i < streamers.size(); i++) {
+      setCurrentStreamer(i);
+      if (currentPacket != null) {
+        enqueueCurrentPacket();
+      }
+    }
+    setCurrentStreamer(idx);
+  }
+}

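For reference, the chunk-to-checksum offset arithmetic in writeParity(..) above is easier to see in isolation. The following standalone sketch prints how data chunks map to checksum offsets; all constants are hypothetical and not taken from the patch.

// Minimal sketch of the mapping used in writeParity(..): chunk k covers
// data bytes [k*bpc, min((k+1)*bpc, len)) and its checksum is written at
// offset k * csize in checksumBuf.
public class ChunkLayoutSketch {
  public static void main(String[] args) {
    final int len = 2048 + 100; // hypothetical cell length in bytes
    final int bpc = 512;        // hypothetical bytes-per-checksum
    final int csize = 4;        // e.g. a CRC32C checksum is 4 bytes
    for (int i = 0; i < len; i += bpc) {
      int chunkLen = Math.min(bpc, len - i);
      int ckOffset = i / bpc * csize;
      System.out.println("data [" + i + ", " + (i + chunkLen)
          + ") -> checksum offset " + ckOffset);
    }
  }
}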
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index cae56c0..c06a435 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -36,6 +36,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_PAS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
 import java.net.InetAddress;
@@ -55,7 +56,6 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import javax.net.SocketFactory;
 
-import com.google.common.collect.Sets;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.Option;
@@ -96,6 +96,7 @@ import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.protobuf.BlockingService;
 
 @InterfaceAudience.Private
@@ -1527,4 +1528,10 @@ public class DFSUtil {
         DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "").isEmpty();
   }
 
+  public static InterruptedIOException toInterruptedIOException(String message,
+      InterruptedException e) {
+    final InterruptedIOException iioe = new InterruptedIOException(message);
+    iioe.initCause(e);
+    return iioe;
+  }
 }

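The new toInterruptedIOException helper keeps blocking waits IO-friendly. A hypothetical caller, mirroring the retry loop that StripedDataStreamer introduces later in this patch, would use it as sketched here:

import java.io.IOException;
import org.apache.hadoop.hdfs.DFSUtil;

public class RetrySleepExample {
  // Convert a thread interrupt during a blocking sleep into an
  // InterruptedIOException so it propagates through IO-centric call stacks.
  static void sleepQuietly(long millis) throws IOException {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ie) {
      throw DFSUtil.toInterruptedIOException("sleep interrupted", ie);
    }
  }
}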
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 8dd85b7..c78199e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -209,6 +209,7 @@ class DataStreamer extends Daemon {
 
   static class ErrorState {
     private boolean error = false;
+    private boolean externalError = false;
     private int badNodeIndex = -1;
     private int restartingNodeIndex = -1;
     private long restartingNodeDeadline = 0;
@@ -220,6 +221,7 @@ class DataStreamer extends Daemon {
 
     synchronized void reset() {
       error = false;
+      externalError = false;
       badNodeIndex = -1;
       restartingNodeIndex = -1;
       restartingNodeDeadline = 0;
@@ -229,14 +231,24 @@ class DataStreamer extends Daemon {
       return error;
     }
 
+    synchronized boolean hasExternalErrorOnly() {
+      return error && externalError && !isNodeMarked();
+    }
+
     synchronized boolean hasDatanodeError() {
-      return error && isNodeMarked();
+      return error && (isNodeMarked() || externalError);
     }
 
     synchronized void setError(boolean err) {
       this.error = err;
     }
 
+    synchronized void initExternalError() {
+      setError(true);
+      this.externalError = true;
+    }
+
     synchronized void setBadNodeIndex(int index) {
       this.badNodeIndex = index;
     }
@@ -335,7 +347,7 @@ class DataStreamer extends Daemon {
   }
 
   private volatile boolean streamerClosed = false;
-  private ExtendedBlock block; // its length is number of bytes acked
+  protected ExtendedBlock block; // its length is number of bytes acked
   private Token<BlockTokenIdentifier> accessToken;
   private DataOutputStream blockStream;
   private DataInputStream blockReplyStream;
@@ -366,12 +378,12 @@ class DataStreamer extends Daemon {
   private final LastExceptionInStreamer lastException = new LastExceptionInStreamer();
   private Socket s;
 
-  private final DFSClient dfsClient;
-  private final String src;
+  protected final DFSClient dfsClient;
+  protected final String src;
   /** Only for DataTransferProtocol.writeBlock(..) */
   private final DataChecksum checksum4WriteBlock;
   private final Progressable progress;
-  private final HdfsFileStatus stat;
+  protected final HdfsFileStatus stat;
   // appending to existing partial block
   private volatile boolean appendChunk = false;
   // both dataQueue and ackQueue are protected by dataQueue lock
@@ -397,11 +409,13 @@ class DataStreamer extends Daemon {
   private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
   private final String[] favoredNodes;
 
-  private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src,
+  private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
+                       DFSClient dfsClient, String src,
                        Progressable progress, DataChecksum checksum,
                        AtomicReference<CachingStrategy> cachingStrategy,
                        ByteArrayManager byteArrayManage,
                        boolean isAppend, String[] favoredNodes) {
+    this.block = block;
     this.dfsClient = dfsClient;
     this.src = src;
     this.progress = progress;
@@ -426,9 +440,8 @@ class DataStreamer extends Daemon {
                String src, Progressable progress, DataChecksum checksum,
                AtomicReference<CachingStrategy> cachingStrategy,
                ByteArrayManager byteArrayManage, String[] favoredNodes) {
-    this(stat, dfsClient, src, progress, checksum, cachingStrategy,
+    this(stat, block, dfsClient, src, progress, checksum, cachingStrategy,
         byteArrayManage, false, favoredNodes);
-    this.block = block;
     stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
   }
 
@@ -442,10 +455,9 @@ class DataStreamer extends Daemon {
                String src, Progressable progress, DataChecksum checksum,
                AtomicReference<CachingStrategy> cachingStrategy,
                ByteArrayManager byteArrayManage) throws IOException {
-    this(stat, dfsClient, src, progress, checksum, cachingStrategy,
+    this(stat, lastBlock.getBlock(), dfsClient, src, progress, checksum, cachingStrategy,
         byteArrayManage, true, null);
     stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
-    block = lastBlock.getBlock();
     bytesSent = block.getNumBytes();
     accessToken = lastBlock.getBlockToken();
   }
@@ -488,7 +500,7 @@ class DataStreamer extends Daemon {
     stage = BlockConstructionStage.DATA_STREAMING;
   }
 
-  private void endBlock() {
+  protected void endBlock() {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Closing old block " + block);
     }
@@ -574,7 +586,7 @@ class DataStreamer extends Daemon {
         // get new block from namenode.
         if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
           if(LOG.isDebugEnabled()) {
-            LOG.debug("Allocating new block");
+            LOG.debug("Allocating new block " + this);
           }
           setPipeline(nextBlockOutputStream());
           initDataStreaming();
@@ -592,10 +604,7 @@ class DataStreamer extends Daemon {
         long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
         if (lastByteOffsetInBlock > stat.getBlockSize()) {
           throw new IOException("BlockSize " + stat.getBlockSize() +
-              " is smaller than data size. " +
-              " Offset of packet in block " +
-              lastByteOffsetInBlock +
-              " Aborting file " + src);
+              " < lastByteOffsetInBlock, " + this + ", " + one);
         }
 
         if (one.isLastPacketInBlock()) {
@@ -1069,6 +1078,10 @@ class DataStreamer extends Daemon {
     if (!errorState.hasDatanodeError()) {
       return false;
     }
+    if (errorState.hasExternalErrorOnly() && block == null) {
+      // block is not yet initialized, handle external error later.
+      return false;
+    }
     if (response != null) {
       LOG.info("Error Recovery for " + block +
           " waiting for responder to exit. ");
@@ -1397,15 +1410,28 @@ class DataStreamer extends Daemon {
   }
 
   LocatedBlock updateBlockForPipeline() throws IOException {
+    return callUpdateBlockForPipeline(block);
+  }
+
+  LocatedBlock callUpdateBlockForPipeline(ExtendedBlock newBlock) throws IOException {
     return dfsClient.namenode.updateBlockForPipeline(
-        block, dfsClient.clientName);
+        newBlock, dfsClient.clientName);
+  }
+
+  static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) {
+    return new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(),
+        b.getNumBytes(), newGS);
   }
 
   /** update pipeline at the namenode */
   ExtendedBlock updatePipeline(long newGS) throws IOException {
-    final ExtendedBlock newBlock = new ExtendedBlock(
-        block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
-    dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
+    final ExtendedBlock newBlock = newBlock(block, newGS);
+    return callUpdatePipeline(block, newBlock);
+  }
+
+  ExtendedBlock callUpdatePipeline(ExtendedBlock oldBlock, ExtendedBlock newBlock)
+      throws IOException {
+    dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock, newBlock,
         nodes, storageIDs);
     return newBlock;
   }
@@ -1738,6 +1764,10 @@ class DataStreamer extends Daemon {
     return accessToken;
   }
 
+  ErrorState getErrorState() {
+    return errorState;
+  }
+
   /**
    * Put a packet to the data queue
    *
@@ -1750,7 +1780,7 @@ class DataStreamer extends Daemon {
       dataQueue.addLast(packet);
       lastQueuedSeqno = packet.getSeqno();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Queued packet " + packet.getSeqno());
+        LOG.debug("Queued " + packet + ", " + this);
       }
       dataQueue.notifyAll();
     }

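To see what the new externalError flag changes, the predicates added to ErrorState can be read as a small standalone model. The sketch below is simplified (node marking is reduced to two indices) and is not the real class:

// Simplified model of the extended ErrorState: an external error alone now
// routes the streamer into the datanode-error recovery path, and
// hasExternalErrorOnly() identifies the case where no node is marked bad.
class ErrorStateSketch {
  private boolean error;
  private boolean externalError;
  private int badNodeIndex = -1;
  private int restartingNodeIndex = -1;

  synchronized void initExternalError() {
    error = true;
    externalError = true;
  }

  synchronized boolean isNodeMarked() {
    return badNodeIndex >= 0 || restartingNodeIndex >= 0;
  }

  synchronized boolean hasExternalErrorOnly() {
    return error && externalError && !isNodeMarked();
  }

  synchronized boolean hasDatanodeError() {
    return error && (isNodeMarked() || externalError);
  }
}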
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 0197cfb..4c9f9cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@@ -89,6 +90,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
@@ -2298,4 +2300,68 @@ public class DistributedFileSystem extends FileSystem {
       throws IOException {
     return dfs.getInotifyEventStream(lastReadTxid);
   }
+
+  /**
+   * Create an erasure coding zone.
+   *
+   * @param path directory on which to create the EC zone
+   * @param schema ECSchema for the zone; if not specified, the default is used
+   * @param cellSize cell size for the striped erasure coding
+   * @throws IOException
+   */
+  public void createErasureCodingZone(final Path path, final ECSchema schema,
+      final int cellSize) throws IOException {
+    Path absF = fixRelativePart(path);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p) throws IOException,
+          UnresolvedLinkException {
+        dfs.createErasureCodingZone(getPathName(p), schema, cellSize);
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p) throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem) fs;
+          myDfs.createErasureCodingZone(p, schema, cellSize);
+          return null;
+        }
+        throw new UnsupportedOperationException(
+            "Cannot createErasureCodingZone through a symlink to a "
+                + "non-DistributedFileSystem: " + path + " -> " + p);
+      }
+    }.resolve(this, absF);
+  }
+
+  /**
+   * Get the erasure coding zone information for the specified path.
+   *
+   * @param path the path to query
+   * @return the zone information if the path is in an EC zone, null otherwise
+   * @throws IOException
+   */
+  public ErasureCodingZone getErasureCodingZone(final Path path)
+      throws IOException {
+    Path absF = fixRelativePart(path);
+    return new FileSystemLinkResolver<ErasureCodingZone>() {
+      @Override
+      public ErasureCodingZone doCall(final Path p) throws IOException,
+          UnresolvedLinkException {
+        return dfs.getErasureCodingZone(getPathName(p));
+      }
+
+      @Override
+      public ErasureCodingZone next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem) fs;
+          return myDfs.getErasureCodingZone(p);
+        }
+        throw new UnsupportedOperationException(
+            "Cannot getErasureCodingZone through a symlink to a "
+                + "non-DistributedFileSystem: " + path + " -> " + p);
+      }
+    }.resolve(this, absF);
+  }
 }

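A hedged usage sketch of the two new DistributedFileSystem methods follows; it assumes a running cluster, the directory name is hypothetical, and a null schema is taken to select the default per the javadoc above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;

public class EcZoneExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    Path dir = new Path("/ec-data");  // hypothetical directory
    dfs.mkdirs(dir);
    // null schema selects the default, per the javadoc above
    dfs.createErasureCodingZone(dir, null,
        HdfsConstants.BLOCK_STRIPED_CELL_SIZE);
    ErasureCodingZone zone = dfs.getErasureCodingZone(dir);
    System.out.println(zone != null ? "EC zone: " + zone : "not in an EC zone");
  }
}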
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index d70f419..70cce7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -505,4 +505,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
     return null;
   }
+
+  @Override
+  public DataChecksum getDataChecksum() {
+    return checksum;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index c368d65..cce44b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -474,4 +474,9 @@ public class RemoteBlockReader2  implements BlockReader {
   public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
     return null;
   }
+
+  @Override
+  public DataChecksum getDataChecksum() {
+    return checksum;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
new file mode 100644
index 0000000..2d51dc4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator;
+import org.apache.hadoop.hdfs.DFSStripedOutputStream.MultipleBlockingQueue;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.util.ByteArrayManager;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * This class extends {@link DataStreamer} to support writing striped blocks
+ * to datanodes.
+ * A {@link DFSStripedOutputStream} has multiple {@link StripedDataStreamer}s.
+ * Whenever the streamers need to talk to the namenode, only the fastest
+ * streamer sends an RPC call to the namenode and then populates the result
+ * for the other streamers.
+ */
+public class StripedDataStreamer extends DataStreamer {
+  /**
+   * This class is designed for multiple threads to share a
+   * {@link MultipleBlockingQueue}. Initially, the queue is empty. The earliest
+   * thread to call poll populates the queue with entries, and the other
+   * threads wait for it. Once the entries are populated, every thread can
+   * poll its own entry.
+   *
+   * @param <T> the queue entry type.
+   */
+  static abstract class ConcurrentPoll<T> {
+    private final MultipleBlockingQueue<T> queue;
+
+    ConcurrentPoll(MultipleBlockingQueue<T> queue) {
+      this.queue = queue;
+    }
+
+    T poll(final int i) throws IOException {
+      for(;;) {
+        synchronized(queue) {
+          final T polled = queue.poll(i);
+          if (polled != null) { // already populated; return polled item.
+            return polled;
+          }
+          if (isReady2Populate()) {
+            populate();
+            return queue.poll(i);
+          }
+        }
+
+        // sleep and then retry.
+        try {
+          Thread.sleep(100);
+        } catch(InterruptedException ie) {
+          throw DFSUtil.toInterruptedIOException(
+              "Sleep interrupted during poll", ie);
+        }
+      }
+    }
+
+    boolean isReady2Populate() {
+      return queue.isEmpty();
+    }
+
+    abstract void populate() throws IOException;
+  }
+
+  private final Coordinator coordinator;
+  private final int index;
+  private volatile boolean failed;
+
+  StripedDataStreamer(HdfsFileStatus stat,
+                      DFSClient dfsClient, String src,
+                      Progressable progress, DataChecksum checksum,
+                      AtomicReference<CachingStrategy> cachingStrategy,
+                      ByteArrayManager byteArrayManage, String[] favoredNodes,
+                      short index, Coordinator coordinator) {
+    super(stat, null, dfsClient, src, progress, checksum, cachingStrategy,
+        byteArrayManage, favoredNodes);
+    this.index = index;
+    this.coordinator = coordinator;
+  }
+
+  int getIndex() {
+    return index;
+  }
+
+  void setFailed(boolean failed) {
+    this.failed = failed;
+  }
+
+  boolean isFailed() {
+    return failed;
+  }
+
+  private boolean isParityStreamer() {
+    return index >= NUM_DATA_BLOCKS;
+  }
+
+  @Override
+  protected void endBlock() {
+    if (!isParityStreamer()) {
+      coordinator.offerEndBlock(index, block);
+    }
+    super.endBlock();
+  }
+
+  @Override
+  protected LocatedBlock locateFollowingBlock(final DatanodeInfo[] excludedNodes)
+      throws IOException {
+    final MultipleBlockingQueue<LocatedBlock> followingBlocks
+        = coordinator.getFollowingBlocks();
+    return new ConcurrentPoll<LocatedBlock>(followingBlocks) {
+      @Override
+      boolean isReady2Populate() {
+        return super.isReady2Populate()
+            && (block == null || coordinator.hasAllEndBlocks());
+      }
+
+      @Override
+      void populate() throws IOException {
+        getLastException().check(false);
+
+        if (block != null) {
+          // set numByte for the previous block group
+          long bytes = 0;
+          for (int i = 0; i < NUM_DATA_BLOCKS; i++) {
+            final ExtendedBlock b = coordinator.takeEndBlock(i);
+            StripedBlockUtil.checkBlocks(index, block, i, b);
+            bytes += b.getNumBytes();
+          }
+          block.setNumBytes(bytes);
+          block.setBlockId(block.getBlockId() - index);
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("locateFollowingBlock: index=" + index + ", block=" + block);
+        }
+
+        final LocatedBlock lb = StripedDataStreamer.super.locateFollowingBlock(
+            excludedNodes);
+        final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
+            (LocatedStripedBlock)lb,
+            BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
+
+        for (int i = 0; i < blocks.length; i++) {
+          if (!coordinator.getStripedDataStreamer(i).isFailed()) {
+            if (blocks[i] == null) {
+              getLastException().set(
+                  new IOException("Failed to get following block, i=" + i));
+            } else {
+              followingBlocks.offer(i, blocks[i]);
+            }
+          }
+        }
+      }
+    }.poll(index);
+  }
+
+  @Override
+  LocatedBlock updateBlockForPipeline() throws IOException {
+    final MultipleBlockingQueue<LocatedBlock> newBlocks
+        = coordinator.getNewBlocks();
+    return new ConcurrentPoll<LocatedBlock>(newBlocks) {
+      @Override
+      void populate() throws IOException {
+        final ExtendedBlock bg = coordinator.getBlockGroup();
+        final LocatedBlock updated = callUpdateBlockForPipeline(bg);
+        final long newGS = updated.getBlock().getGenerationStamp();
+        final LocatedBlock[] updatedBlks = StripedBlockUtil
+            .parseStripedBlockGroup((LocatedStripedBlock) updated,
+                BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
+        for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
+          final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
+          if (bi != null) {
+            final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
+                null, null, null, -1, updated.isCorrupt(), null);
+            lb.setBlockToken(updatedBlks[i].getBlockToken());
+            newBlocks.offer(i, lb);
+          } else {
+            final LocatedBlock lb = coordinator.getFollowingBlocks().peek(i);
+            lb.getBlock().setGenerationStamp(newGS);
+          }
+        }
+      }
+    }.poll(index);
+  }
+
+  @Override
+  ExtendedBlock updatePipeline(final long newGS) throws IOException {
+    final MultipleBlockingQueue<ExtendedBlock> updateBlocks
+        = coordinator.getUpdateBlocks();
+    return new ConcurrentPoll<ExtendedBlock>(updateBlocks) {
+      @Override
+      void populate() throws IOException {
+        final ExtendedBlock bg = coordinator.getBlockGroup();
+        final ExtendedBlock newBG = newBlock(bg, newGS);
+        final ExtendedBlock updated = callUpdatePipeline(bg, newBG);
+        for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
+          final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
+          updateBlocks.offer(i, newBlock(bi, updated.getGenerationStamp()));
+        }
+      }
+    }.poll(index);
+  }
+
+  @Override
+  public String toString() {
+    return "#" + index + ": failed? " + Boolean.toString(failed).charAt(0)
+        + ", " + super.toString();
+  }
+}

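The single-populator behavior of ConcurrentPoll is easier to see stripped of HDFS types. The following simplified sketch (plain strings stand in for RPC results; N is hypothetical) shows the fastest thread populating every per-index queue while the others retry:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class ConcurrentPollSketch {
  static final int N = 3;  // hypothetical number of streamers
  static final List<LinkedBlockingQueue<String>> queues = new ArrayList<>();
  static {
    for (int i = 0; i < N; i++) {
      queues.add(new LinkedBlockingQueue<String>());
    }
  }

  // Mirrors ConcurrentPoll.poll(i): take this index's entry if present;
  // otherwise, if every queue is empty, populate entries for all indices.
  static String poll(int i) throws InterruptedException {
    for (;;) {
      synchronized (queues) {
        String polled = queues.get(i).poll();
        if (polled != null) {
          return polled;
        }
        boolean allEmpty = true;
        for (LinkedBlockingQueue<String> q : queues) {
          allEmpty &= q.isEmpty();
        }
        if (allEmpty) {  // this thread is the "fastest": do the one RPC
          for (int j = 0; j < N; j++) {
            queues.get(j).offer("result-" + j);  // stand-in for an RPC result
          }
          return queues.get(i).poll();
        }
      }
      Thread.sleep(100);  // entries exist only for other indices; retry later
    }
  }
}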
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
index 84499bb..5a3c885 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
@@ -37,9 +37,11 @@ import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 
 /**
  * The public API for performing administrative functions on HDFS. Those writing
@@ -363,4 +365,42 @@ public class HdfsAdmin {
       throws IOException {
     dfs.setStoragePolicy(src, policyName);
   }
+
+  /**
+   * Create an erasure coding zone.
+   *
+   * @param path
+   *          directory on which to create the erasure coding zone
+   * @param schema
+   *          ECSchema for the zone; if not specified, the default is used
+   * @param cellSize
+   *          cell size for the striped erasure coding
+   * @throws IOException
+   */
+  public void createErasureCodingZone(final Path path, final ECSchema schema,
+      final int cellSize) throws IOException {
+    dfs.createErasureCodingZone(path, schema, cellSize);
+  }
+
+  /**
+   * Get the erasure coding zone information for the specified path.
+   *
+   * @param path
+   *          the path to query
+   * @return the zone information if the path is in an EC zone, null otherwise
+   * @throws IOException
+   */
+  public ErasureCodingZone getErasureCodingZone(final Path path)
+      throws IOException {
+    return dfs.getErasureCodingZone(path);
+  }
+
+  /**
+   * Get the supported erasure coding schemas.
+   *
+   * @return the supported ECSchemas
+   * @throws IOException
+   */
+  public ECSchema[] getECSchemas() throws IOException {
+    return dfs.getClient().getECSchemas();
+  }
 }

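A hypothetical admin-side usage of the new HdfsAdmin methods, assuming a reachable cluster at a made-up URI and a made-up cell size:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.io.erasurecode.ECSchema;

public class EcAdminExample {
  public static void main(String[] args) throws Exception {
    // hypothetical namenode URI; any reachable cluster would do
    HdfsAdmin admin = new HdfsAdmin(new URI("hdfs://namenode:8020"),
        new Configuration());
    for (ECSchema schema : admin.getECSchemas()) {
      System.out.println("supported schema: " + schema);
    }
    // null schema selects the default; 64 KB is a hypothetical cell size
    admin.createErasureCodingZone(new Path("/archive"), null, 64 * 1024);
    System.out.println(admin.getErasureCodingZone(new Path("/archive")));
  }
}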
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index a257e32..9aef436 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -38,6 +38,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIM
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -101,6 +102,9 @@ public class DfsClientConf {
   private final long hedgedReadThresholdMillis;
   private final int hedgedReadThreadpoolSize;
 
+  private final int stripedReadThreadpoolSize;
+
   public DfsClientConf(Configuration conf) {
     // The hdfsTimeout is currently the same as the ipc timeout 
     hdfsTimeout = Client.getTimeout(conf);
@@ -191,7 +195,7 @@ public class DfsClientConf {
     connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
         DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
     hdfsBlocksMetadataEnabled = conf.getBoolean(
-        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 
+        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
         DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
     fileBlockStorageLocationsNumThreads = conf.getInt(
         DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS,
@@ -215,6 +219,13 @@ public class DfsClientConf {
     hedgedReadThreadpoolSize = conf.getInt(
         HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
         HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);
+
+    stripedReadThreadpoolSize = conf.getInt(
+        HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY,
+        HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_DEFAULT);
+    Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " +
+        HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY +
+        " must be greater than 0.");
   }
 
   private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -492,6 +503,13 @@ public class DfsClientConf {
   }
 
   /**
+   * @return the stripedReadThreadpoolSize
+   */
+  public int getStripedReadThreadpoolSize() {
+    return stripedReadThreadpoolSize;
+  }
+
+  /**
    * @return the shortCircuitConf
    */
   public ShortCircuitConf getShortCircuitConf() {
@@ -744,4 +762,4 @@ public class DfsClientConf {
       return builder.toString();
     }
   }
-}
\ No newline at end of file
+}

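Finally, a small sketch of configuring the new striped-read thread pool; the value 18 is arbitrary, and per the Preconditions check above DfsClientConf rejects non-positive sizes:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;

public class StripedReadConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // hypothetical value; the patch only requires it to be > 0
    conf.setInt(HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY, 18);
    DfsClientConf clientConf = new DfsClientConf(conf);
    System.out.println(clientConf.getStripedReadThreadpoolSize()); // 18
  }
}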
