hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ji...@apache.org
Subject hadoop git commit: HDFS-8323. Bump GenerationStamp for write faliure in DFSStripedOutputStream. Contributed by Tsz Wo Nicholas Sze.
Date Wed, 20 May 2015 04:20:24 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7285 bf3c28a89 -> 4dd4aa577


HDFS-8323. Bump GenerationStamp for write faliure in DFSStripedOutputStream. Contributed by
Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4dd4aa57
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4dd4aa57
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4dd4aa57

Branch: refs/heads/HDFS-7285
Commit: 4dd4aa5774f9c60ef0e5217875bf2c55c01f4ff9
Parents: bf3c28a
Author: Jing Zhao <jing9@apache.org>
Authored: Tue May 19 21:19:51 2015 -0700
Committer: Jing Zhao <jing9@apache.org>
Committed: Tue May 19 21:19:51 2015 -0700

----------------------------------------------------------------------
 .../hdfs/protocol/LocatedStripedBlock.java      |  12 ++-
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../hadoop/hdfs/DFSStripedOutputStream.java     | 106 +++++++++++--------
 .../org/apache/hadoop/hdfs/DataStreamer.java    |  15 ++-
 .../apache/hadoop/hdfs/StripedDataStreamer.java |  67 +++++++++---
 .../blockmanagement/DatanodeStorageInfo.java    |  15 +--
 .../hdfs/server/namenode/FSNamesystem.java      |  38 ++++---
 .../hdfs/server/namenode/NameNodeRpcServer.java |   2 +-
 .../hadoop/hdfs/util/StripedBlockUtil.java      |  40 +++++--
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |   8 +-
 .../TestDFSStripedOutputStreamWithFailure.java  |  69 +++++++-----
 .../server/namenode/TestAddStripedBlocks.java   |  12 ++-
 12 files changed, 258 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4dd4aa57/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
