hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From z..@apache.org
Subject [11/21] hadoop git commit: HDFS-7285. Erasure Coding Support inside HDFS.
Date Tue, 11 Aug 2015 17:45:01 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
index 732e9cf..6a313a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -44,10 +45,13 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.ChunkedArrayList;
@@ -73,7 +77,7 @@ class FSDirWriteFileOp {
       Block block) throws IOException {
     // modify file-> block and blocksMap
     // fileNode should be under construction
-    BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block);
+    BlockInfoUnderConstruction uc = fileNode.removeLastBlock(block);
     if (uc == null) {
       return false;
     }
@@ -87,7 +91,7 @@ class FSDirWriteFileOp {
 
     // update space consumed
     fsd.updateCount(iip, 0, -fileNode.getPreferredBlockSize(),
-                    fileNode.getPreferredBlockReplication(), true);
+        fileNode.getPreferredBlockReplication(), true);
     return true;
   }
 
@@ -167,9 +171,10 @@ class FSDirWriteFileOp {
       String src, long fileId, String clientName,
       ExtendedBlock previous, LocatedBlock[] onRetryBlock) throws IOException {
     final long blockSize;
-    final int replication;
+    final short numTargets;
     final byte storagePolicyID;
     String clientMachine;
+    final boolean isStriped;
 
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     src = fsn.dir.resolvePath(pc, src, pathComponents);
@@ -195,18 +200,21 @@ class FSDirWriteFileOp {
     blockSize = pendingFile.getPreferredBlockSize();
     clientMachine = pendingFile.getFileUnderConstructionFeature()
         .getClientMachine();
-    replication = pendingFile.getFileReplication();
+    isStriped = pendingFile.isStriped();
+    numTargets = isStriped ?
+        HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS :
+        pendingFile.getFileReplication();
     storagePolicyID = pendingFile.getStoragePolicyID();
-    return new ValidateAddBlockResult(blockSize, replication, storagePolicyID,
-                                    clientMachine);
+    return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
+                                      clientMachine, isStriped);
   }
 
-  static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk,
+  static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk,
       DatanodeStorageInfo[] locs, long offset) throws IOException {
     LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk),
-                                                     locs, offset, false);
-    fsn.getFSDirectory().getBlockManager()
-        .setBlockToken(lBlk, BlockTokenIdentifier.AccessMode.WRITE);
+        blk, locs, offset);
+    fsn.getBlockManager().setBlockToken(lBlk,
+        BlockTokenIdentifier.AccessMode.WRITE);
     return lBlk;
   }
 
@@ -236,8 +244,9 @@ class FSDirWriteFileOp {
       } else {
         // add new chosen targets to already allocated block and return
         BlockInfo lastBlockInFile = pendingFile.getLastBlock();
-        ((BlockInfoContiguousUnderConstruction) lastBlockInFile)
-            .setExpectedLocations(targets);
+        final BlockInfoUnderConstruction uc
+            = (BlockInfoUnderConstruction)lastBlockInFile;
+        uc.setExpectedLocations(targets);
         offset = pendingFile.computeFileSize();
         return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
       }
@@ -248,15 +257,17 @@ class FSDirWriteFileOp {
                                   ExtendedBlock.getLocalBlock(previous));
 
     // allocate new block, record block locations in INode.
-    Block newBlock = fsn.createNewBlock();
+    final boolean isStriped = pendingFile.isStriped();
+    // allocate new block, record block locations in INode.
+    Block newBlock = fsn.createNewBlock(isStriped);
     INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
-    saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets);
+    saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets, isStriped);
 
     persistNewBlock(fsn, src, pendingFile);
     offset = pendingFile.computeFileSize();
 
     // Return located block
-    return makeLocatedBlock(fsn, newBlock, targets, offset);
+    return makeLocatedBlock(fsn, fsn.getStoredBlock(newBlock), targets, offset);
   }
 
   static DatanodeStorageInfo[] chooseTargetForNewBlock(
@@ -277,9 +288,10 @@ class FSDirWriteFileOp {
         : Arrays.asList(favoredNodes);
 
     // choose targets for the new block to be allocated.
-    return bm.chooseTarget4NewBlock(src, r.replication, clientNode,
+    return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
                                     excludedNodesSet, r.blockSize,
-                                    favoredNodesList, r.storagePolicyID);
+                                    favoredNodesList, r.storagePolicyID,
+                                    r.isStriped);
   }
 
   /**
@@ -468,22 +480,22 @@ class FSDirWriteFileOp {
       long preferredBlockSize, boolean underConstruction, String clientName,
       String clientMachine, byte storagePolicyId) {
     final INodeFile newNode;
+    Preconditions.checkNotNull(existing);
     assert fsd.hasWriteLock();
-    if (underConstruction) {
-      newNode = newINodeFile(id, permissions, modificationTime,
-                                              modificationTime, replication,
-                                              preferredBlockSize,
-                                              storagePolicyId);
-      newNode.toUnderConstruction(clientName, clientMachine);
-    } else {
-      newNode = newINodeFile(id, permissions, modificationTime,
-                                              atime, replication,
-                                              preferredBlockSize,
-                                              storagePolicyId);
-    }
-
-    newNode.setLocalName(localName);
     try {
+      // check if the file is in an EC zone
+      final boolean isStriped = FSDirErasureCodingOp.isInErasureCodingZone(
+          fsd.getFSNamesystem(), existing);
+      if (underConstruction) {
+        newNode = newINodeFile(id, permissions, modificationTime,
+            modificationTime, replication, preferredBlockSize, storagePolicyId,
+            isStriped);
+        newNode.toUnderConstruction(clientName, clientMachine);
+      } else {
+        newNode = newINodeFile(id, permissions, modificationTime, atime,
+            replication, preferredBlockSize, storagePolicyId, isStriped);
+      }
+      newNode.setLocalName(localName);
       INodesInPath iip = fsd.addINode(existing, newNode);
       if (iip != null) {
         if (aclEntries != null) {
@@ -509,23 +521,38 @@ class FSDirWriteFileOp {
    */
   private static BlockInfo addBlock(
       FSDirectory fsd, String path, INodesInPath inodesInPath, Block block,
-      DatanodeStorageInfo[] targets) throws IOException {
+      DatanodeStorageInfo[] targets, boolean isStriped) throws IOException {
     fsd.writeLock();
     try {
       final INodeFile fileINode = inodesInPath.getLastINode().asFile();
       Preconditions.checkState(fileINode.isUnderConstruction());
 
-      // check quota limits and updated space consumed
-      fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
-          fileINode.getPreferredBlockReplication(), true);
-
       // associate new last block for the file
-      BlockInfoContiguousUnderConstruction blockInfo =
-        new BlockInfoContiguousUnderConstruction(
-            block,
-            fileINode.getFileReplication(),
-            HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
+      final BlockInfo blockInfo;
+      if (isStriped) {
+        ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+            fsd.getFSNamesystem(), inodesInPath);
+        ECSchema ecSchema = ecZone.getSchema();
+        short numDataUnits = (short) ecSchema.getNumDataUnits();
+        short numParityUnits = (short) ecSchema.getNumParityUnits();
+        short numLocations = (short) (numDataUnits + numParityUnits);
+
+        // check quota limits and update space consumed
+        fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
+            numLocations, true);
+        blockInfo = new BlockInfoStripedUnderConstruction(block, ecSchema,
+            ecZone.getCellSize(),
+            HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
+      } else {
+        // check quota limits and update space consumed
+        fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(),
+            fileINode.getPreferredBlockReplication(), true);
+
+        short numLocations = fileINode.getFileReplication();
+        blockInfo = new BlockInfoContiguousUnderConstruction(block,
+            numLocations, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION,
             targets);
+      }
       fsd.getBlockManager().addBlockCollection(blockInfo, fileINode);
       fileINode.addBlock(blockInfo);
 
