hadoop-common-commits mailing list archives

From: ji...@apache.org
Subject: hadoop git commit: HDFS-8254. Avoid assigning a leading streamer in StripedDataStreamer to tolerate datanode failure. Contributed by Tsz Wo Nicholas Sze.
Date: Fri, 19 Jun 2015 17:24:01 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 05c696882 -> 3682e0198


HDFS-8254. Avoid assigning a leading streamer in StripedDataStreamer to tolerate datanode
failure. Contributed by Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3682e019
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3682e019
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3682e019

Branch: refs/heads/HDFS-7285
Commit: 3682e01984b6d93b35376532da8a8823d69239df
Parents: 05c6968
Author: Jing Zhao <jing9@apache.org>
Authored: Fri Jun 19 10:23:45 2015 -0700
Committer: Jing Zhao <jing9@apache.org>
Committed: Fri Jun 19 10:23:45 2015 -0700

----------------------------------------------------------------------
 .../hdfs/client/HdfsClientConfigKeys.java       |  10 -
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   5 +-
 .../hadoop/hdfs/DFSStripedOutputStream.java     | 192 +++++++++--------
 .../org/apache/hadoop/hdfs/DataStreamer.java    |  49 +++--
 .../apache/hadoop/hdfs/StripedDataStreamer.java | 206 +++++++++++++------
 .../hadoop/hdfs/client/impl/DfsClientConf.java  |  23 ---
 .../hadoop/hdfs/util/StripedBlockUtil.java      |  24 +--
 .../TestDFSStripedOutputStreamWithFailure.java  |   7 +
 8 files changed, 307 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3682e019/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 9373e98..6006d71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -189,16 +189,6 @@ public interface HdfsClientConfigKeys {
     int     THREADPOOL_SIZE_DEFAULT = 18;
   }
 
-  /** dfs.client.write.striped configuration properties */
-  interface StripedWrite {
-    String PREFIX = Write.PREFIX + "striped.";
-
-    String  MAX_SECONDS_GET_STRIPED_BLOCK_KEY = PREFIX + "max-seconds-get-striped-block";
-    int     MAX_SECONDS_GET_STRIPED_BLOCK_DEFAULT = 90;
-    String  MAX_SECONDS_GET_ENDED_BLOCK_KEY = PREFIX + "max-seconds-get-ended-block";
-    int     MAX_SECONDS_GET_ENDED_BLOCK_DEFAULT = 60;
-  }
-
   /** dfs.http.client configuration properties */
   interface HttpClient {
     String  PREFIX = "dfs.http.client.";
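
For context on the removal: keys in HdfsClientConfigKeys are built by prefix
concatenation, so the deleted StripedWrite interface defined the two properties
reconstructed below. A minimal sketch, assuming Write.PREFIX is
"dfs.client.write." as the interface's own javadoc comment indicates:

    // Sketch only: the full key names the removed StripedWrite interface
    // produced (Write.PREFIX assumed to be "dfs.client.write.").
    String prefix = "dfs.client.write." + "striped.";
    String maxSecondsGetStripedBlockKey = prefix + "max-seconds-get-striped-block";
    // -> "dfs.client.write.striped.max-seconds-get-striped-block", default 90
    String maxSecondsGetEndedBlockKey = prefix + "max-seconds-get-ended-block";
    // -> "dfs.client.write.striped.max-seconds-get-ended-block", default 60

These timeouts become unnecessary because the coordinator below stops polling
with a deadline and instead blocks on take() plus a short retry loop.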

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3682e019/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index f41d30a..a710c2e 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -307,4 +307,7 @@
     StripedBlocksFeature. (Walter Su via jing9)
 
     HDFS-8466. Refactor BlockInfoContiguous and fix NPE in
-    TestBlockInfo#testCopyConstructor() (vinayakumarb)
\ No newline at end of file
+    TestBlockInfo#testCopyConstructor() (vinayakumarb)
+
+    HDFS-8254. Avoid assigning a leading streamer in StripedDataStreamer to
+    tolerate datanode failure. (Tsz Wo Nicholas Sze via jing9)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3682e019/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index bdd3352..1068b37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -28,7 +28,6 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,7 +39,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
@@ -51,27 +49,33 @@ import org.apache.htrace.TraceScope;
 import com.google.common.base.Preconditions;
 
 
-/****************************************************************
- * The DFSStripedOutputStream class supports writing files in striped
- * layout. Each stripe contains a sequence of cells and multiple
- * {@link StripedDataStreamer}s in DFSStripedOutputStream are responsible
- * for writing the cells to different datanodes.
- *
- ****************************************************************/
-
+/**
+ * This class supports writing files in striped layout and erasure coded format.
+ * Each stripe contains a sequence of cells.
+ */
 @InterfaceAudience.Private
 public class DFSStripedOutputStream extends DFSOutputStream {
   static class MultipleBlockingQueue<T> {
-    private final int pullTimeout;
     private final List<BlockingQueue<T>> queues;
 
-    MultipleBlockingQueue(int numQueue, int queueSize, int pullTimeout) {
+    MultipleBlockingQueue(int numQueue, int queueSize) {
       queues = new ArrayList<>(numQueue);
       for (int i = 0; i < numQueue; i++) {
         queues.add(new LinkedBlockingQueue<T>(queueSize));
       }
+    }
+
+    boolean isEmpty() {
+      for(int i = 0; i < queues.size(); i++) {
+        if (!queues.get(i).isEmpty()) {
+          return false;
+        }
+      }
+      return true;
+    }
 
-      this.pullTimeout = pullTimeout;
+    int numQueues() {
+      return queues.size();
     }
 
     void offer(int i, T object) {
@@ -80,49 +84,71 @@ public class DFSStripedOutputStream extends DFSOutputStream {
           + " to queue, i=" + i);
     }
 
-    T poll(int i) throws InterruptedIOException {
+    T take(int i) throws InterruptedIOException {
       try {
-        return queues.get(i).poll(pullTimeout, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        throw DFSUtil.toInterruptedIOException("poll interrupted, i=" + i, e);
+        return queues.get(i).take();
+      } catch(InterruptedException ie) {
+        throw DFSUtil.toInterruptedIOException("take interrupted, i=" + i, ie);
       }
     }
 
+    T poll(int i) {
+      return queues.get(i).poll();
+    }
+
     T peek(int i) {
       return queues.get(i).peek();
     }
   }
 
   /** Coordinate the communication between the streamers. */
-  static class Coordinator {
-    private final MultipleBlockingQueue<LocatedBlock> stripedBlocks;
+  class Coordinator {
+    private final MultipleBlockingQueue<LocatedBlock> followingBlocks;
     private final MultipleBlockingQueue<ExtendedBlock> endBlocks;
+
+    private final MultipleBlockingQueue<LocatedBlock> newBlocks;
     private final MultipleBlockingQueue<ExtendedBlock> updateBlocks;
 
     Coordinator(final DfsClientConf conf, final int numDataBlocks,
         final int numAllBlocks) {
-      stripedBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1,
-          conf.getStripedWriteMaxSecondsGetStripedBlock());
-      endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1,
-          conf.getStripedWriteMaxSecondsGetEndedBlock());
-      updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1,
-          conf.getStripedWriteMaxSecondsGetStripedBlock());
+      followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
+      endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1);
+
+      newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
+      updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
     }
 
-    void putEndBlock(int i, ExtendedBlock block) {
-      endBlocks.offer(i, block);
+    MultipleBlockingQueue<LocatedBlock> getFollowingBlocks() {
+      return followingBlocks;
+    }
+
+    MultipleBlockingQueue<LocatedBlock> getNewBlocks() {
+      return newBlocks;
     }
 
-    ExtendedBlock getEndBlock(int i) throws InterruptedIOException {
-      return endBlocks.poll(i);
+    MultipleBlockingQueue<ExtendedBlock> getUpdateBlocks() {
+      return updateBlocks;
     }
 
-    void putUpdateBlock(int i, ExtendedBlock block) {
-      updateBlocks.offer(i, block);
+    StripedDataStreamer getStripedDataStreamer(int i) {
+      return DFSStripedOutputStream.this.getStripedDataStreamer(i);
     }
 
-    ExtendedBlock getUpdateBlock(int i) throws InterruptedIOException {
-      return updateBlocks.poll(i);
+    void offerEndBlock(int i, ExtendedBlock block) {
+      endBlocks.offer(i, block);
+    }
+
+    ExtendedBlock takeEndBlock(int i) throws InterruptedIOException {
+      return endBlocks.take(i);
+    }
+
+    boolean hasAllEndBlocks() {
+      for(int i = 0; i < endBlocks.numQueues(); i++) {
+        if (endBlocks.peek(i) == null) {
+          return false;
+        }
+      }
+      return true;
     }
 
     void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) {
@@ -130,24 +156,35 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       if (b == null) {
         // streamer just has failed, put end block and continue
         b = block;
-        putEndBlock(i, b);
+        offerEndBlock(i, b);
       }
       b.setNumBytes(newBytes);
     }
 
-    void putStripedBlock(int i, LocatedBlock block) throws IOException {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("putStripedBlock " + block + ", i=" + i);
+    /** @return a block representing the entire block group. */
+    ExtendedBlock getBlockGroup() {
+      final StripedDataStreamer s0 = getStripedDataStreamer(0);
+      final ExtendedBlock b0 = s0.getBlock();
+      if (b0 == null) {
+        return null;
       }
-      stripedBlocks.offer(i, block);
-    }
 
-    LocatedBlock getStripedBlock(int i) throws IOException {
-      final LocatedBlock lb = stripedBlocks.poll(i);
-      if (lb == null) {
-        throw new IOException("Failed: i=" + i);
+      final boolean atBlockGroupBoundary = s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0;
+      final ExtendedBlock block = new ExtendedBlock(b0);
+      long numBytes = b0.getNumBytes();
+      for (int i = 1; i < numDataBlocks; i++) {
+        final StripedDataStreamer si = getStripedDataStreamer(i);
+        final ExtendedBlock bi = si.getBlock();
+        if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) {
+          block.setGenerationStamp(bi.getGenerationStamp());
+        }
+        numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock();
       }
-      return lb;
+      block.setNumBytes(numBytes);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("getBlockGroup: " + block + ", numBytes=" + block.getNumBytes());
+      }
+      return block;
     }
   }
 
@@ -223,13 +260,9 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   private final int numAllBlocks;
   private final int numDataBlocks;
 
-  private StripedDataStreamer getLeadingStreamer() {
-    return streamers.get(0);
-  }
-
   @Override
   ExtendedBlock getBlock() {
-    return getLeadingStreamer().getBlock();
+    return coordinator.getBlockGroup();
   }
 
   /** Construct a new output stream for creating a file. */
@@ -308,7 +341,9 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     int count = 0;
     for(StripedDataStreamer s : streamers) {
       if (!s.isFailed()) {
-        s.getErrorState().initExtenalError();
+        if (s.getBlock() != null) {
+          s.getErrorState().initExternalError();
+        }
         count++;
       }
     }
@@ -325,7 +360,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   private void handleStreamerFailure(String err,
                                      Exception e) throws IOException {
     LOG.warn("Failed: " + err + ", " + this, e);
-    getCurrentStreamer().setIsFailed(true);
+    getCurrentStreamer().setFailed(true);
     checkStreamers();
     currentPacket = null;
   }
@@ -443,10 +478,17 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     dfsClient.endFileLease(fileId);
   }
 
-  //TODO: Handle slow writers (HDFS-7786)
-  //Cuurently only check if the leading streamer is terminated
+  @Override
   boolean isClosed() {
-    return closed || getLeadingStreamer().streamerClosed();
+    if (closed) {
+      return true;
+    }
+    for(StripedDataStreamer s : streamers) {
+      if (!s.streamerClosed()) {
+        return false;
+      }
+    }
+    return true;
   }
 
   @Override
@@ -560,7 +602,19 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   @Override
   protected synchronized void closeImpl() throws IOException {
     if (isClosed()) {
-      getLeadingStreamer().getLastException().check(true);
+      final MultipleIOException.Builder b = new MultipleIOException.Builder();
+      for(int i = 0; i < streamers.size(); i++) {
+        final StripedDataStreamer si = getStripedDataStreamer(i);
+        try {
+          si.getLastException().check(true);
+        } catch (IOException e) {
+          b.add(e);
+        }
+      }
+      final IOException ioe = b.build();
+      if (ioe != null) {
+        throw ioe;
+      }
       return;
     }
 
@@ -594,7 +648,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       }
 
       closeThreads(false);
-      final ExtendedBlock lastBlock = getCommittedBlock();
+      final ExtendedBlock lastBlock = coordinator.getBlockGroup();
       TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
       try {
         completeFile(lastBlock);
@@ -607,30 +661,4 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       setClosed();
     }
   }
-
-  /**
-   * Generate the block which is reported and will be committed in NameNode.
-   * Need to go through all the streamers writing data blocks and add their
-   * bytesCurBlock together. Note that at this time all streamers have been
-   * closed. Also this calculation can cover streamers with writing failures.
-   *
-   * @return An ExtendedBlock with size of the whole block group.
-   */
-  ExtendedBlock getCommittedBlock() throws IOException {
-    ExtendedBlock b = getLeadingStreamer().getBlock();
-    if (b == null) {
-      return null;
-    }
-    final ExtendedBlock block = new ExtendedBlock(b);
-    final boolean atBlockGroupBoundary =
-        getLeadingStreamer().getBytesCurBlock() == 0 &&
-            getLeadingStreamer().getBlock() != null &&
-            getLeadingStreamer().getBlock().getNumBytes() > 0;
-    for (int i = 1; i < numDataBlocks; i++) {
-      block.setNumBytes(block.getNumBytes() +
-          (atBlockGroupBoundary ? streamers.get(i).getBlock().getNumBytes() :
-              streamers.get(i).getBytesCurBlock()));
-    }
-    return block;
-  }
 }
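
For readers skimming the hunks above: the patch replaces the single timeout-based
stripedBlocks queue with four per-purpose queues (followingBlocks, endBlocks,
newBlocks, updateBlocks), each holding one slot per streamer. A minimal
self-contained sketch of the MultipleBlockingQueue shape after this patch
(error handling simplified; the real class throws via
DFSUtil.toInterruptedIOException):

    import java.io.InterruptedIOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    /** One bounded queue per streamer; index i belongs to streamer #i. */
    class MultipleBlockingQueue<T> {
      private final List<BlockingQueue<T>> queues;

      MultipleBlockingQueue(int numQueue, int queueSize) {
        queues = new ArrayList<>(numQueue);
        for (int i = 0; i < numQueue; i++) {
          queues.add(new LinkedBlockingQueue<T>(queueSize));
        }
      }

      /** Non-blocking insert; failure to offer is a programming error. */
      void offer(int i, T object) {
        if (!queues.get(i).offer(object)) {
          throw new IllegalStateException("queue " + i + " is full");
        }
      }

      /** Blocks until streamer #i's entry arrives; no timeout since HDFS-8254. */
      T take(int i) throws InterruptedIOException {
        try {
          return queues.get(i).take();
        } catch (InterruptedException ie) {
          throw new InterruptedIOException("take interrupted, i=" + i);
        }
      }

      T poll(int i) { return queues.get(i).poll(); }

      T peek(int i) { return queues.get(i).peek(); }
    }

Dropping the poll(pullTimeout) call is what makes the two max-seconds
configuration keys removed above obsolete.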

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3682e019/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 1344d54..c78199e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -209,7 +209,7 @@ class DataStreamer extends Daemon {
 
   static class ErrorState {
     private boolean error = false;
-    private boolean extenalError = false;
+    private boolean externalError = false;
     private int badNodeIndex = -1;
     private int restartingNodeIndex = -1;
     private long restartingNodeDeadline = 0;
@@ -221,7 +221,7 @@ class DataStreamer extends Daemon {
 
     synchronized void reset() {
       error = false;
-      extenalError = false;
+      externalError = false;
       badNodeIndex = -1;
       restartingNodeIndex = -1;
       restartingNodeDeadline = 0;
@@ -231,17 +231,21 @@ class DataStreamer extends Daemon {
       return error;
     }
 
+    synchronized boolean hasExternalErrorOnly() {
+      return error && externalError && !isNodeMarked();
+    }
+
     synchronized boolean hasDatanodeError() {
-      return error && (isNodeMarked() || extenalError);
+      return error && (isNodeMarked() || externalError);
     }
 
     synchronized void setError(boolean err) {
       this.error = err;
     }
 
-    synchronized void initExtenalError() {
+    synchronized void initExternalError() {
       setError(true);
-      this.extenalError = true;
+      this.externalError = true;
     }
 
 
@@ -405,11 +409,13 @@ class DataStreamer extends Daemon {
   private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
   private final String[] favoredNodes;
 
-  private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src,
+  private DataStreamer(HdfsFileStatus stat, ExtendedBlock block,
+                       DFSClient dfsClient, String src,
                        Progressable progress, DataChecksum checksum,
                        AtomicReference<CachingStrategy> cachingStrategy,
                        ByteArrayManager byteArrayManage,
                        boolean isAppend, String[] favoredNodes) {
+    this.block = block;
     this.dfsClient = dfsClient;
     this.src = src;
     this.progress = progress;
@@ -434,9 +440,8 @@ class DataStreamer extends Daemon {
                String src, Progressable progress, DataChecksum checksum,
                AtomicReference<CachingStrategy> cachingStrategy,
                ByteArrayManager byteArrayManage, String[] favoredNodes) {
-    this(stat, dfsClient, src, progress, checksum, cachingStrategy,
+    this(stat, block, dfsClient, src, progress, checksum, cachingStrategy,
         byteArrayManage, false, favoredNodes);
-    this.block = block;
     stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
   }
 
@@ -450,10 +455,9 @@ class DataStreamer extends Daemon {
                String src, Progressable progress, DataChecksum checksum,
                AtomicReference<CachingStrategy> cachingStrategy,
                ByteArrayManager byteArrayManage) throws IOException {
-    this(stat, dfsClient, src, progress, checksum, cachingStrategy,
+    this(stat, lastBlock.getBlock(), dfsClient, src, progress, checksum, cachingStrategy,
         byteArrayManage, true, null);
     stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
-    block = lastBlock.getBlock();
     bytesSent = block.getNumBytes();
     accessToken = lastBlock.getBlockToken();
   }
@@ -1074,6 +1078,10 @@ class DataStreamer extends Daemon {
     if (!errorState.hasDatanodeError()) {
       return false;
     }
+    if (errorState.hasExternalErrorOnly() && block == null) {
+      // block is not yet initialized, handle external error later.
+      return false;
+    }
     if (response != null) {
       LOG.info("Error Recovery for " + block +
           " waiting for responder to exit. ");
@@ -1402,15 +1410,28 @@ class DataStreamer extends Daemon {
   }
 
   LocatedBlock updateBlockForPipeline() throws IOException {
+    return callUpdateBlockForPipeline(block);
+  }
+
+  LocatedBlock callUpdateBlockForPipeline(ExtendedBlock newBlock) throws IOException {
     return dfsClient.namenode.updateBlockForPipeline(
-        block, dfsClient.clientName);
+        newBlock, dfsClient.clientName);
+  }
+
+  static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) {
+    return new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(),
+        b.getNumBytes(), newGS);
   }
 
   /** update pipeline at the namenode */
   ExtendedBlock updatePipeline(long newGS) throws IOException {
-    final ExtendedBlock newBlock = new ExtendedBlock(
-        block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
-    dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
+    final ExtendedBlock newBlock = newBlock(block, newGS);
+    return callUpdatePipeline(block, newBlock);
+  }
+
+  ExtendedBlock callUpdatePipeline(ExtendedBlock oldBlock, ExtendedBlock newBlock)
+      throws IOException {
+    dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock, newBlock,
         nodes, storageIDs);
     return newBlock;
   }
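
The DataStreamer changes are refactoring in service of the striped subclass:
updateBlockForPipeline and updatePipeline are split into call* variants that take
the block as a parameter, so StripedDataStreamer can run the same namenode RPCs
against the whole block group instead of its own internal block. A hedged sketch
of the resulting recovery shape (this method is illustrative, not in the patch;
it would live in a DataStreamer subclass, and bg is a caller-supplied block group):

    /** Sketch: bump the generation stamp of a block group after a failure. */
    ExtendedBlock recoverBlockGroup(ExtendedBlock bg) throws IOException {
      // Ask the namenode for a new generation stamp and access token.
      final LocatedBlock updated = callUpdateBlockForPipeline(bg);
      final long newGS = updated.getBlock().getGenerationStamp();
      // Commit the new generation stamp; newBlock() copies pool ID,
      // block ID and length, replacing only the generation stamp.
      return callUpdatePipeline(bg, newBlock(bg, newGS));
    }

The null-block guard added in the error-handling hunk defers external-error
handling for streamers that have not yet been assigned a block.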

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3682e019/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index 7b7db75..a177796 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator;
+import org.apache.hadoop.hdfs.DFSStripedOutputStream.MultipleBlockingQueue;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -37,18 +38,64 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 
-/****************************************************************************
- * The StripedDataStreamer class is used by {@link DFSStripedOutputStream}.
- * There are two kinds of StripedDataStreamer, leading streamer and ordinary
- * stream. Leading streamer requests a block group from NameNode, unwraps
- * it to located blocks and transfers each located block to its corresponding
- * ordinary streamer via a blocking queue.
- *
- ****************************************************************************/
+/**
+ * This class extends {@link DataStreamer} to support writing striped blocks
+ * to datanodes.
+ * A {@link DFSStripedOutputStream} has multiple {@link StripedDataStreamer}s.
+ * Whenever the streamers need to talk to the namenode, only the fastest streamer
+ * sends an RPC call to the namenode and then populates the result for the
+ * other streamers.
+ */
 public class StripedDataStreamer extends DataStreamer {
+  /**
+   * This class is designed for multiple threads to share a
+   * {@link MultipleBlockingQueue}. Initially, the queue is empty. The earliest
+   * thread calling poll populates entries to the queue and the other threads
+   * will wait for it. Once the entries are populated, all the threads can poll
+   * their entries.
+   *
+   * @param <T> the queue entry type.
+   */
+  static abstract class ConcurrentPoll<T> {
+    private final MultipleBlockingQueue<T> queue;
+
+    ConcurrentPoll(MultipleBlockingQueue<T> queue) {
+      this.queue = queue;
+    }
+
+    T poll(final int i) throws IOException {
+      for(;;) {
+        synchronized(queue) {
+          final T polled = queue.poll(i);
+          if (polled != null) { // already populated; return polled item.
+            return polled;
+          }
+          if (isReady2Populate()) {
+            populate();
+            return queue.poll(i);
+          }
+        }
+
+        // sleep and then retry.
+        try {
+          Thread.sleep(100);
+        } catch(InterruptedException ie) {
+          throw DFSUtil.toInterruptedIOException(
+              "Sleep interrupted during poll", ie);
+        }
+      }
+    }
+
+    boolean isReady2Populate() {
+      return queue.isEmpty();
+    }
+
+    abstract void populate() throws IOException;
+  }
+
   private final Coordinator coordinator;
   private final int index;
-  private volatile boolean isFailed;
+  private volatile boolean failed;
 
   StripedDataStreamer(HdfsFileStatus stat,
                       DFSClient dfsClient, String src,
@@ -66,16 +113,12 @@ public class StripedDataStreamer extends DataStreamer {
     return index;
   }
 
-  void setIsFailed(boolean isFailed) {
-    this.isFailed = isFailed;
+  void setFailed(boolean failed) {
+    this.failed = failed;
   }
 
   boolean isFailed() {
-    return isFailed;
-  }
-
-  public boolean isLeadingStreamer () {
-    return index == 0;
+    return failed;
   }
 
   private boolean isParityStreamer() {
@@ -85,81 +128,110 @@ public class StripedDataStreamer extends DataStreamer {
   @Override
   protected void endBlock() {
     if (!isParityStreamer()) {
-      coordinator.putEndBlock(index, block);
+      coordinator.offerEndBlock(index, block);
     }
     super.endBlock();
   }
 
   @Override
-  protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
+  protected LocatedBlock locateFollowingBlock(final DatanodeInfo[] excludedNodes)
       throws IOException {
-    if (isLeadingStreamer()) {
-      if (block != null) {
-        // set numByte for the previous block group
-        long bytes = 0;
-        for (int i = 0; i < NUM_DATA_BLOCKS; i++) {
-          final ExtendedBlock b = coordinator.getEndBlock(i);
-          if (b != null) {
-            StripedBlockUtil.checkBlocks(block, i, b);
+    final MultipleBlockingQueue<LocatedBlock> followingBlocks
+        = coordinator.getFollowingBlocks();
+    return new ConcurrentPoll<LocatedBlock>(followingBlocks) {
+      @Override
+      boolean isReady2Populate() {
+        return super.isReady2Populate()
+            && (block == null || coordinator.hasAllEndBlocks());
+      }
+
+      @Override
+      void populate() throws IOException {
+        getLastException().check(false);
+
+        if (block != null) {
+          // set numByte for the previous block group
+          long bytes = 0;
+          for (int i = 0; i < NUM_DATA_BLOCKS; i++) {
+            final ExtendedBlock b = coordinator.takeEndBlock(i);
+            StripedBlockUtil.checkBlocks(index, block, i, b);
             bytes += b.getNumBytes();
           }
+          block.setNumBytes(bytes);
+          block.setBlockId(block.getBlockId() - index);
         }
-        block.setNumBytes(bytes);
-      }
 
-      putLoactedBlocks(super.locateFollowingBlock(excludedNodes));
-    }
-
-    return coordinator.getStripedBlock(index);
-  }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("locateFollowingBlock: index=" + index + ", block=" + block);
+        }
 
-  void putLoactedBlocks(LocatedBlock lb) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Obtained block group " + lb);
-    }
-    LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
-        (LocatedStripedBlock)lb,
-        BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
-
-    // TODO allow write to continue if blocks.length >= NUM_DATA_BLOCKS
-    assert blocks.length == (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) :
-        "Fail to get block group from namenode: blockGroupSize: " +
-            (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) + ", blocks.length: " +
-            blocks.length;
-    for (int i = 0; i < blocks.length; i++) {
-      coordinator.putStripedBlock(i, blocks[i]);
-    }
+        final LocatedBlock lb = StripedDataStreamer.super.locateFollowingBlock(
+            excludedNodes);
+        final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
+            (LocatedStripedBlock)lb,
+            BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
+
+        for (int i = 0; i < blocks.length; i++) {
+          if (!coordinator.getStripedDataStreamer(i).isFailed()) {
+            if (blocks[i] == null) {
+              getLastException().set(
+                  new IOException("Failed to get following block, i=" + i));
+            } else {
+              followingBlocks.offer(i, blocks[i]);
+            }
+          }
+        }
+      }
+    }.poll(index);
   }
 
   @Override
   LocatedBlock updateBlockForPipeline() throws IOException {
-    if (isLeadingStreamer()) {
-      final LocatedBlock updated = super.updateBlockForPipeline();
-      final ExtendedBlock block = updated.getBlock();
-      for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
-        final LocatedBlock lb = new LocatedBlock(block, null, null, null,
-                -1, updated.isCorrupt(), null);
-        lb.setBlockToken(updated.getBlockToken());
-        coordinator.putStripedBlock(i, lb);
+    final MultipleBlockingQueue<LocatedBlock> newBlocks
+        = coordinator.getNewBlocks();
+    return new ConcurrentPoll<LocatedBlock>(newBlocks) {
+      @Override
+      void populate() throws IOException {
+        final ExtendedBlock bg = coordinator.getBlockGroup();
+        final LocatedBlock updated = callUpdateBlockForPipeline(bg);
+        final long newGS = updated.getBlock().getGenerationStamp();
+        for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
+          final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
+          if (bi != null) {
+            final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
+                null, null, null, -1, updated.isCorrupt(), null);
+            lb.setBlockToken(updated.getBlockToken());
+            newBlocks.offer(i, lb);
+          } else {
+            final LocatedBlock lb = coordinator.getFollowingBlocks().peek(i);
+            lb.getBlock().setGenerationStamp(newGS);
+          }
+        }
       }
-    }
-    return coordinator.getStripedBlock(index);
+    }.poll(index);
   }
 
   @Override
-  ExtendedBlock updatePipeline(long newGS) throws IOException {
-    if (isLeadingStreamer()) {
-      final ExtendedBlock newBlock = super.updatePipeline(newGS);
-      for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
-        coordinator.putUpdateBlock(i, new ExtendedBlock(newBlock));
+  ExtendedBlock updatePipeline(final long newGS) throws IOException {
+    final MultipleBlockingQueue<ExtendedBlock> updateBlocks
+        = coordinator.getUpdateBlocks();
+    return new ConcurrentPoll<ExtendedBlock>(updateBlocks) {
+      @Override
+      void populate() throws IOException {
+        final ExtendedBlock bg = coordinator.getBlockGroup();
+        final ExtendedBlock newBG = newBlock(bg, newGS);
+        final ExtendedBlock updated = callUpdatePipeline(bg, newBG);
+        for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
+          final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
+          updateBlocks.offer(i, newBlock(bi, updated.getGenerationStamp()));
+        }
       }
-    }
-    return coordinator.getUpdateBlock(index);
+    }.poll(index);
   }
 
   @Override
   public String toString() {
-    return "#" + index + ": isFailed? " + Boolean.toString(isFailed).charAt(0)
+    return "#" + index + ": failed? " + Boolean.toString(failed).charAt(0)
         + ", " + super.toString();
   }
 }
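
The ConcurrentPoll pattern above is the heart of HDFS-8254: instead of streamer #0
always performing the namenode RPC, whichever streamer gets there first finds the
shared queues empty, populates one result per streamer, and the rest simply
collect theirs. A runnable sketch of the same idea outside HDFS (plain
InterruptedException instead of the patch's InterruptedIOException; the 100 ms
retry sleep mirrors the patch):

    import java.util.List;
    import java.util.concurrent.BlockingQueue;

    /** First caller to find all queues empty does the work for everyone. */
    abstract class FirstCallerPopulates<T> {
      private final List<BlockingQueue<T>> queues;

      FirstCallerPopulates(List<BlockingQueue<T>> queues) {
        this.queues = queues;
      }

      T poll(int i) throws InterruptedException {
        for (;;) {
          synchronized (queues) {
            final T polled = queues.get(i).poll();
            if (polled != null) {
              return polled;       // another caller already populated our slot
            }
            if (isReadyToPopulate()) {
              populate();          // we are the "fastest" caller: do the work
              return queues.get(i).poll();
            }
          }
          Thread.sleep(100);       // cannot populate yet; retry shortly
        }
      }

      /** Overridable precondition, like isReady2Populate() in the patch. */
      boolean isReadyToPopulate() {
        for (BlockingQueue<T> q : queues) {
          if (!q.isEmpty()) {
            return false;
          }
        }
        return true;
      }

      /** In the patch this is the actual locateFollowingBlock /
       *  updateBlockForPipeline / updatePipeline namenode call. */
      abstract void populate();
    }

Because every streamer runs the same code path, the failure of any single
datanode, including the one behind streamer #0, no longer stalls coordination.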

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3682e019/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 34ec06d..9aef436 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -103,8 +103,6 @@ public class DfsClientConf {
   private final int hedgedReadThreadpoolSize;
 
   private final int stripedReadThreadpoolSize;
-  private final int stripedWriteMaxSecondsGetStripedBlock;
-  private final int stripedWriteMaxSecondsGetEndedBlock;
 
 
   public DfsClientConf(Configuration conf) {
@@ -228,13 +226,6 @@ public class DfsClientConf {
     Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " +
         HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY +
         " must be greater than 0.");
-
-    stripedWriteMaxSecondsGetStripedBlock = conf.getInt(
-        HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_STRIPED_BLOCK_KEY,
-        HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_STRIPED_BLOCK_DEFAULT);
-    stripedWriteMaxSecondsGetEndedBlock = conf.getInt(
-        HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_ENDED_BLOCK_KEY,
-        HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_ENDED_BLOCK_DEFAULT);
   }
 
   private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -519,20 +510,6 @@ public class DfsClientConf {
   }
 
   /**
-   * @return stripedWriteMaxSecondsGetStripedBlock
-   */
-  public int getStripedWriteMaxSecondsGetStripedBlock() {
-    return stripedWriteMaxSecondsGetStripedBlock;
-  }
-
-  /**
-   * @return stripedWriteMaxSecondsGetEndedBlock
-   */
-  public int getStripedWriteMaxSecondsGetEndedBlock() {
-    return stripedWriteMaxSecondsGetEndedBlock;
-  }
-
-  /**
    * @return the shortCircuitConf
    */
   public ShortCircuitConf getShortCircuitConf() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3682e019/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index a29e8e3..579434b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -950,22 +950,22 @@ public class StripedBlockUtil {
 
   /**
    * Check if the information such as IDs and generation stamps in block-i
-   * match block-0.
+   * match block-j, where block-i and block-j are in the same group.
    */
-  public static void checkBlocks(ExtendedBlock block0, int i,
-      ExtendedBlock blocki) throws IOException {
+  public static void checkBlocks(int j, ExtendedBlock blockj,
+      int i, ExtendedBlock blocki) throws IOException {
 
-    if (!blocki.getBlockPoolId().equals(block0.getBlockPoolId())) {
-      throw new IOException("Block pool IDs mismatched: block0="
-          + block0 + ", block" + i + "=" + blocki);
+    if (!blocki.getBlockPoolId().equals(blockj.getBlockPoolId())) {
+      throw new IOException("Block pool IDs mismatched: block" + j + "="
+          + blockj + ", block" + i + "=" + blocki);
     }
-    if (blocki.getBlockId() - i != block0.getBlockId()) {
-      throw new IOException("Block IDs mismatched: block0="
-          + block0 + ", block" + i + "=" + blocki);
+    if (blocki.getBlockId() - i != blockj.getBlockId() - j) {
+      throw new IOException("Block IDs mismatched: block" + j + "="
+          + blockj + ", block" + i + "=" + blocki);
     }
-    if (blocki.getGenerationStamp() != block0.getGenerationStamp()) {
-      throw new IOException("Generation stamps mismatched: block0="
-          + block0 + ", block" + i + "=" + blocki);
+    if (blocki.getGenerationStamp() != blockj.getGenerationStamp()) {
+      throw new IOException("Generation stamps mismatched: block" + j + "="
+          + blockj + ", block" + i + "=" + blocki);
     }
   }
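
The generalized check works because of the striped block ID scheme: the blocks of
one group have consecutive IDs starting at the group's base ID, so members i and j
must both recover the same base via blockId - index. A small illustration with
plain longs (the IDs are invented; the real check also compares block pool IDs
and generation stamps, as above):

    // Two healthy members of the same block group, indices 3 and 5.
    long baseId = 0x1000L;          // ID of block 0 in the group
    long blockId3 = baseId + 3;
    long blockId5 = baseId + 5;
    // Consistent: both sides recover the same base ID.
    assert blockId3 - 3 == blockId5 - 5;

Anchoring the comparison at an arbitrary healthy member j, rather than always at
block 0, is what lets the check survive the failure of the first streamer.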
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3682e019/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index d2e0458..8944cde 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -93,6 +93,13 @@ public class TestDFSStripedOutputStreamWithFailure {
   }
 
   @Test(timeout=120000)
+  public void testDatanodeFailure0() {
+    final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
+    final int dn = 0;
+    runTest("file" + dn, length, dn);
+  }
+
+  @Test(timeout=120000)
   public void testDatanodeFailure1() {
     final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE);
     final int dn = 1;

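The new testDatanodeFailure0 case is the behavioral point of the patch: killing
the first datanode used to be fatal because streamer #0 was the designated
leader, and it is now just another tolerated failure. A hypothetical
consolidation of the per-index cases (the patch itself keeps one @Test method
per index, which gives clearer failure reports; NUM_PARITY_BLOCKS as the loop
bound is an assumption):

    // Hypothetical: exercise every datanode index in one loop.
    final int length = NUM_DATA_BLOCKS * (BLOCK_SIZE - CELL_SIZE);
    for (int dn = 0; dn < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; dn++) {
      runTest("file" + dn, length, dn);
    }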