index 93a5948..dc5a77f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java
@@ -31,15 +31,21 @@ import java.util.Arrays;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class LocatedStripedBlock extends LocatedBlock {
+  private static final int[] EMPTY_INDICES = {};
+
   private int[] blockIndices;
 
   public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs,
       String[] storageIDs, StorageType[] storageTypes, int[] indices,
       long startOffset, boolean corrupt, DatanodeInfo[] cachedLocs) {
     super(b, locs, storageIDs, storageTypes, startOffset, corrupt, cachedLocs);
-    assert indices != null && indices.length == locs.length;
-    this.blockIndices = new int[indices.length];
-    System.arraycopy(indices, 0, blockIndices, 0, indices.length);
+
+    if (indices == null) {
+      this.blockIndices = EMPTY_INDICES;
+    } else {
+      this.blockIndices = new int[indices.length];
+      System.arraycopy(indices, 0, blockIndices, 0, indices.length);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4dd4aa57/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 48bc9d6..b608b10 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -238,3 +238,6 @@
 
     HDFS-8428. Erasure Coding: Fix the NullPointerException when deleting file.
     (Yi Liu via zhz).
+
+    HDFS-8323. Bump GenerationStamp for write faliure in DFSStripedOutputStream.
+    (Tsz Wo Nicholas Sze via jing9)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4dd4aa57/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 4399a37..8eed6ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -33,7 +33,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -61,52 +60,72 @@ import com.google.common.base.Preconditions;
 
 @InterfaceAudience.Private
 public class DFSStripedOutputStream extends DFSOutputStream {
-  /** Coordinate the communication between the streamers. */
-  static class Coordinator {
-    private final DfsClientConf conf;
-    private final List<BlockingQueue<ExtendedBlock>> endBlocks;
-    private final List<BlockingQueue<LocatedBlock>> stripedBlocks;
-    private volatile boolean shouldLocateFollowingBlock = false;
-
-    Coordinator(final DfsClientConf conf, final int numDataBlocks,
-                final int numAllBlocks) {
-      this.conf = conf;
-      endBlocks = new ArrayList<>(numDataBlocks);
-      for (int i = 0; i < numDataBlocks; i++) {
-        endBlocks.add(new LinkedBlockingQueue<ExtendedBlock>(1));
+  static class MultipleBlockingQueue<T> {
+    private final int pullTimeout;
+    private final List<BlockingQueue<T>> queues;
+
+    MultipleBlockingQueue(int numQueue, int queueSize, int pullTimeout) {
+      queues = new ArrayList<>(numQueue);
+      for (int i = 0; i < numQueue; i++) {
+        queues.add(new LinkedBlockingQueue<T>(queueSize));
       }
 
-      stripedBlocks = new ArrayList<>(numAllBlocks);
-      for (int i = 0; i < numAllBlocks; i++) {
-        stripedBlocks.add(new LinkedBlockingQueue<LocatedBlock>(1));
+      this.pullTimeout = pullTimeout;
+    }
+
+    void offer(int i, T object) {
+      final boolean b = queues.get(i).offer(object);
+      Preconditions.checkState(b, "Failed to offer " + object
+          + " to queue, i=" + i);
+    }
+
+    T poll(int i) throws InterruptedIOException {
+      try {
+        return queues.get(i).poll(pullTimeout, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        throw DFSUtil.toInterruptedIOException("poll interrupted, i=" + i, e);
       }
     }
 
-    boolean shouldLocateFollowingBlock() {
-      return shouldLocateFollowingBlock;
+    T peek(int i) {
+      return queues.get(i).peek();
     }
+  }
 
-    void putEndBlock(int i, ExtendedBlock block) {
-      shouldLocateFollowingBlock = true;
+  /** Coordinate the communication between the streamers. */
+  static class Coordinator {
+    private final MultipleBlockingQueue<LocatedBlock> stripedBlocks;
+    private final MultipleBlockingQueue<ExtendedBlock> endBlocks;
+    private final MultipleBlockingQueue<ExtendedBlock> updateBlocks;
 
-      final boolean b = endBlocks.get(i).offer(block);
-      Preconditions.checkState(b, "Failed to add " + block
-          + " to endBlocks queue, i=" + i);
+    Coordinator(final DfsClientConf conf, final int numDataBlocks,
+        final int numAllBlocks) {
+      stripedBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1,
+          conf.getStripedWriteMaxSecondsGetStripedBlock());
+      endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1,
+          conf.getStripedWriteMaxSecondsGetEndedBlock());
+      updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1,
+          conf.getStripedWriteMaxSecondsGetStripedBlock());
+    }
+
+    void putEndBlock(int i, ExtendedBlock block) {
+      endBlocks.offer(i, block);
     }
 
     ExtendedBlock getEndBlock(int i) throws InterruptedIOException {
-      try {
-        return endBlocks.get(i).poll(
-            conf.getStripedWriteMaxSecondsGetEndedBlock(),
-            TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        throw DFSUtil.toInterruptedIOException(
-            "getEndBlock interrupted, i=" + i, e);
-      }
+      return endBlocks.poll(i);
+    }
+
+    void putUpdateBlock(int i, ExtendedBlock block) {
+      updateBlocks.offer(i, block);
+    }
+
+    ExtendedBlock getUpdateBlock(int i) throws InterruptedIOException {
+      return updateBlocks.poll(i);
     }
 
     void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) {
-      ExtendedBlock b = endBlocks.get(i).peek();
+      ExtendedBlock b = endBlocks.peek(i);
       if (b == null) {
         // streamer just has failed, put end block and continue
         b = block;
@@ -119,22 +138,11 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       if (LOG.isDebugEnabled()) {
         LOG.debug("putStripedBlock " + block + ", i=" + i);
       }
-      final boolean b = stripedBlocks.get(i).offer(block);
-      if (!b) {
-        throw new IOException("Failed: " + block + ", i=" + i);
-      }
+      stripedBlocks.offer(i, block);
     }
 
     LocatedBlock getStripedBlock(int i) throws IOException {
-      final LocatedBlock lb;
-      try {
-        lb = stripedBlocks.get(i).poll(
-            conf.getStripedWriteMaxSecondsGetStripedBlock(),
-            TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        throw DFSUtil.toInterruptedIOException("getStripedBlock interrupted", e);
-      }
-
+      final LocatedBlock lb = stripedBlocks.poll(i);
       if (lb == null) {
         throw new IOException("Failed: i=" + i);
       }
@@ -218,6 +226,11 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     return streamers.get(0);
   }
 
+  @Override
+  ExtendedBlock getBlock() {
+    return getLeadingStreamer().getBlock();
+  }
+
   /** Construct a new output stream for creating a file. */
   DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
                          EnumSet<CreateFlag> flag, Progressable progress,
@@ -292,6 +305,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     int count = 0;
     for(StripedDataStreamer s : streamers) {
       if (!s.isFailed()) {
+        s.getErrorState().initExtenalError();
         count++;
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4dd4aa57/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 8f07341..1344d54 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -44,7 +44,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@@ -210,6 +209,7 @@ class DataStreamer extends Daemon {
 
   static class ErrorState {
     private boolean error = false;
+    private boolean extenalError = false;
     private int badNodeIndex = -1;
     private int restartingNodeIndex = -1;
     private long restartingNodeDeadline = 0;
@@ -221,6 +221,7 @@ class DataStreamer extends Daemon {
 
     synchronized void reset() {
       error = false;
+      extenalError = false;
       badNodeIndex = -1;
       restartingNodeIndex = -1;
       restartingNodeDeadline = 0;
@@ -231,13 +232,19 @@ class DataStreamer extends Daemon {
     }
 
     synchronized boolean hasDatanodeError() {
-      return error && isNodeMarked();
+      return error && (isNodeMarked() || extenalError);
     }
 
     synchronized void setError(boolean err) {
       this.error = err;
     }
 
+    synchronized void initExtenalError() {
+      setError(true);
+      this.extenalError = true;
+    }
+
+
     synchronized void setBadNodeIndex(int index) {
       this.badNodeIndex = index;
     }
@@ -1736,6 +1743,10 @@ class DataStreamer extends Daemon {
     return accessToken;
   }
 
+  ErrorState getErrorState() {
+    return errorState;
+  }
+
   /**
    * Put a packet to the data queue
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4dd4aa57/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index 258fc65..7b7db75 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -94,36 +94,69 @@ public class StripedDataStreamer extends DataStreamer {
   protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
       throws IOException {
     if (isLeadingStreamer()) {
-      if (coordinator.shouldLocateFollowingBlock()) {
+      if (block != null) {
         // set numByte for the previous block group
         long bytes = 0;
         for (int i = 0; i < NUM_DATA_BLOCKS; i++) {
           final ExtendedBlock b = coordinator.getEndBlock(i);
-          bytes += b == null ? 0 : b.getNumBytes();
+          if (b != null) {
+            StripedBlockUtil.checkBlocks(block, i, b);
+            bytes += b.getNumBytes();
+          }
         }
         block.setNumBytes(bytes);
       }
 
-      final LocatedStripedBlock lsb
-          = (LocatedStripedBlock)super.locateFollowingBlock(excludedNodes);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Obtained block group " + lsb);
-      }
-      LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(lsb,
-          BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
-
-      assert blocks.length == (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) :
-          "Fail to get block group from namenode: blockGroupSize: " +
-              (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) + ", blocks.length: " +
-              blocks.length;
-      for (int i = 0; i < blocks.length; i++) {
-        coordinator.putStripedBlock(i, blocks[i]);
-      }
+      putLoactedBlocks(super.locateFollowingBlock(excludedNodes));
     }
 
     return coordinator.getStripedBlock(index);
   }
 
+  void putLoactedBlocks(LocatedBlock lb) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Obtained block group " + lb);
+    }
+    LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
+        (LocatedStripedBlock)lb,
+        BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
+
+    // TODO allow write to continue if blocks.length >= NUM_DATA_BLOCKS
+    assert blocks.length == (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) :
+        "Fail to get block group from namenode: blockGroupSize: " +
+            (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) + ", blocks.length: " +
+            blocks.length;
+    for (int i = 0; i < blocks.length; i++) {
+      coordinator.putStripedBlock(i, blocks[i]);
+    }
+  }
+
+  @Override
+  LocatedBlock updateBlockForPipeline() throws IOException {
+    if (isLeadingStreamer()) {
+      final LocatedBlock updated = super.updateBlockForPipeline();
+      final ExtendedBlock block = updated.getBlock();
+      for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
+        final LocatedBlock lb = new LocatedBlock(block, null, null, null,
+                -1, updated.isCorrupt(), null);
+        lb.setBlockToken(updated.getBlockToken());
+        coordinator.putStripedBlock(i, lb);
+      }
+    }
+    return coordinator.getStripedBlock(index);
+  }
+
+  @Override
+  ExtendedBlock updatePipeline(long newGS) throws IOException {
+    if (isLeadingStreamer()) {
+      final ExtendedBlock newBlock = super.updatePipeline(newGS);
+      for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
+        coordinator.putUpdateBlock(i, new ExtendedBlock(newBlock));
+      }
+    }
+    return coordinator.getUpdateBlock(index);
+  }
+
   @Override
   public String toString() {
     return "#" + index + ": isFailed? " + Boolean.toString(isFailed).charAt(0)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4dd4aa57/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 9b5a923..2275d91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -21,18 +21,15 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * A Datanode has one or more storages. A storage in the Datanode is represented
  * by this class.
@@ -41,7 +38,7 @@ public class DatanodeStorageInfo {
   public static final DatanodeStorageInfo[] EMPTY_ARRAY = {};
 
   public static DatanodeInfo[] toDatanodeInfos(DatanodeStorageInfo[] storages) {
-    return toDatanodeInfos(Arrays.asList(storages));
+    return storages == null? null: toDatanodeInfos(Arrays.asList(storages));
   }
   static DatanodeInfo[] toDatanodeInfos(List<DatanodeStorageInfo> storages) {
     final DatanodeInfo[] datanodes = new DatanodeInfo[storages.size()];
@@ -61,6 +58,9 @@ public class DatanodeStorageInfo {
   }
 
   public static String[] toStorageIDs(DatanodeStorageInfo[] storages) {
+    if (storages == null) {
+      return null;
+    }
     String[] storageIDs = new String[storages.length];
     for(int i = 0; i < storageIDs.length; i++) {
       storageIDs[i] = storages[i].getStorageID();
@@ -69,6 +69,9 @@ public class DatanodeStorageInfo {
   }
 
   public static StorageType[] toStorageTypes(DatanodeStorageInfo[] storages) {
+    if (storages == null) {
+      return null;
+    }
     StorageType[] storageTypes = new StorageType[storages.length];
     for(int i = 0; i < storageTypes.length; i++) {
       storageTypes[i] = storages[i].getStorageType();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4dd4aa57/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 16212e9..9e5e0e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -5987,29 +5987,30 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * Get a new generation stamp together with an access token for 
    * a block under construction
    * 
-   * This method is called for recovering a failed pipeline or setting up
-   * a pipeline to append to a block.
+   * This method is called for recovering a failed write or setting up
+   * a block for appended.
    * 
    * @param block a block
    * @param clientName the name of a client
    * @return a located block with a new generation stamp and an access token
    * @throws IOException if any error occurs
    */
-  LocatedBlock updateBlockForPipeline(ExtendedBlock block, 
+  LocatedBlock bumpBlockGenerationStamp(ExtendedBlock block,
       String clientName) throws IOException {
-    LocatedBlock locatedBlock;
+    final LocatedBlock locatedBlock;
     checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
 
       // check vadility of parameters
-      checkUCBlock(block, clientName);
+      final INodeFile file = checkUCBlock(block, clientName);
   
       // get a new generation stamp and an access token
       block.setGenerationStamp(nextGenerationStamp(blockIdManager.isLegacyBlock(block.getLocalBlock())));
-      locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
-      blockManager.setBlockToken(locatedBlock, BlockTokenIdentifier.AccessMode.WRITE);
+
+      locatedBlock = BlockManager.newLocatedBlock(
+          block, file.getLastBlock(), null, -1);
     } finally {
       writeUnlock();
     }
@@ -6064,23 +6065,26 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     // check the vadility of the block and lease holder name
     final INodeFile pendingFile = checkUCBlock(oldBlock, clientName);
     final BlockInfo lastBlock = pendingFile.getLastBlock();
-    // when updating pipeline, the last block must be contiguous block
-    assert lastBlock instanceof BlockInfoContiguousUnderConstruction;
-    BlockInfoContiguousUnderConstruction blockinfo =
-        (BlockInfoContiguousUnderConstruction) lastBlock;
+    final BlockInfoUnderConstruction blockinfo = (BlockInfoUnderConstruction)lastBlock;
 
     // check new GS & length: this is not expected
-    if (newBlock.getGenerationStamp() <= blockinfo.getGenerationStamp() ||
-        newBlock.getNumBytes() < blockinfo.getNumBytes()) {
-      String msg = "Update " + oldBlock + " (len = " + 
-        blockinfo.getNumBytes() + ") to an older state: " + newBlock + 
-        " (len = " + newBlock.getNumBytes() +")";
+    if (newBlock.getGenerationStamp() <= lastBlock.getGenerationStamp()) {
+      final String msg = "Update " + oldBlock + " but the new block " + newBlock
+          + " does not have a larger generation stamp than the last block "
+          + lastBlock;
+      LOG.warn(msg);
+      throw new IOException(msg);
+    }
+    if (newBlock.getNumBytes() < lastBlock.getNumBytes()) {
+      final String msg = "Update " + oldBlock + " (size="
+          + oldBlock.getNumBytes() + ") to a smaller size block " + newBlock
+          + " (size=" + newBlock.getNumBytes() + ")";
       LOG.warn(msg);
       throw new IOException(msg);
     }
 
     // Update old block with the new generation stamp and new length
-    blockinfo.setNumBytes(newBlock.getNumBytes());
+    lastBlock.setNumBytes(newBlock.getNumBytes());
     blockinfo.setGenerationStampAndVerifyReplicas(newBlock.getGenerationStamp());
 
     // find the DatanodeDescriptor objects

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4dd4aa57/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 747f528..9e94b90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -788,7 +788,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName)
       throws IOException {
     checkNNStartup();
-    return namesystem.updateBlockForPipeline(block, clientName);
+    return namesystem.bumpBlockGenerationStamp(block, clientName);
   }
 
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4dd4aa57/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 6f7dcb1..0b09f37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder;
 
 import java.util.*;
+import java.io.IOException;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
@@ -104,12 +105,17 @@ public class StripedBlockUtil {
     final ExtendedBlock blk = constructInternalBlock(
         bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup);
 
-    return new LocatedBlock(blk,
-        new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
-        new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
-        new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
-        bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(),
-        null);
+    final long offset = bg.getStartOffset() + idxInBlockGroup * cellSize;
+    if (idxInReturnedLocs < bg.getLocations().length) {
+      return new LocatedBlock(blk,
+          new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]},
+          new String[]{bg.getStorageIDs()[idxInReturnedLocs]},
+          new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]},
+          offset, bg.isCorrupt(), null);
+    } else {
+      return new LocatedBlock(blk, null, null, null,
+          offset, bg.isCorrupt(), null);
+    }
   }
 
   /**
@@ -823,4 +829,26 @@ public class StripedBlockUtil {
       return "(index=" + index + ", state =" + state + ")";
     }
   }
+
+  /**
+   * Check if the information such as IDs and generation stamps in block-i
+   * match block-0.
+   */
+  public static void checkBlocks(ExtendedBlock block0, int i,
+      ExtendedBlock blocki) throws IOException {
+
+    if (!blocki.getBlockPoolId().equals(block0.getBlockPoolId())) {
+      throw new IOException("Block pool IDs mismatched: block0="
+          + block0 + ", block" + i + "=" + blocki);
+    }
+    if (blocki.getBlockId() - i != block0.getBlockId()) {
+      throw new IOException("Block IDs mismatched: block0="
+          + block0 + ", block" + i + "=" + blocki);
+    }
+    if (blocki.getGenerationStamp() != block0.getGenerationStamp()) {
+      throw new IOException("Generation stamps mismatched: block0="
+          + block0 + ", block" + i + "=" + blocki);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4dd4aa57/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 558c45d..82c0781 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1953,11 +1953,9 @@ public class DFSTestUtil {
    * Because currently DFSStripedOutputStream does not support hflush/hsync,
    * tests can use this method to flush all the buffered data to DataNodes.
    */
-  public static void writeAndFlushStripedOutputStream(
-      DFSStripedOutputStream out, int chunkSize) throws IOException {
-    // FSOutputSummer.BUFFER_NUM_CHUNKS == 9
-    byte[] toWrite = new byte[chunkSize * 9 + 1];
-    out.write(toWrite);
+  public static ExtendedBlock flushInternal(DFSStripedOutputStream out)
+      throws IOException {
     out.flushInternal();
+    return out.getBlock();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4dd4aa57/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index 4ad3b2e..c232e13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -44,6 +43,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.base.Preconditions;
+
 public class TestDFSStripedOutputStreamWithFailure {
   public static final Log LOG = LogFactory.getLog(
       TestDFSStripedOutputStreamWithFailure.class);
@@ -59,6 +60,9 @@ public class TestDFSStripedOutputStreamWithFailure {
   private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
   private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS;
 
+  private static final int FLUSH_POS
+      = 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1;
+
   private final HdfsConfiguration conf = new HdfsConfiguration();
   private MiniDFSCluster cluster;
   private DistributedFileSystem dfs;
@@ -149,50 +153,53 @@ public class TestDFSStripedOutputStreamWithFailure {
       cluster.startDataNodes(conf, 1, true, null, null);
       cluster.waitActive();
 
-      runTest(new Path(dir, src), length, dnIndex);
+      runTest(new Path(dir, src), length, length/2, dnIndex);
     } catch(Exception e) {
       LOG.info("FAILED", e);
       Assert.fail(StringUtils.stringifyException(e));
     }
   }
 
-  private void runTest(final Path p, final int length,
+  private void runTest(final Path p, final int length, final int killPos,
       final int dnIndex) throws Exception {
-    LOG.info("p=" + p + ", length=" + length + ", dnIndex=" + dnIndex);
+    LOG.info("p=" + p + ", length=" + length + ", killPos=" + killPos
+        + ", dnIndex=" + dnIndex);
+    Preconditions.checkArgument(killPos < length);
+    Preconditions.checkArgument(killPos > FLUSH_POS);
     final String fullPath = p.toString();
 
     final AtomicInteger pos = new AtomicInteger();
     final FSDataOutputStream out = dfs.create(p);
-    final AtomicBoolean killed = new AtomicBoolean();
-    final Thread killer = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        killDatanode(cluster, (DFSStripedOutputStream)out.getWrappedStream(),
-            dnIndex, pos);
-        killed.set(true);
-      }
-    });
-    killer.start();
+    final DFSStripedOutputStream stripedOut
+        = (DFSStripedOutputStream)out.getWrappedStream();
 
-    final int mask = (1 << 16) - 1;
+    long oldGS = -1;
+    boolean killed = false;
     for(; pos.get() < length; ) {
       final int i = pos.getAndIncrement();
+      if (i == killPos) {
+        final long gs = getGenerationStamp(stripedOut);
+        Assert.assertTrue(oldGS != -1);
+        Assert.assertEquals(oldGS, gs);
+
+        killDatanode(cluster, stripedOut, dnIndex, pos);
+        killed = true;
+      }
+
       write(out, i);
-      if ((i & mask) == 0) {
-        final long ms = 100;
-        LOG.info("i=" + i + " sleep " + ms);
-        Thread.sleep(ms);
+
+      if (i == FLUSH_POS) {
+        oldGS = getGenerationStamp(stripedOut);
       }
     }
-    killer.join(10000);
-    Assert.assertTrue(killed.get());
     out.close();
+    Assert.assertTrue(killed);
 
     // check file length
     final FileStatus status = dfs.getFileStatus(p);
     Assert.assertEquals(length, status.getLen());
 
-    checkData(dfs, fullPath, length, dnIndex);
+    checkData(dfs, fullPath, length, dnIndex, oldGS);
   }
 
   static void write(FSDataOutputStream out, int i) throws IOException {
@@ -203,6 +210,14 @@ public class TestDFSStripedOutputStreamWithFailure {
     }
   }
 
+  static long getGenerationStamp(DFSStripedOutputStream out)
+      throws IOException {
+    final long gs = DFSTestUtil.flushInternal(out).getGenerationStamp();
+    LOG.info("getGenerationStamp returns " + gs);
+    return gs;
+
+  }
+
   static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
     for(;;) {
       final DatanodeInfo[] datanodes = streamer.getNodes();
@@ -228,7 +243,7 @@ public class TestDFSStripedOutputStreamWithFailure {
   }
 
   static void checkData(DistributedFileSystem dfs, String src, int length,
-      int killedDnIndex) throws IOException {
+      int killedDnIndex, long oldGS) throws IOException {
     List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
     LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(src, 0L);
     final int expectedNumGroup = (length - 1)/BLOCK_GROUP_SIZE + 1;
@@ -236,6 +251,12 @@ public class TestDFSStripedOutputStreamWithFailure {
 
     for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
       Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
+
+      final long gs = firstBlock.getBlock().getGenerationStamp();
+      final String s = "gs=" + gs + ", oldGS=" + oldGS;
+      LOG.info(s);
+      Assert.assertTrue(s, gs > oldGS);
+
       LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
           (LocatedStripedBlock) firstBlock,
           CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
@@ -247,7 +268,7 @@ public class TestDFSStripedOutputStreamWithFailure {
       final boolean isLastGroup = group == blockGroupList.size() - 1;
       final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE
           : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE;
-      final int numCellInGroup = (int)((groupSize - 1)/CELL_SIZE + 1);
+      final int numCellInGroup = (groupSize - 1)/CELL_SIZE + 1;
       final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS;
       final int lastCellSize = groupSize - (numCellInGroup - 1)*CELL_SIZE;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4dd4aa57/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
index a35cbf4..7876d1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
@@ -105,6 +105,14 @@ public class TestAddStripedBlocks {
     Assert.assertEquals(firstId + HdfsServerConstants.MAX_BLOCKS_IN_GROUP, secondId);
   }
 
+  private static void writeAndFlushStripedOutputStream(
+      DFSStripedOutputStream out, int chunkSize) throws IOException {
+    // FSOutputSummer.BUFFER_NUM_CHUNKS == 9
+    byte[] toWrite = new byte[chunkSize * 9 + 1];
+    out.write(toWrite);
+    DFSTestUtil.flushInternal(out);
+  }
+
   @Test (timeout=60000)
   public void testAddStripedBlock() throws Exception {
     final Path file = new Path("/file1");
@@ -112,7 +120,7 @@ public class TestAddStripedBlocks {
     FSDataOutputStream out = null;
     try {
       out = dfs.create(file, (short) 1);
-      DFSTestUtil.writeAndFlushStripedOutputStream(
+      writeAndFlushStripedOutputStream(
           (DFSStripedOutputStream) out.getWrappedStream(),
           DFS_BYTES_PER_CHECKSUM_DEFAULT);
 
@@ -190,7 +198,7 @@ public class TestAddStripedBlocks {
     FSDataOutputStream out = null;
     try {
       out = dfs.create(file, (short) 1);
-      DFSTestUtil.writeAndFlushStripedOutputStream(
+      writeAndFlushStripedOutputStream(
           (DFSStripedOutputStream) out.getWrappedStream(),
           DFS_BYTES_PER_CHECKSUM_DEFAULT);
 


Mime
View raw message