@@ -551,22 +578,24 @@ class FSDirWriteFileOp {
       String clientName, String clientMachine)
       throws IOException {
 
+    Preconditions.checkNotNull(existing);
     long modTime = now();
-    INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
-                                     modTime, modTime, replication, preferredBlockSize);
-    newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
-    newNode.toUnderConstruction(clientName, clientMachine);
-
     INodesInPath newiip;
     fsd.writeLock();
     try {
+      final boolean isStriped = FSDirErasureCodingOp.isInErasureCodingZone(
+          fsd.getFSNamesystem(), existing);
+      INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
+          modTime, modTime, replication, preferredBlockSize, isStriped);
+      newNode.setLocalName(localName.getBytes(Charsets.UTF_8));
+      newNode.toUnderConstruction(clientName, clientMachine);
       newiip = fsd.addINode(existing, newNode);
     } finally {
       fsd.writeUnlock();
     }
     if (newiip == null) {
       NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
-                                       existing.getPath() + "/" + localName);
+          existing.getPath() + "/" + localName);
       return null;
     }
 
@@ -579,7 +608,7 @@ class FSDirWriteFileOp {
   private static FileState analyzeFileState(
       FSNamesystem fsn, String src, long fileId, String clientName,
       ExtendedBlock previous, LocatedBlock[] onRetryBlock)
-          throws IOException  {
+      throws IOException {
     assert fsn.hasReadLock();
 
     checkBlock(fsn, previous);
@@ -662,8 +691,8 @@ class FSDirWriteFileOp {
             "allocation of a new block in " + src + ". Returning previously" +
             " allocated block " + lastBlockInFile);
         long offset = file.computeFileSize();
-        BlockInfoContiguousUnderConstruction lastBlockUC =
-            (BlockInfoContiguousUnderConstruction) lastBlockInFile;
+        BlockInfoUnderConstruction lastBlockUC =
+            (BlockInfoUnderConstruction) lastBlockInFile;
         onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile,
             lastBlockUC.getExpectedStorageLocations(), offset);
         return new FileState(file, src, iip);
@@ -688,14 +717,8 @@ class FSDirWriteFileOp {
     checkBlock(fsn, last);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     src = fsn.dir.resolvePath(pc, src, pathComponents);
-    boolean success = completeFileInternal(fsn, src, holder,
-                                           ExtendedBlock.getLocalBlock(last),
-                                           fileId);
-    if (success) {
-      NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg
-                                       + " is closed by " + holder);
-    }
-    return success;
+    return completeFileInternal(fsn, src, holder,
+        ExtendedBlock.getLocalBlock(last), fileId);
   }
 
   private static boolean completeFileInternal(
@@ -760,16 +783,18 @@ class FSDirWriteFileOp {
 
   private static INodeFile newINodeFile(
       long id, PermissionStatus permissions, long mtime, long atime,
-      short replication, long preferredBlockSize, byte storagePolicyId) {
+      short replication, long preferredBlockSize, byte storagePolicyId,
+      boolean isStriped) {
     return new INodeFile(id, null, permissions, mtime, atime,
         BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize,
-        storagePolicyId);
+        storagePolicyId, isStriped);
   }
 
   private static INodeFile newINodeFile(long id, PermissionStatus permissions,
-      long mtime, long atime, short replication, long preferredBlockSize) {
+      long mtime, long atime, short replication, long preferredBlockSize,
+      boolean isStriped) {
     return newINodeFile(id, permissions, mtime, atime, replication,
-        preferredBlockSize, (byte)0);
+        preferredBlockSize, (byte)0, isStriped);
   }
 
   /**
@@ -797,13 +822,12 @@ class FSDirWriteFileOp {
    * @param targets target datanodes where replicas of the new block is placed
    * @throws QuotaExceededException If addition of block exceeds space quota
    */
-  private static void saveAllocatedBlock(
-      FSNamesystem fsn, String src, INodesInPath inodesInPath, Block newBlock,
-      DatanodeStorageInfo[] targets)
-      throws IOException {
+  private static void saveAllocatedBlock(FSNamesystem fsn, String src,
+      INodesInPath inodesInPath, Block newBlock, DatanodeStorageInfo[] targets,
+      boolean isStriped) throws IOException {
     assert fsn.hasWriteLock();
-    BlockInfo b = addBlock(fsn.dir, src, inodesInPath, newBlock,
-                                     targets);
+    BlockInfo b = addBlock(fsn.dir, src, inodesInPath, newBlock, targets,
+        isStriped);
     NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src);
     DatanodeStorageInfo.incrementBlocksScheduled(targets);
   }
@@ -852,17 +876,19 @@ class FSDirWriteFileOp {
 
   static class ValidateAddBlockResult {
     final long blockSize;
-    final int replication;
+    final int numTargets;
     final byte storagePolicyID;
     final String clientMachine;
+    final boolean isStriped;
 
     ValidateAddBlockResult(
-        long blockSize, int replication, byte storagePolicyID,
-        String clientMachine) {
+        long blockSize, int numTargets, byte storagePolicyID,
+        String clientMachine, boolean isStriped) {
       this.blockSize = blockSize;
-      this.replication = replication;
+      this.numTargets = numTargets;
       this.storagePolicyID = storagePolicyID;
       this.clientMachine = clientMachine;
+      this.isStriped = isStriped;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index ccee1ae..ce48595 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -201,6 +201,9 @@ public class FSDirectory implements Closeable {
   @VisibleForTesting
   public final EncryptionZoneManager ezManager;
 
+  @VisibleForTesting
+  public final ErasureCodingZoneManager ecZoneManager;
+
   /**
    * Caches frequently used file names used in {@link INode} to reuse 
    * byte[] objects and reduce heap usage.
@@ -292,6 +295,7 @@ public class FSDirectory implements Closeable {
     namesystem = ns;
     this.editLog = ns.getEditLog();
     ezManager = new EncryptionZoneManager(this, conf);
+    ecZoneManager = new ErasureCodingZoneManager(this);
   }
     
   FSNamesystem getFSNamesystem() {
@@ -513,7 +517,7 @@ public class FSDirectory implements Closeable {
     final INodeFile fileINode = iip.getLastINode().asFile();
     EnumCounters<StorageType> typeSpaceDeltas =
       getStorageTypeDeltas(fileINode.getStoragePolicyID(), ssDelta,
-          replication, replication);;
+          replication, replication);
     updateCount(iip, iip.length() - 1,
       new QuotaCounts.Builder().nameSpace(nsDelta).storageSpace(ssDelta * replication).
           typeSpaces(typeSpaceDeltas).build(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 3dd076d..246fcd8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -36,16 +36,20 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@@ -413,7 +417,9 @@ public class FSEditLogLoader {
       // Update the salient file attributes.
       newFile.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
       newFile.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
-      updateBlocks(fsDir, addCloseOp, iip, newFile);
+      ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+          fsDir.getFSNamesystem(), iip);
+      updateBlocks(fsDir, addCloseOp, iip, newFile, ecZone);
       break;
     }
     case OP_CLOSE: {
@@ -433,7 +439,9 @@ public class FSEditLogLoader {
       // Update the salient file attributes.
       file.setAccessTime(addCloseOp.atime, Snapshot.CURRENT_STATE_ID);
       file.setModificationTime(addCloseOp.mtime, Snapshot.CURRENT_STATE_ID);
-      updateBlocks(fsDir, addCloseOp, iip, file);
+      ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+          fsDir.getFSNamesystem(), iip);
+      updateBlocks(fsDir, addCloseOp, iip, file, ecZone);
 
       // Now close the file
       if (!file.isUnderConstruction() &&
@@ -491,8 +499,10 @@ public class FSEditLogLoader {
       INodesInPath iip = fsDir.getINodesInPath(path, true);
       INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
       // Update in-memory data structures
-      updateBlocks(fsDir, updateOp, iip, oldFile);
-      
+      ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+          fsDir.getFSNamesystem(), iip);
+      updateBlocks(fsDir, updateOp, iip, oldFile, ecZone);
+
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(updateOp.rpcClientId, updateOp.rpcCallId);
       }
@@ -505,9 +515,12 @@ public class FSEditLogLoader {
         FSNamesystem.LOG.debug(op.opCode + ": " + path +
             " new block id : " + addBlockOp.getLastBlock().getBlockId());
       }
-      INodeFile oldFile = INodeFile.valueOf(fsDir.getINode(path), path);
+      INodesInPath iip = fsDir.getINodesInPath(path, true);
+      INodeFile oldFile = INodeFile.valueOf(iip.getLastINode(), path);
       // add the new block to the INodeFile
-      addNewBlock(fsDir, addBlockOp, oldFile);
+      ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone(
+          fsDir.getFSNamesystem(), iip);
+      addNewBlock(addBlockOp, oldFile, ecZone);
       break;
     }
     case OP_SET_REPLICATION: {
@@ -787,8 +800,15 @@ public class FSEditLogLoader {
     }
     case OP_ALLOCATE_BLOCK_ID: {
       AllocateBlockIdOp allocateBlockIdOp = (AllocateBlockIdOp) op;
-      fsNamesys.getBlockIdManager().setLastAllocatedBlockId(
-          allocateBlockIdOp.blockId);
+      if (BlockIdManager.isStripedBlockID(allocateBlockIdOp.blockId)) {
+        // ALLOCATE_BLOCK_ID is added for sequential block id, thus if the id
+        // is negative, it must belong to striped blocks
+        fsNamesys.getBlockIdManager().setLastAllocatedStripedBlockId(
+            allocateBlockIdOp.blockId);
+      } else {
+        fsNamesys.getBlockIdManager().setLastAllocatedContiguousBlockId(
+            allocateBlockIdOp.blockId);
+      }
       break;
     }
     case OP_ROLLING_UPGRADE_START: {
@@ -941,16 +961,16 @@ public class FSEditLogLoader {
   /**
    * Add a new block into the given INodeFile
    */
-  private void addNewBlock(FSDirectory fsDir, AddBlockOp op, INodeFile file)
-      throws IOException {
+  private void addNewBlock(AddBlockOp op, INodeFile file,
+      ErasureCodingZone ecZone) throws IOException {
     BlockInfo[] oldBlocks = file.getBlocks();
     Block pBlock = op.getPenultimateBlock();
     Block newBlock= op.getLastBlock();
     
     if (pBlock != null) { // the penultimate block is not null
-      Preconditions.checkState(oldBlocks != null && oldBlocks.length > 0);
+      assert oldBlocks != null && oldBlocks.length > 0;
       // compare pBlock with the last block of oldBlocks
-      Block oldLastBlock = oldBlocks[oldBlocks.length - 1];
+      BlockInfo oldLastBlock = oldBlocks[oldBlocks.length - 1];
       if (oldLastBlock.getBlockId() != pBlock.getBlockId()
           || oldLastBlock.getGenerationStamp() != pBlock.getGenerationStamp()) {
         throw new IOException(
@@ -960,19 +980,25 @@ public class FSEditLogLoader {
       }
       
       oldLastBlock.setNumBytes(pBlock.getNumBytes());
-      if (oldLastBlock instanceof BlockInfoContiguousUnderConstruction) {
-        fsNamesys.getBlockManager().forceCompleteBlock(file,
-            (BlockInfoContiguousUnderConstruction) oldLastBlock);
+      if (!oldLastBlock.isComplete()) {
+        fsNamesys.getBlockManager().forceCompleteBlock(file, oldLastBlock);
         fsNamesys.getBlockManager().processQueuedMessagesForBlock(pBlock);
       }
     } else { // the penultimate block is null
       Preconditions.checkState(oldBlocks == null || oldBlocks.length == 0);
     }
     // add the new block
-    BlockInfo newBI = new BlockInfoContiguousUnderConstruction(
-          newBlock, file.getPreferredBlockReplication());
-    fsNamesys.getBlockManager().addBlockCollection(newBI, file);
-    file.addBlock(newBI);
+    final BlockInfo newBlockInfo;
+    boolean isStriped = ecZone != null;
+    if (isStriped) {
+      newBlockInfo = new BlockInfoStripedUnderConstruction(newBlock,
+          ecZone.getSchema(), ecZone.getCellSize());
+    } else {
+      newBlockInfo = new BlockInfoContiguousUnderConstruction(newBlock,
+          file.getPreferredBlockReplication());
+    }
+    fsNamesys.getBlockManager().addBlockCollectionWithCheck(newBlockInfo, file);
+    file.addBlock(newBlockInfo);
     fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
   }
   
@@ -981,7 +1007,8 @@ public class FSEditLogLoader {
    * @throws IOException
    */
   private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
-      INodesInPath iip, INodeFile file) throws IOException {
+      INodesInPath iip, INodeFile file, ErasureCodingZone ecZone)
+      throws IOException {
     // Update its block list
     BlockInfo[] oldBlocks = file.getBlocks();
     Block[] newBlocks = op.getBlocks();
@@ -1010,11 +1037,10 @@ public class FSEditLogLoader {
         oldBlock.getGenerationStamp() != newBlock.getGenerationStamp();
       oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
       
-      if (oldBlock instanceof BlockInfoContiguousUnderConstruction &&
+      if (!oldBlock.isComplete() &&
           (!isLastBlock || op.shouldCompleteLastBlock())) {
         changeMade = true;
-        fsNamesys.getBlockManager().forceCompleteBlock(file,
-            (BlockInfoContiguousUnderConstruction) oldBlock);
+        fsNamesys.getBlockManager().forceCompleteBlock(file, oldBlock);
       }
       if (changeMade) {
         // The state or gen-stamp of the block has changed. So, we may be
@@ -1041,25 +1067,38 @@ public class FSEditLogLoader {
         throw new IOException("Trying to delete non-existant block " + oldBlock);
       }
     } else if (newBlocks.length > oldBlocks.length) {
+      final boolean isStriped = ecZone != null;
       // We're adding blocks
       for (int i = oldBlocks.length; i < newBlocks.length; i++) {
         Block newBlock = newBlocks[i];
-        BlockInfo newBI;
+        final BlockInfo newBI;
         if (!op.shouldCompleteLastBlock()) {
           // TODO: shouldn't this only be true for the last block?
           // what about an old-version fsync() where fsync isn't called
           // until several blocks in?
-          newBI = new BlockInfoContiguousUnderConstruction(
-              newBlock, file.getPreferredBlockReplication());
+          if (isStriped) {
+            newBI = new BlockInfoStripedUnderConstruction(newBlock,
+                ecZone.getSchema(), ecZone.getCellSize());
+          } else {
+            newBI = new BlockInfoContiguousUnderConstruction(newBlock,
+                file.getPreferredBlockReplication());
+          }
         } else {
           // OP_CLOSE should add finalized blocks. This code path
           // is only executed when loading edits written by prior
           // versions of Hadoop. Current versions always log
           // OP_ADD operations as each block is allocated.
-          newBI = new BlockInfoContiguous(newBlock,
-              file.getPreferredBlockReplication());
+          // TODO: ECSchema can be restored from persisted file (HDFS-7859).
+          if (isStriped) {
+            newBI = new BlockInfoStriped(newBlock,
+                ErasureCodingSchemaManager.getSystemDefaultSchema(),
+                ecZone.getCellSize());
+          } else {
+            newBI = new BlockInfoContiguous(newBlock,
+                file.getPreferredBlockReplication());
+          }
         }
-        fsNamesys.getBlockManager().addBlockCollection(newBI, file);
+        fsNamesys.getBlockManager().addBlockCollectionWithCheck(newBI, file);
         file.addBlock(newBI);
         fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index 30517d0..1a1d6b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutFlags;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
@@ -359,7 +360,8 @@ public class FSImageFormat {
 
           // read the max sequential block ID.
           long maxSequentialBlockId = in.readLong();
-          namesystem.getBlockIdManager().setLastAllocatedBlockId(maxSequentialBlockId);
+          namesystem.getBlockIdManager().setLastAllocatedContiguousBlockId(
+              maxSequentialBlockId);
         } else {
 
           long startingGenStamp = namesystem.getBlockIdManager()
@@ -691,7 +693,7 @@ public class FSImageFormat {
       if (blocks != null) {
         final BlockManager bm = namesystem.getBlockManager();
         for (int i = 0; i < blocks.length; i++) {
-          file.setBlock(i, bm.addBlockCollection(blocks[i], file));
+          file.setBlock(i, bm.addBlockCollectionWithCheck(blocks[i], file));
         } 
       }
     }
@@ -791,14 +793,14 @@ public class FSImageFormat {
         counter.increment();
       }
 
-      final INodeFile file = new INodeFile(inodeId, localName, permissions,
+      INodeFile file = new INodeFile(inodeId, localName, permissions,
           modificationTime, atime, blocks, replication, blockSize);
       if (underConstruction) {
         file.toUnderConstruction(clientName, clientMachine);
       }
-        return fileDiffs == null ? file : new INodeFile(file, fileDiffs);
-      } else if (numBlocks == -1) {
-        //directory
+      return fileDiffs == null ? file : new INodeFile(file, fileDiffs);
+    } else if (numBlocks == -1) {
+      //directory
       
       //read quotas
       final long nsQuota = in.readLong();
@@ -897,7 +899,7 @@ public class FSImageFormat {
       final long preferredBlockSize = in.readLong();
 
       return new INodeFileAttributes.SnapshotCopy(name, permissions, null, modificationTime,
-          accessTime, replication, preferredBlockSize, (byte) 0, null);
+          accessTime, replication, preferredBlockSize, (byte) 0, null, false);
     }
 
     public INodeDirectoryAttributes loadINodeDirectoryAttributes(DataInput in)
@@ -961,8 +963,8 @@ public class FSImageFormat {
         if (oldnode.numBlocks() > 0) {
           BlockInfo ucBlock = cons.getLastBlock();
           // we do not replace the inode, just replace the last block of oldnode
-          BlockInfo info = namesystem.getBlockManager().addBlockCollection(
-              ucBlock, oldnode);
+          BlockInfo info = namesystem.getBlockManager()
+              .addBlockCollectionWithCheck(ucBlock, oldnode);
           oldnode.setBlock(oldnode.numBlocks() - 1, info);
         }
 
@@ -1269,7 +1271,7 @@ public class FSImageFormat {
         out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampV1());
         out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampV2());
         out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampAtblockIdSwitch());
-        out.writeLong(sourceNamesystem.getBlockIdManager().getLastAllocatedBlockId());
+        out.writeLong(sourceNamesystem.getBlockIdManager().getLastAllocatedContiguousBlockId());
         out.writeLong(context.getTxId());
         out.writeLong(sourceNamesystem.dir.getLastInodeId());
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
index e8378e5..c4fc1ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext;
 import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SaverContext;
@@ -68,6 +70,7 @@ import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.protobuf.ByteString;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 
 @InterfaceAudience.Private
 public final class FSImageFormatPBINode {
@@ -218,7 +221,7 @@ public final class FSImageFormatPBINode {
       final BlockInfo[] blocks = file.getBlocks();
       if (blocks != null) {
         for (int i = 0; i < blocks.length; i++) {
-          file.setBlock(i, bm.addBlockCollection(blocks[i], file));
+          file.setBlock(i, bm.addBlockCollectionWithCheck(blocks[i], file));
         }
       }
     }
@@ -331,27 +334,39 @@ public final class FSImageFormatPBINode {
       INodeSection.INodeFile f = n.getFile();
       List<BlockProto> bp = f.getBlocksList();
       short replication = (short) f.getReplication();
+      boolean isStriped = f.getIsStriped();
       LoaderContext state = parent.getLoaderContext();
+      ECSchema schema = ErasureCodingSchemaManager.getSystemDefaultSchema();
 
+      if (isStriped) {
+        Preconditions.checkState(f.hasStripingCellSize());
+      }
       BlockInfo[] blocks = new BlockInfo[bp.size()];
-      for (int i = 0, e = bp.size(); i < e; ++i) {
-        blocks[i] =
-            new BlockInfoContiguous(PBHelper.convert(bp.get(i)), replication);
+      for (int i = 0; i < bp.size(); ++i) {
+        BlockProto b = bp.get(i);
+        if (isStriped) {
+          blocks[i] = new BlockInfoStriped(PBHelper.convert(b), schema,
+              (int)f.getStripingCellSize());
+        } else {
+          blocks[i] = new BlockInfoContiguous(PBHelper.convert(b),
+              replication);
+        }
       }
+
       final PermissionStatus permissions = loadPermission(f.getPermission(),
           parent.getLoaderContext().getStringTable());
 
       final INodeFile file = new INodeFile(n.getId(),
           n.getName().toByteArray(), permissions, f.getModificationTime(),
           f.getAccessTime(), blocks, replication, f.getPreferredBlockSize(),
-          (byte)f.getStoragePolicyID());
+          (byte)f.getStoragePolicyID(), isStriped);
 
       if (f.hasAcl()) {
         int[] entries = AclEntryStatusFormat.toInt(loadAclEntries(
             f.getAcl(), state.getStringTable()));
         file.addAclFeature(new AclFeature(entries));
       }
-      
+
       if (f.hasXAttrs()) {
         file.addXAttrFeature(new XAttrFeature(
             loadXAttrs(f.getXAttrs(), state.getStringTable())));
@@ -364,8 +379,16 @@ public final class FSImageFormatPBINode {
         if (blocks.length > 0) {
           BlockInfo lastBlk = file.getLastBlock();
           // replace the last block of file
-          file.setBlock(file.numBlocks() - 1, new BlockInfoContiguousUnderConstruction(
-              lastBlk, replication));
+          final BlockInfo ucBlk;
+          if (isStriped) {
+            BlockInfoStriped striped = (BlockInfoStriped) lastBlk;
+            ucBlk = new BlockInfoStripedUnderConstruction(striped,
+                schema, (int)f.getStripingCellSize());
+          } else {
+            ucBlk = new BlockInfoContiguousUnderConstruction(lastBlk,
+                replication);
+          }
+          file.setBlock(file.numBlocks() - 1, ucBlk);
         }
       }
       return file;
@@ -479,7 +502,8 @@ public final class FSImageFormatPBINode {
           .setPermission(buildPermissionStatus(file, state.getStringMap()))
           .setPreferredBlockSize(file.getPreferredBlockSize())
           .setReplication(file.getFileReplication())
-          .setStoragePolicyID(file.getLocalStoragePolicyID());
+          .setStoragePolicyID(file.getLocalStoragePolicyID())
+          .setIsStriped(file.isStriped());
 
       AclFeature f = file.getAclFeature();
       if (f != null) {
@@ -633,13 +657,24 @@ public final class FSImageFormatPBINode {
     private void save(OutputStream out, INodeFile n) throws IOException {
       INodeSection.INodeFile.Builder b = buildINodeFile(n,
           parent.getSaverContext());
+      BlockInfo[] blocks = n.getBlocks();
 
-      if (n.getBlocks() != null) {
+      if (blocks != null) {
         for (Block block : n.getBlocks()) {
           b.addBlocks(PBHelper.convert(block));
         }
       }
 
+      if (n.isStriped()) {
+        if (blocks != null && blocks.length > 0) {
+          BlockInfo firstBlock = blocks[0];
+          Preconditions.checkState(firstBlock.isStriped());
+          b.setStripingCellSize(((BlockInfoStriped)firstBlock).getCellSize());
+        } else {
+          b.setStripingCellSize(HdfsConstants.BLOCK_STRIPED_CELL_SIZE);
+        }
+      }
+
       FileUnderConstructionFeature uc = n.getFileUnderConstructionFeature();
       if (uc != null) {
         INodeSection.FileUnderConstructionFeature f =
@@ -668,7 +703,7 @@ public final class FSImageFormatPBINode {
       r.writeDelimitedTo(out);
     }
 
-    private final INodeSection.INode.Builder buildINodeCommon(INode n) {
+    private INodeSection.INode.Builder buildINodeCommon(INode n) {
       return INodeSection.INode.newBuilder()
           .setId(n.getId())
           .setName(ByteString.copyFrom(n.getLocalNameBytes()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
index 7c8a857..ef0cc1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java
@@ -297,7 +297,11 @@ public final class FSImageFormatProtobuf {
       blockIdManager.setGenerationStampV1(s.getGenstampV1());
       blockIdManager.setGenerationStampV2(s.getGenstampV2());
       blockIdManager.setGenerationStampV1Limit(s.getGenstampV1Limit());
-      blockIdManager.setLastAllocatedBlockId(s.getLastAllocatedBlockId());
+      blockIdManager.setLastAllocatedContiguousBlockId(s.getLastAllocatedBlockId());
+      if (s.hasLastAllocatedStripedBlockId()) {
+        blockIdManager.setLastAllocatedStripedBlockId(
+            s.getLastAllocatedStripedBlockId());
+      }
       imgTxId = s.getTransactionId();
       if (s.hasRollingUpgradeStartTime()
           && fsn.getFSImage().hasRollbackFSImage()) {
@@ -549,7 +553,8 @@ public final class FSImageFormatProtobuf {
           .setGenstampV1(blockIdManager.getGenerationStampV1())
           .setGenstampV1Limit(blockIdManager.getGenerationStampV1Limit())
           .setGenstampV2(blockIdManager.getGenerationStampV2())
-          .setLastAllocatedBlockId(blockIdManager.getLastAllocatedBlockId())
+          .setLastAllocatedBlockId(blockIdManager.getLastAllocatedContiguousBlockId())
+          .setLastAllocatedStripedBlockId(blockIdManager.getLastAllocatedStripedBlockId())
           .setTransactionId(context.getTxId());
 
       // We use the non-locked version of getNamespaceInfo here since

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
index f71cf0b..c204f55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
@@ -125,12 +125,12 @@ public class FSImageSerialization {
     short blockReplication = in.readShort();
     long modificationTime = in.readLong();
     long preferredBlockSize = in.readLong();
-  
+
     int numBlocks = in.readInt();
     BlockInfo[] blocks = new BlockInfo[numBlocks];
     Block blk = new Block();
     int i = 0;
-    for (; i < numBlocks-1; i++) {
+    for (; i < numBlocks - 1; i++) {
       blk.readFields(in);
       blocks[i] = new BlockInfoContiguous(blk, blockReplication);
     }
@@ -138,8 +138,9 @@ public class FSImageSerialization {
     if(numBlocks > 0) {
       blk.readFields(in);
       blocks[i] = new BlockInfoContiguousUnderConstruction(
-        blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
+          blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
     }
+
     PermissionStatus perm = PermissionStatus.read(in);
     String clientName = readString(in);
     String clientMachine = readString(in);
@@ -180,9 +181,9 @@ public class FSImageSerialization {
 
   /**
    * Serialize a {@link INodeFile} node
-   * @param node The node to write
+   * @param file The node to write
    * @param out The {@link DataOutputStream} where the fields are written
-   * @param writeBlock Whether to write block information
+   * @param writeUnderConstruction Whether to write under-construction info
    */
   public static void writeINodeFile(INodeFile file, DataOutput out,
       boolean writeUnderConstruction) throws IOException {
@@ -305,7 +306,7 @@ public class FSImageSerialization {
     if (!isWithName) {
       Preconditions.checkState(ref instanceof INodeReference.DstReference);
       // dst snapshot id
-      out.writeInt(((INodeReference.DstReference) ref).getDstSnapshotId());
+      out.writeInt(ref.getDstSnapshotId());
     } else {
       out.writeInt(((INodeReference.WithName) ref).getLastSnapshotId());
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 4cc3073..c0bfcc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
@@ -58,9 +57,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROL
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CACHE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_INODE_ATTRIBUTES_PROVIDER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
@@ -138,6 +137,7 @@ import org.apache.hadoop.crypto.CipherSuite;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
+import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
@@ -149,6 +149,7 @@ import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.InvalidPathException;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
@@ -157,10 +158,8 @@ import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.ServiceFailedException;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -168,6 +167,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.UnknownCryptoProtocolVersionException;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
@@ -177,14 +177,17 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
@@ -200,6 +203,9 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -251,6 +257,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.RetryCache;
 import org.apache.hadoop.ipc.Server;
@@ -414,6 +421,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   private final BlockManager blockManager;
   private final SnapshotManager snapshotManager;
   private final CacheManager cacheManager;
+  private final ErasureCodingSchemaManager ecSchemaManager;
   private final DatanodeStatistics datanodeStatistics;
 
   private String nameserviceId;
@@ -590,6 +598,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     leaseManager.removeAllLeases();
     snapshotManager.clearSnapshottableDirs();
     cacheManager.clear();
+    ecSchemaManager.clear();
     setImageLoaded(false);
     blockManager.clear();
   }
@@ -827,6 +836,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       this.dir = new FSDirectory(this, conf);
       this.snapshotManager = new SnapshotManager(dir);
       this.cacheManager = new CacheManager(this, conf, blockManager);
+      this.ecSchemaManager = new ErasureCodingSchemaManager();
       this.safeMode = new SafeModeInfo(conf);
       this.topConf = new TopConf(conf);
       this.auditLoggers = initAuditLoggers(conf);
@@ -1755,8 +1765,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
     LocatedBlocks blocks = res.blocks;
     if (blocks != null) {
+      List<LocatedBlock> blkList = blocks.getLocatedBlocks();
+      if (blkList == null || blkList.size() == 0 ||
+          blkList.get(0) instanceof LocatedStripedBlock) {
+        // no sorting needed for an empty list or for striped blocks
+        return blocks;
+      }
       blockManager.getDatanodeManager().sortLocatedBlocks(
-          clientMachine, blocks.getLocatedBlocks());
+          clientMachine, blkList);
 
       // lastBlock is not part of getLocatedBlocks(), might need to sort it too
       LocatedBlock lastBlock = blocks.getLastLocatedBlock();
@@ -2114,7 +2130,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     if (!DFSUtil.isValidName(src)) {
       throw new InvalidPathException(src);
     }
-    blockManager.verifyReplication(src, replication, clientMachine);
+
+    checkOperation(OperationCategory.READ);
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      if (!FSDirErasureCodingOp.isInErasureCodingZone(this, src)) {
+        blockManager.verifyReplication(src, replication, clientMachine);
+      }
+    } finally {
+      readUnlock();
+    }
+    
+    checkOperation(OperationCategory.WRITE);
     if (blockSize < minBlockSize) {
       throw new IOException("Specified block size is less than configured" +
           " minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
@@ -2455,6 +2483,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     final long preferredblocksize;
     final byte storagePolicyID;
     final List<DatanodeStorageInfo> chosen;
+    final boolean isStriped;
     checkOperation(OperationCategory.READ);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     FSPermissionChecker pc = getPermissionChecker();
@@ -2481,6 +2510,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       clientnode = blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
       preferredblocksize = file.getPreferredBlockSize();
       storagePolicyID = file.getStoragePolicyID();
+      isStriped = file.isStriped();
 
       //find datanode storages
       final DatanodeManager dm = blockManager.getDatanodeManager();
@@ -2496,7 +2526,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     // choose new datanodes.
     final DatanodeStorageInfo[] targets = blockManager.chooseTarget4AdditionalDatanode(
         src, numAdditionalNodes, clientnode, chosen, 
-        excludes, preferredblocksize, storagePolicyID);
+        excludes, preferredblocksize, storagePolicyID, isStriped);
     final LocatedBlock lb = BlockManager.newLocatedBlock(
         blk, targets, -1, false);
     blockManager.setBlockToken(lb, BlockTokenIdentifier.AccessMode.COPY);
@@ -2590,15 +2620,20 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       writeUnlock();
     }
     getEditLog().logSync();
+    if (success) {
+      NameNode.stateChangeLog.info("DIR* completeFile: " + src
+          + " is closed by " + holder);
+    }
     return success;
   }
 
   /**
    * Create new block with a unique block id and a new generation stamp.
+   * @param isStriped true if the file uses striped layout, false if contiguous
    */
-  Block createNewBlock() throws IOException {
+  Block createNewBlock(boolean isStriped) throws IOException {
     assert hasWriteLock();
-    Block b = new Block(nextBlockId(), 0, 0);
+    Block b = new Block(nextBlockId(isStriped), 0, 0);
     // Increment the generation stamp for every new block.
     b.setGenerationStamp(nextGenerationStamp(false));
     return b;
@@ -2792,7 +2827,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if (trackBlockCounts) {
         if (b.isComplete()) {
           numRemovedComplete++;
-          if (blockManager.checkMinReplication(b)) {
+          if (blockManager.hasMinStorage(b, b.numNodes())) {
             numRemovedSafe++;
           }
         }
@@ -3024,7 +3059,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       curBlock = blocks[nrCompleteBlocks];
       if(!curBlock.isComplete())
         break;
-      assert blockManager.checkMinReplication(curBlock) :
+      assert blockManager.hasMinStorage(curBlock) :
               "A COMPLETE block is not minimally replicated in " + src;
     }
 
@@ -3059,8 +3094,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     BlockInfo penultimateBlock = pendingFile.getPenultimateBlock();
 
     // If penultimate block doesn't exist then its minReplication is met
-    boolean penultimateBlockMinReplication = penultimateBlock == null ? true :
-        blockManager.checkMinReplication(penultimateBlock);
+    boolean penultimateBlockMinStorage = penultimateBlock == null ||
+        blockManager.hasMinStorage(penultimateBlock);
 
     switch(lastBlockState) {
     case COMPLETE:
@@ -3068,8 +3103,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       break;
     case COMMITTED:
       // Close file if committed blocks are minimally replicated
-      if(penultimateBlockMinReplication &&
-          blockManager.checkMinReplication(lastBlock)) {
+      if(penultimateBlockMinStorage &&
+          blockManager.hasMinStorage(lastBlock)) {
         finalizeINodeFileUnderConstruction(src, pendingFile,
             iip.getLatestSnapshotId());
         NameNode.stateChangeLog.warn("BLOCK*"
@@ -3089,16 +3124,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       throw new AlreadyBeingCreatedException(message);
     case UNDER_CONSTRUCTION:
     case UNDER_RECOVERY:
-      final BlockInfoContiguousUnderConstruction uc = (BlockInfoContiguousUnderConstruction)lastBlock;
+      // TODO support truncate of striped blocks
+      final BlockInfoUnderConstruction uc =
+          (BlockInfoUnderConstruction)lastBlock;
       // determine if last block was intended to be truncated
       Block recoveryBlock = uc.getTruncateBlock();
       boolean truncateRecovery = recoveryBlock != null;
       boolean copyOnTruncate = truncateRecovery &&
-          recoveryBlock.getBlockId() != uc.getBlockId();
+          recoveryBlock.getBlockId() != uc.toBlock().getBlockId();
       assert !copyOnTruncate ||
-          recoveryBlock.getBlockId() < uc.getBlockId() &&
-          recoveryBlock.getGenerationStamp() < uc.getGenerationStamp() &&
-          recoveryBlock.getNumBytes() > uc.getNumBytes() :
+          recoveryBlock.getBlockId() < uc.toBlock().getBlockId() &&
+          recoveryBlock.getGenerationStamp() < uc.toBlock().
+              getGenerationStamp() &&
+          recoveryBlock.getNumBytes() > uc.toBlock().getNumBytes() :
             "wrong recoveryBlock";
 
       // setup the last block locations from the blockManager if not known
@@ -3106,7 +3144,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         uc.setExpectedLocations(blockManager.getStorages(lastBlock));
       }
 
-      if (uc.getNumExpectedLocations() == 0 && uc.getNumBytes() == 0) {
+      if (uc.getNumExpectedLocations() == 0 &&
+          uc.toBlock().getNumBytes() == 0) {
         // There is no datanode reported to this block.
         // may be client have crashed before writing data to pipeline.
         // This blocks doesn't need any recovery.
@@ -3119,10 +3158,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         return true;
       }
       // start recovery of the last block for this file
-      long blockRecoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(uc));
+      long blockRecoveryId =
+          nextGenerationStamp(blockIdManager.isLegacyBlock(uc.toBlock()));
       lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
       if(copyOnTruncate) {
-        uc.setGenerationStamp(blockRecoveryId);
+        uc.toBlock().setGenerationStamp(blockRecoveryId);
       } else if(truncateRecovery) {
         recoveryBlock.setGenerationStamp(blockRecoveryId);
       }
@@ -3167,10 +3207,32 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
 
     // Adjust disk space consumption if required
-    final long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();    
+    final long diff;
+    final short replicationFactor;
+    if (fileINode.isStriped()) {
+      final ErasureCodingZone ecZone = FSDirErasureCodingOp
+          .getErasureCodingZone(this, iip);
+      final ECSchema ecSchema = ecZone.getSchema();
+      final short numDataUnits = (short) ecSchema.getNumDataUnits();
+      final short numParityUnits = (short) ecSchema.getNumParityUnits();
+
+      final long numBlocks = numDataUnits + numParityUnits;
+      final long fullBlockGroupSize =
+          fileINode.getPreferredBlockSize() * numBlocks;
+
+      final BlockInfoStriped striped = new BlockInfoStriped(commitBlock,
+          ecSchema, ecZone.getCellSize());
+      final long actualBlockGroupSize = striped.spaceConsumed();
+
+      diff = fullBlockGroupSize - actualBlockGroupSize;
+      replicationFactor = (short) 1;
+    } else {
+      diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();
+      replicationFactor = fileINode.getFileReplication();
+    }
     if (diff > 0) {
       try {
-        dir.updateSpaceConsumed(iip, 0, -diff, fileINode.getFileReplication());
+        dir.updateSpaceConsumed(iip, 0, -diff, replicationFactor);
       } catch (IOException e) {
         LOG.warn("Unexpected exception while updating disk space.", e);
       }
@@ -3200,14 +3262,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   @VisibleForTesting
-  BlockInfo getStoredBlock(Block block) {
+  public BlockInfo getStoredBlock(Block block) {
     return blockManager.getStoredBlock(block);
   }
   
   @Override
-  public boolean isInSnapshot(BlockInfoContiguousUnderConstruction blockUC) {
+  public boolean isInSnapshot(BlockCollection bc) {
     assert hasReadLock();
-    final BlockCollection bc = blockUC.getBlockCollection();
     if (bc == null || !(bc instanceof INodeFile)
         || !bc.isUnderConstruction()) {
       return false;
@@ -3252,7 +3313,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     waitForLoadingFSImage();
     writeLock();
     boolean copyTruncate = false;
-    BlockInfoContiguousUnderConstruction truncatedBlock = null;
+    BlockInfo truncatedBlock = null;
     try {
       checkOperation(OperationCategory.WRITE);
       // If a DN tries to commit to the standby, the recovery will
@@ -3309,9 +3370,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         return;
       }
 
-      truncatedBlock = (BlockInfoContiguousUnderConstruction) iFile
-          .getLastBlock();
-      long recoveryId = truncatedBlock.getBlockRecoveryId();
+      truncatedBlock = iFile.getLastBlock();
+      final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)truncatedBlock;
+      final long recoveryId = uc.getBlockRecoveryId();
       copyTruncate = truncatedBlock.getBlockId() != storedBlock.getBlockId();
       if(recoveryId != newgenerationstamp) {
         throw new IOException("The recovery id " + newgenerationstamp
@@ -3325,8 +3386,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         if (remove) {
           blockManager.removeBlock(storedBlock);
         }
-      }
-      else {
+      } else {
         // update last block
         if(!copyTruncate) {
           storedBlock.setGenerationStamp(newgenerationstamp);
@@ -3360,9 +3420,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
                 trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
             if (storageInfo != null) {
               if(copyTruncate) {
-                storageInfo.addBlock(truncatedBlock);
+                storageInfo.addBlock(truncatedBlock, truncatedBlock);
               } else {
-                storageInfo.addBlock(storedBlock);
+                storageInfo.addBlock(storedBlock, storedBlock);
               }
             }
           }
@@ -3374,12 +3434,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
                 trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]),
                 trimmedStorages.toArray(new String[trimmedStorages.size()]));
         if(copyTruncate) {
-          iFile.setLastBlock(truncatedBlock, trimmedStorageInfos);
+          iFile.convertLastBlockToUC(truncatedBlock, trimmedStorageInfos);
         } else {
-          iFile.setLastBlock(storedBlock, trimmedStorageInfos);
+          iFile.convertLastBlockToUC(storedBlock, trimmedStorageInfos);
           if (closeFile) {
-            blockManager.markBlockReplicasAsCorrupt(storedBlock,
-                oldGenerationStamp, oldNumBytes, trimmedStorageInfos);
+            blockManager.markBlockReplicasAsCorrupt(oldBlock.getLocalBlock(),
+                storedBlock, oldGenerationStamp, oldNumBytes,
+                trimmedStorageInfos);
           }
         }
       }
@@ -3387,7 +3448,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if (closeFile) {
         if(copyTruncate) {
           src = closeFileCommitBlocks(iFile, truncatedBlock);
-          if(!iFile.isBlockInLatestSnapshot(storedBlock)) {
+          if(!iFile.isBlockInLatestSnapshot((BlockInfoContiguous) storedBlock)) {
             blockManager.removeBlock(storedBlock);
           }
         } else {
@@ -3714,7 +3775,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
         while (it.hasNext()) {
           Block b = it.next();
-          BlockInfo blockInfo = blockManager.getStoredBlock(b);
+          BlockInfo blockInfo = getStoredBlock(b);
           if (blockInfo.getBlockCollection().getStoragePolicyID()
               == lpPolicy.getId()) {
             filesToDelete.add(blockInfo.getBlockCollection());
@@ -4353,10 +4414,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     /**
      * Increment number of safe blocks if current block has 
      * reached minimal replication.
-     * @param replication current replication 
+     * @param storageNum current number of replicas or number of internal blocks
+     *                   of a striped block group
+     * @param storedBlock current storedBlock which is either a
+     *                    BlockInfoContiguous or a BlockInfoStriped
      */
-    private synchronized void incrementSafeBlockCount(short replication) {
-      if (replication == safeReplication) {
+    private synchronized void incrementSafeBlockCount(short storageNum,
+        BlockInfo storedBlock) {
+      final int safe = storedBlock.isStriped() ?
+          ((BlockInfoStriped) storedBlock).getRealDataBlockNum() : safeReplication;
+      if (storageNum == safe) {
         this.blockSafe++;
 
         // Report startup progress only if we haven't completed startup yet.
@@ -4649,12 +4716,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   @Override
-  public void incrementSafeBlockCount(int replication) {
+  public void incrementSafeBlockCount(int storageNum, BlockInfo storedBlock) {
     // safeMode is volatile, and may be set to null at any time
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null)
       return;
-    safeMode.incrementSafeBlockCount((short)replication);
+    safeMode.incrementSafeBlockCount((short) storageNum, storedBlock);
   }
 
   @Override
@@ -5154,11 +5221,13 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   /**
    * Increments, logs and then returns the block ID
+   * @param isStriped true to allocate a striped block ID, false for contiguous
    */
-  private long nextBlockId() throws IOException {
+  private long nextBlockId(boolean isStriped) throws IOException {
     assert hasWriteLock();
     checkNameNodeSafeMode("Cannot get next block ID");
-    final long blockId = blockIdManager.nextBlockId();
+    final long blockId = isStriped ?
+        blockIdManager.nextStripedBlockId() : blockIdManager.nextContiguousBlockId();
     getEditLog().logAllocateBlockId(blockId);
     // NB: callers sync the log
     return blockId;
@@ -5262,29 +5331,32 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * Get a new generation stamp together with an access token for 
    * a block under construction
    * 
-   * This method is called for recovering a failed pipeline or setting up
-   * a pipeline to append to a block.
+   * This method is called for recovering a failed write or setting up
+   * a block for append.
    * 
    * @param block a block
    * @param clientName the name of a client
    * @return a located block with a new generation stamp and an access token
    * @throws IOException if any error occurs
    */
-  LocatedBlock updateBlockForPipeline(ExtendedBlock block, 
+  LocatedBlock bumpBlockGenerationStamp(ExtendedBlock block,
       String clientName) throws IOException {
-    LocatedBlock locatedBlock;
+    final LocatedBlock locatedBlock;
     checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
 
       // check vadility of parameters
-      checkUCBlock(block, clientName);
+      final INodeFile file = checkUCBlock(block, clientName);
   
       // get a new generation stamp and an access token
       block.setGenerationStamp(nextGenerationStamp(blockIdManager.isLegacyBlock(block.getLocalBlock())));
-      locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
-      blockManager.setBlockToken(locatedBlock, BlockTokenIdentifier.AccessMode.WRITE);
+
+      locatedBlock = BlockManager.newLocatedBlock(
+          block, file.getLastBlock(), null, -1);
+      blockManager.setBlockToken(locatedBlock,
+          BlockTokenIdentifier.AccessMode.WRITE);
     } finally {
       writeUnlock();
     }
@@ -5336,21 +5408,27 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     assert hasWriteLock();
     // check the vadility of the block and lease holder name
     final INodeFile pendingFile = checkUCBlock(oldBlock, clientName);
-    final BlockInfoContiguousUnderConstruction blockinfo
-        = (BlockInfoContiguousUnderConstruction)pendingFile.getLastBlock();
+    final BlockInfo lastBlock = pendingFile.getLastBlock();
+    final BlockInfoUnderConstruction blockinfo = (BlockInfoUnderConstruction)lastBlock;
 
     // check new GS & length: this is not expected
-    if (newBlock.getGenerationStamp() <= blockinfo.getGenerationStamp() ||
-        newBlock.getNumBytes() < blockinfo.getNumBytes()) {
-      String msg = "Update " + oldBlock + " (len = " + 
-        blockinfo.getNumBytes() + ") to an older state: " + newBlock + 
-        " (len = " + newBlock.getNumBytes() +")";
+    if (newBlock.getGenerationStamp() <= lastBlock.getGenerationStamp()) {
+      final String msg = "Update " + oldBlock + " but the new block " + newBlock
+          + " does not have a larger generation stamp than the last block "
+          + lastBlock;
+      LOG.warn(msg);
+      throw new IOException(msg);
+    }
+    if (newBlock.getNumBytes() < lastBlock.getNumBytes()) {
+      final String msg = "Update " + oldBlock + " (size="
+          + oldBlock.getNumBytes() + ") to a smaller size block " + newBlock
+          + " (size=" + newBlock.getNumBytes() + ")";
       LOG.warn(msg);
       throw new IOException(msg);
     }
 
     // Update old block with the new generation stamp and new length
-    blockinfo.setNumBytes(newBlock.getNumBytes());
+    lastBlock.setNumBytes(newBlock.getNumBytes());
     blockinfo.setGenerationStampAndVerifyReplicas(newBlock.getGenerationStamp());
 
     // find the DatanodeDescriptor objects
@@ -6122,17 +6200,29 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   public FSDirectory getFSDirectory() {
     return dir;
   }
+
   /** Set the FSDirectory. */
   @VisibleForTesting
   public void setFSDirectory(FSDirectory dir) {
     this.dir = dir;
   }
+
   /** @return the cache manager. */
   @Override
   public CacheManager getCacheManager() {
     return cacheManager;
   }
 
+  /** @return the ErasureCodingSchemaManager. */
+  public ErasureCodingSchemaManager getErasureCodingSchemaManager() {
+    return ecSchemaManager;
+  }
+
+  /** @return the ErasureCodingZoneManager. */
+  public ErasureCodingZoneManager getErasureCodingZoneManager() {
+    return dir.ecZoneManager;
+  }
+
   @Override  // NameNodeMXBean
   public String getCorruptFiles() {
     List<String> list = new ArrayList<String>();
@@ -7078,6 +7168,85 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
+  /**
+   * Create an erasure coding zone on directory src.
+   * @param srcArg  the path of a directory which will be the root of the
+   *                erasure coding zone. The directory must be empty.
+   * @param schema  ECSchema for the erasure coding zone
+   * @param cellSize Cell size of the stripe
+   * @throws AccessControlException  if the caller is not the superuser.
+   * @throws UnresolvedLinkException if the path can't be resolved.
+   * @throws SafeModeException       if the Namenode is in safe mode.
+   */
+  void createErasureCodingZone(final String srcArg, final ECSchema schema,
+      int cellSize, final boolean logRetryCache) throws IOException,
+      UnresolvedLinkException, SafeModeException, AccessControlException {
+    checkSuperuserPrivilege();
+    checkOperation(OperationCategory.WRITE);
+    HdfsFileStatus resultingStat = null;
+    boolean success = false;
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      checkNameNodeSafeMode("Cannot create erasure coding zone on " + srcArg);
+      resultingStat = FSDirErasureCodingOp.createErasureCodingZone(this,
+          srcArg, schema, cellSize, logRetryCache);
+      success = true;
+    } finally {
+      writeUnlock();
+      if (success) {
+        getEditLog().logSync();
+      }
+      logAuditEvent(success, "createErasureCodingZone", srcArg, null,
+          resultingStat);
+    }
+  }
+
+  /**
+   * Get the erasure coding zone information for the specified path.
+   */
+  ErasureCodingZone getErasureCodingZone(String src)
+      throws AccessControlException, UnresolvedLinkException, IOException {
+    checkOperation(OperationCategory.READ);
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      return getErasureCodingZoneForPath(src);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Get available erasure coding schemas
+   */
+  ECSchema[] getErasureCodingSchemas() throws IOException {
+    checkOperation(OperationCategory.READ);
+    waitForLoadingFSImage();
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      return FSDirErasureCodingOp.getErasureCodingSchemas(this);
+    } finally {
+      readUnlock();
+    }
+  }
+
+  /**
+   * Get the ECSchema specified by the name
+   */
+  ECSchema getErasureCodingSchema(String schemaName) throws IOException {
+    checkOperation(OperationCategory.READ);
+    waitForLoadingFSImage();
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      return FSDirErasureCodingOp.getErasureCodingSchema(this, schemaName);
+    } finally {
+      readUnlock();
+    }
+  }
+
   void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
                 boolean logRetryCache)
       throws IOException {
@@ -7260,5 +7429,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
   }
 
+  @Override
+  public ErasureCodingZone getErasureCodingZoneForPath(String src)
+      throws IOException {
+    return FSDirErasureCodingOp.getErasureCodingZone(this, src);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
index d07ae1f..900839a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 
 /**
@@ -61,9 +60,9 @@ public class FileUnderConstructionFeature implements INode.Feature {
     BlockInfo lastBlock = f.getLastBlock();
     assert (lastBlock != null) : "The last block for path "
         + f.getFullPathName() + " is null when updating its length";
-    assert (lastBlock instanceof BlockInfoContiguousUnderConstruction)
+    assert !lastBlock.isComplete()
         : "The last block for path " + f.getFullPathName()
-            + " is not a BlockInfoUnderConstruction when updating its length";
+        + " is not a BlockInfoUnderConstruction when updating its length";
     lastBlock.setNumBytes(lastBlockLength);
   }
 
@@ -76,9 +75,8 @@ public class FileUnderConstructionFeature implements INode.Feature {
       final BlocksMapUpdateInfo collectedBlocks) {
     final BlockInfo[] blocks = f.getBlocks();
     if (blocks != null && blocks.length > 0
-        && blocks[blocks.length - 1] instanceof BlockInfoContiguousUnderConstruction) {
-      BlockInfoContiguousUnderConstruction lastUC =
-          (BlockInfoContiguousUnderConstruction) blocks[blocks.length - 1];
+        && !blocks[blocks.length - 1].isComplete()) {
+      BlockInfo lastUC = blocks[blocks.length - 1];
       if (lastUC.getNumBytes() == 0) {
         // this is a 0-sized block. do not need check its UC state here
         collectedBlocks.addDeleteBlock(lastUC);


Mime
View raw message