hadoop-common-commits mailing list archives

From: z..@apache.org
Subject: [22/36] hadoop git commit: HDFS-7285. Erasure Coding Support inside HDFS.
Date: Fri, 14 Aug 2015 17:55:34 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 3f242e0..bfb8dcc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
 import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
 import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.NO_SNAPSHOT_ID;
 
@@ -34,13 +35,16 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiff;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshotFeature;
@@ -79,12 +83,14 @@ public class INodeFile extends INodeWithAdditionalFields
 
   /** 
    * Bit format:
-   * [4-bit storagePolicyID][12-bit replication][48-bit preferredBlockSize]
+   * [4-bit storagePolicyID][1-bit isStriped]
+   * [11-bit replication][48-bit preferredBlockSize]
    */
   static enum HeaderFormat {
     PREFERRED_BLOCK_SIZE(null, 48, 1),
-    REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 12, 1),
-    STORAGE_POLICY_ID(REPLICATION.BITS, BlockStoragePolicySuite.ID_BIT_LENGTH,
+    REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 11, 0),
+    IS_STRIPED(REPLICATION.BITS, 1, 0),
+    STORAGE_POLICY_ID(IS_STRIPED.BITS, BlockStoragePolicySuite.ID_BIT_LENGTH,
         0);
 
     private final LongBitFormat BITS;
@@ -105,14 +111,27 @@ public class INodeFile extends INodeWithAdditionalFields
       return (byte)STORAGE_POLICY_ID.BITS.retrieve(header);
     }
 
+    static boolean isStriped(long header) {
+      long isStriped = IS_STRIPED.BITS.retrieve(header);
+      Preconditions.checkState(isStriped == 0 || isStriped == 1);
+      return isStriped == 1;
+    }
+
     static long toLong(long preferredBlockSize, short replication,
-        byte storagePolicyID) {
+        boolean isStriped, byte storagePolicyID) {
       long h = 0;
       if (preferredBlockSize == 0) {
         preferredBlockSize = PREFERRED_BLOCK_SIZE.BITS.getMin();
       }
       h = PREFERRED_BLOCK_SIZE.BITS.combine(preferredBlockSize, h);
-      h = REPLICATION.BITS.combine(replication, h);
+      // Replication factor for striped files is zero
+      if (isStriped) {
+        h = REPLICATION.BITS.combine(0L, h);
+        h = IS_STRIPED.BITS.combine(1L, h);
+      } else {
+        h = REPLICATION.BITS.combine(replication, h);
+        h = IS_STRIPED.BITS.combine(0L, h);
+      }
       h = STORAGE_POLICY_ID.BITS.combine(storagePolicyID, h);
       return h;
     }
@@ -127,15 +146,21 @@ public class INodeFile extends INodeWithAdditionalFields
             long atime, BlockInfo[] blklist, short replication,
             long preferredBlockSize) {
     this(id, name, permissions, mtime, atime, blklist, replication,
-         preferredBlockSize, (byte) 0);
+        preferredBlockSize, (byte) 0, false);
   }
 
   INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime,
       long atime, BlockInfo[] blklist, short replication,
-      long preferredBlockSize, byte storagePolicyID) {
+      long preferredBlockSize, byte storagePolicyID, boolean isStriped) {
     super(id, name, permissions, mtime, atime);
-    header = HeaderFormat.toLong(preferredBlockSize, replication, storagePolicyID);
-    this.blocks = blklist;
+    header = HeaderFormat.toLong(preferredBlockSize, replication, isStriped,
+        storagePolicyID);
+    if (blklist != null && blklist.length > 0) {
+      for (BlockInfo b : blklist) {
+        Preconditions.checkArgument(b.isStriped() == isStriped);
+      }
+    }
+    setBlocks(blklist);
   }
   
   public INodeFile(INodeFile that) {
@@ -227,31 +252,37 @@ public class INodeFile extends INodeWithAdditionalFields
 
   @Override // BlockCollection
   public void setBlock(int index, BlockInfo blk) {
+    Preconditions.checkArgument(blk.isStriped() == this.isStriped());
     this.blocks[index] = blk;
   }
 
   @Override // BlockCollection, the file should be under construction
-  public BlockInfoContiguousUnderConstruction setLastBlock(
-      BlockInfo lastBlock, DatanodeStorageInfo[] locations)
-      throws IOException {
+  public void convertLastBlockToUC(BlockInfo lastBlock,
+      DatanodeStorageInfo[] locations) throws IOException {
     Preconditions.checkState(isUnderConstruction(),
         "file is no longer under construction");
-
     if (numBlocks() == 0) {
       throw new IOException("Failed to set last block: File is empty.");
     }
-    BlockInfoContiguousUnderConstruction ucBlock =
-      lastBlock.convertToBlockUnderConstruction(
-          BlockUCState.UNDER_CONSTRUCTION, locations);
+
+    final BlockInfo ucBlock;
+    if (isStriped()) {
+      Preconditions.checkState(lastBlock.isStriped());
+      ucBlock = ((BlockInfoStriped) lastBlock)
+          .convertToBlockUnderConstruction(UNDER_CONSTRUCTION, locations);
+    } else {
+      Preconditions.checkState(!lastBlock.isStriped());
+      ucBlock = ((BlockInfoContiguous) lastBlock)
+          .convertToBlockUnderConstruction(UNDER_CONSTRUCTION, locations);
+    }
     setBlock(numBlocks() - 1, ucBlock);
-    return ucBlock;
   }
 
   /**
    * Remove a block from the block list. This block should be
    * the last one on the list.
    */
-  BlockInfoContiguousUnderConstruction removeLastBlock(Block oldblock) {
+  BlockInfoUnderConstruction removeLastBlock(Block oldblock) {
     Preconditions.checkState(isUnderConstruction(),
         "file is no longer under construction");
     if (blocks == null || blocks.length == 0) {
@@ -262,8 +293,8 @@ public class INodeFile extends INodeWithAdditionalFields
       return null;
     }
 
-    BlockInfoContiguousUnderConstruction uc =
-        (BlockInfoContiguousUnderConstruction)blocks[size_1];
+    BlockInfoUnderConstruction uc =
+        (BlockInfoUnderConstruction)blocks[size_1];
     //copy to a new list
     BlockInfo[] newlist = new BlockInfo[size_1];
     System.arraycopy(blocks, 0, newlist, 0, size_1);
@@ -350,6 +381,7 @@ public class INodeFile extends INodeWithAdditionalFields
 
   /** The same as getFileReplication(null). */
   @Override // INodeFileAttributes
+  // TODO properly handle striped files
   public final short getFileReplication() {
     return getFileReplication(CURRENT_STATE_ID);
   }
@@ -365,7 +397,8 @@ public class INodeFile extends INodeWithAdditionalFields
       }
       max = maxInSnapshot > max ? maxInSnapshot : max;
     }
-    return max;
+    return isStriped() ?
+        HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS : max;
   }
 
   /** Set the replication factor of this file. */
@@ -413,6 +446,15 @@ public class INodeFile extends INodeWithAdditionalFields
     setStoragePolicyID(storagePolicyId);
   }
 
+  /**
+   * @return true if the file is in the striping layout.
+   */
+  @VisibleForTesting
+  @Override
+  public boolean isStriped() {
+    return HeaderFormat.isStriped(header);
+  }
+
   @Override // INodeFileAttributes
   public long getHeaderLong() {
     return header;
@@ -429,7 +471,9 @@ public class INodeFile extends INodeWithAdditionalFields
     if(snapshot == CURRENT_STATE_ID || getDiffs() == null) {
       return getBlocks();
     }
+    // find blocks stored in snapshot diffs (for truncate)
     FileDiff diff = getDiffs().getDiffById(snapshot);
+    // note that currently FileDiff can only store contiguous blocks
     BlockInfo[] snapshotBlocks = diff == null ? getBlocks() : diff.getBlocks();
     if (snapshotBlocks != null) {
       return snapshotBlocks;
@@ -456,6 +500,7 @@ public class INodeFile extends INodeWithAdditionalFields
     int size = this.blocks.length;
     int totalAddedBlocks = 0;
     for(INodeFile f : inodes) {
+      Preconditions.checkState(f.isStriped() == this.isStriped());
       totalAddedBlocks += f.blocks.length;
     }
     
@@ -476,6 +521,7 @@ public class INodeFile extends INodeWithAdditionalFields
    * add a block to the block list
    */
   void addBlock(BlockInfo newblock) {
+    Preconditions.checkArgument(newblock.isStriped() == this.isStriped());
     if (this.blocks == null) {
       this.setBlocks(new BlockInfo[]{newblock});
     } else {
@@ -589,6 +635,10 @@ public class INodeFile extends INodeWithAdditionalFields
 
     final long ssDeltaNoReplication;
     short replication;
+    if (isStriped()) {
+      return computeQuotaUsageWithStriped(bsp, counts);
+    }
+
     if (last < lastSnapshotId) {
       ssDeltaNoReplication = computeFileSize(true, false);
       replication = getFileReplication();
@@ -611,6 +661,18 @@ public class INodeFile extends INodeWithAdditionalFields
     return counts;
   }
 
+  /**
+   * Compute the quota usage of a striped file. Note that EC files do not
+   * currently support append/hflush/hsync, so the file length recorded in
+   * snapshots should be the same as the current file length.
+   */
+  public final QuotaCounts computeQuotaUsageWithStriped(
+      BlockStoragePolicy bsp, QuotaCounts counts) {
+    counts.addNameSpace(1);
+    counts.add(storagespaceConsumed(bsp));
+    return counts;
+  }
+
   @Override
   public final ContentSummaryComputationContext computeContentSummary(
       final ContentSummaryComputationContext summary) {
@@ -695,16 +757,20 @@ public class INodeFile extends INodeWithAdditionalFields
     }
     final int last = blocks.length - 1;
     //check if the last block is BlockInfoUnderConstruction
-    long size = blocks[last].getNumBytes();
-    if (blocks[last] instanceof BlockInfoContiguousUnderConstruction) {
+    BlockInfo lastBlk = blocks[last];
+    long size = lastBlk.getNumBytes();
+    if (lastBlk instanceof BlockInfoUnderConstruction) {
        if (!includesLastUcBlock) {
          size = 0;
        } else if (usePreferredBlockSize4LastUcBlock) {
-         size = getPreferredBlockSize();
+         size = isStriped() ?
+             getPreferredBlockSize() *
+                 ((BlockInfoStriped)lastBlk).getDataBlockNum() :
+             getPreferredBlockSize();
        }
     }
     //sum other blocks
-    for(int i = 0; i < last; i++) {
+    for (int i = 0; i < last; i++) {
       size += blocks[i].getNumBytes();
     }
     return size;
@@ -716,6 +782,32 @@ public class INodeFile extends INodeWithAdditionalFields
    * Use preferred block size for the last block if it is under construction.
    */
   public final QuotaCounts storagespaceConsumed(BlockStoragePolicy bsp) {
+    if (isStriped()) {
+      return storagespaceConsumedStriped();
+    } else {
+      return storagespaceConsumedContiguous(bsp);
+    }
+  }
+
+  // TODO: support EC with heterogeneous storage
+  public final QuotaCounts storagespaceConsumedStriped() {
+    QuotaCounts counts = new QuotaCounts.Builder().build();
+    if (blocks == null || blocks.length == 0) {
+      return counts;
+    }
+
+    for (BlockInfo b : blocks) {
+      Preconditions.checkState(b.isStriped());
+      long blockSize = b.isComplete() ?
+          ((BlockInfoStriped)b).spaceConsumed() : getPreferredBlockSize() *
+          ((BlockInfoStriped)b).getTotalBlockNum();
+      counts.addStorageSpace(blockSize);
+    }
+    return counts;
+  }
+
+  public final QuotaCounts storagespaceConsumedContiguous(
+      BlockStoragePolicy bsp) {
     QuotaCounts counts = new QuotaCounts.Builder().build();
     final Iterable<BlockInfo> blocks;
     FileWithSnapshotFeature sf = getFileWithSnapshotFeature();
@@ -819,6 +911,7 @@ public class INodeFile extends INodeWithAdditionalFields
   /**
    * compute the quota usage change for a truncate op
    * @param newLength the length for truncation
+   * TODO: properly handle striped blocks (HDFS-7622)
    **/
   void computeQuotaDeltaForTruncate(
       long newLength, BlockStoragePolicy bsps,
@@ -883,8 +976,15 @@ public class INodeFile extends INodeWithAdditionalFields
     setBlocks(newBlocks);
   }
 
+  /**
+   * This function is only called when the block list is stored in snapshot
+   * diffs, which can only happen when a file is truncated while it has
+   * snapshots. Since we do not support truncation with striped blocks,
+   * we only need to handle contiguous blocks here.
+   */
   public void collectBlocksBeyondSnapshot(BlockInfo[] snapshotBlocks,
                                           BlocksMapUpdateInfo collectedBlocks) {
+    Preconditions.checkState(!isStriped());
     BlockInfo[] oldBlocks = getBlocks();
     if(snapshotBlocks == null || oldBlocks == null)
       return;
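
As a reading aid for the new header layout documented at the top of this diff ([4-bit storagePolicyID][1-bit isStriped][11-bit replication][48-bit preferredBlockSize]), here is a minimal, self-contained sketch of the packing arithmetic using plain shifts and masks. It is an illustration only, not the LongBitFormat plumbing that HeaderFormat actually uses:

    // Illustrative bit packing for the 64-bit INodeFile header
    // (48 + 11 + 1 + 4 = 64 bits).
    public final class HeaderLayoutSketch {
      private static final int BLOCK_SIZE_BITS = 48;
      private static final int REPLICATION_BITS = 11;
      private static final int IS_STRIPED_SHIFT = BLOCK_SIZE_BITS + REPLICATION_BITS; // 59
      private static final int POLICY_SHIFT = IS_STRIPED_SHIFT + 1;                   // 60

      static long pack(long preferredBlockSize, short replication,
                       boolean isStriped, byte storagePolicyId) {
        long h = preferredBlockSize & ((1L << BLOCK_SIZE_BITS) - 1);
        // As in HeaderFormat.toLong above, striped files always record replication 0.
        long repl = isStriped ? 0L : replication;
        h |= (repl & ((1L << REPLICATION_BITS) - 1)) << BLOCK_SIZE_BITS;
        h |= (isStriped ? 1L : 0L) << IS_STRIPED_SHIFT;
        h |= (storagePolicyId & 0xFL) << POLICY_SHIFT;
        return h;
      }

      static boolean isStriped(long header) {
        return ((header >>> IS_STRIPED_SHIFT) & 1L) == 1L;
      }

      public static void main(String[] args) {
        long h = pack(128L * 1024 * 1024, (short) 0, true, (byte) 0);
        System.out.println(isStriped(h)); // prints: true
      }
    }

Note also how computeFileSize above sizes the last under-construction block group of a striped file as getPreferredBlockSize() * getDataBlockNum(): one preferred block per data block in the group.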

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
index 204c8ac..13bd9e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFileAttributes.java
@@ -29,6 +29,9 @@ public interface INodeFileAttributes extends INodeAttributes {
   /** @return the file replication. */
   public short getFileReplication();
 
+  /** @return whether the file is striped (instead of contiguous) */
+  public boolean isStriped();
+
   /** @return preferred block size in bytes */
   public long getPreferredBlockSize();
 
@@ -47,10 +50,10 @@ public interface INodeFileAttributes extends INodeAttributes {
     public SnapshotCopy(byte[] name, PermissionStatus permissions,
         AclFeature aclFeature, long modificationTime, long accessTime,
         short replication, long preferredBlockSize,
-        byte storagePolicyID, XAttrFeature xAttrsFeature) {
+        byte storagePolicyID, XAttrFeature xAttrsFeature, boolean isStriped) {
       super(name, permissions, aclFeature, modificationTime, accessTime, 
           xAttrsFeature);
-      header = HeaderFormat.toLong(preferredBlockSize, replication,
+      header = HeaderFormat.toLong(preferredBlockSize, replication, isStriped,
           storagePolicyID);
     }
 
@@ -70,6 +73,11 @@ public interface INodeFileAttributes extends INodeAttributes {
     }
 
     @Override
+    public boolean isStriped() {
+      return HeaderFormat.isStriped(header);
+    }
+
+    @Override
     public long getPreferredBlockSize() {
       return HeaderFormat.getPreferredBlockSize(header);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
index 1a1edaf..244dafe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
@@ -116,8 +116,8 @@ public class LeaseManager {
       for(BlockInfo b : blocks) {
         if(!b.isComplete())
           numUCBlocks++;
+        }
       }
-    }
     LOG.info("Number of blocks under construction: " + numUCBlocks);
     return numUCBlocks;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
index 3a5dc12..2943fc2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
@@ -87,7 +87,8 @@ public class NameNodeLayoutVersion {
     BLOCK_STORAGE_POLICY(-60, -60, "Block Storage policy"),
     TRUNCATE(-61, -61, "Truncate"),
     APPEND_NEW_BLOCK(-62, -61, "Support appending to new block"),
-    QUOTA_BY_STORAGE_TYPE(-63, -61, "Support quota for specific storage types");
+    QUOTA_BY_STORAGE_TYPE(-63, -61, "Support quota for specific storage types"),
+    ERASURE_CODING(-64, -61, "Support erasure coding");
 
     private final FeatureInfo info;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 6b7e8cf..bee06a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSLimitException;
@@ -143,6 +144,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RetriableException;
@@ -794,7 +796,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName)
       throws IOException {
     checkNNStartup();
-    return namesystem.updateBlockForPipeline(block, clientName);
+    return namesystem.bumpBlockGenerationStamp(block, clientName);
   }
 
 
@@ -1847,6 +1849,24 @@ class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override // ClientProtocol
+  public void createErasureCodingZone(String src, ECSchema schema, int cellSize)
+      throws IOException {
+    checkNNStartup();
+    final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return;
+    }
+    boolean success = false;
+    try {
+      namesystem.createErasureCodingZone(src, schema, cellSize,
+          cacheEntry != null);
+      success = true;
+    } finally {
+      RetryCache.setState(cacheEntry, success);
+    }
+  }
+
+  @Override // ClientProtocol
   public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
       throws IOException {
     checkNNStartup();
@@ -2041,4 +2061,16 @@ class NameNodeRpcServer implements NamenodeProtocols {
     namesystem.checkSuperuserPrivilege();
     nn.spanReceiverHost.removeSpanReceiver(id);
   }
+
+  @Override // ClientProtocol
+  public ECSchema[] getECSchemas() throws IOException {
+    checkNNStartup();
+    return namesystem.getErasureCodingSchemas();
+  }
+
+  @Override // ClientProtocol
+  public ErasureCodingZone getErasureCodingZone(String src) throws IOException {
+    checkNNStartup();
+    return namesystem.getErasureCodingZone(src);
+  }
 }
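
The createErasureCodingZone handler above follows the NameNode's standard RetryCache pattern for non-idempotent RPCs: a retried call first checks whether an earlier attempt already succeeded, and always records its own outcome so later retries can short-circuit. A hedged sketch of the pattern in isolation (doMutation() is a hypothetical stand-in for the actual namespace change):

    import java.io.IOException;
    import org.apache.hadoop.ipc.RetryCache;
    import org.apache.hadoop.ipc.RetryCache.CacheEntry;

    abstract class RetryCachedOpSketch {
      private final RetryCache retryCache; // supplied by the RPC server in real code

      RetryCachedOpSketch(RetryCache retryCache) {
        this.retryCache = retryCache;
      }

      void run() throws IOException {
        final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
        if (cacheEntry != null && cacheEntry.isSuccess()) {
          return; // an earlier attempt of this exact call already succeeded
        }
        boolean success = false;
        try {
          doMutation(); // hypothetical: the actual state change
          success = true;
        } finally {
          RetryCache.setState(cacheEntry, success);
        }
      }

      abstract void doMutation() throws IOException;
    }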

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index 7d4cd7e..21062e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -65,8 +65,9 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicies;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
@@ -123,6 +124,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
   private final int totalDatanodes;
   private final InetAddress remoteAddress;
 
+  private long totalDirs = 0L;
+  private long totalSymlinks = 0L;
+
   private String lostFound = null;
   private boolean lfInited = false;
   private boolean lfInitedOk = false;
@@ -170,7 +174,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
   private final PrintWriter out;
   private List<String> snapshottableDirs = null;
 
-  private final BlockPlacementPolicy bpPolicy;
+  private final BlockPlacementPolicies bpPolicies;
   private StoragePolicySummary storageTypeSummary = null;
 
   /**
@@ -192,7 +196,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
     this.out = out;
     this.totalDatanodes = totalDatanodes;
     this.remoteAddress = remoteAddress;
-    this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null,
+    this.bpPolicies = new BlockPlacementPolicies(conf, null,
         networktopology,
         namenode.getNamesystem().getBlockManager().getDatanodeManager()
         .getHost2DatanodeMap());
@@ -255,7 +259,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
       out.println("Block Id: " + blockId);
       out.println("Block belongs to: "+iNode.getFullPathName());
       out.println("No. of Expected Replica: " +
-          bc.getPreferredBlockReplication());
+          bm.getExpectedReplicaNum(bc, blockInfo));
       out.println("No. of live Replica: " + numberReplicas.liveReplicas());
       out.println("No. of excess Replica: " + numberReplicas.excessReplicas());
       out.println("No. of stale Replica: " +
@@ -356,13 +360,21 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
               namenode.getNamesystem().getBlockManager().getStoragePolicies());
         }
 
-        Result res = new Result(conf);
+        Result replRes = new ReplicationResult(conf);
+        Result ecRes = new ErasureCodingResult(conf);
 
-        check(path, file, res);
+        check(path, file, replRes, ecRes);
 
-        out.println(res);
-        out.println(" Number of data-nodes:\t\t" + totalDatanodes);
+        out.print("\nStatus: ");
+        out.println(replRes.isHealthy() && ecRes.isHealthy() ? "HEALTHY" : "CORRUPT");
+        out.println(" Number of data-nodes:\t" + totalDatanodes);
         out.println(" Number of racks:\t\t" + networktopology.getNumOfRacks());
+        out.println(" Total dirs:\t\t\t" + totalDirs);
+        out.println(" Total symlinks:\t\t" + totalSymlinks);
+        out.println("\nReplicated Blocks:");
+        out.println(replRes);
+        out.println("\nErasure Coded Block Groups:");
+        out.println(ecRes);
 
         if (this.showStoragePolcies) {
           out.print(storageTypeSummary.toString());
@@ -382,7 +394,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
         // of file system and return appropriate code. Changing the output
         // string might break testcases. Also note this must be the last line
         // of the report.
-        if (res.isHealthy()) {
+        if (replRes.isHealthy() && ecRes.isHealthy()) {
           out.print("\n\nThe filesystem under path '" + path + "' " + HEALTHY_STATUS);
         } else {
           out.print("\n\nThe filesystem under path '" + path + "' " + CORRUPT_STATUS);
@@ -425,42 +437,49 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
   }
 
   @VisibleForTesting
-  void check(String parent, HdfsFileStatus file, Result res) throws IOException {
+  void check(String parent, HdfsFileStatus file, Result replRes, Result ecRes)
+      throws IOException {
     String path = file.getFullName(parent);
     if (file.isDir()) {
-      checkDir(path, res);
+      checkDir(path, replRes, ecRes);
       return;
     }
     if (file.isSymlink()) {
       if (showFiles) {
         out.println(path + " <symlink>");
       }
-      res.totalSymlinks++;
+      totalSymlinks++;
       return;
     }
     LocatedBlocks blocks = getBlockLocations(path, file);
     if (blocks == null) { // the file is deleted
       return;
     }
-    collectFileSummary(path, file, res, blocks);
-    collectBlocksSummary(parent, file, res, blocks);
+
+    final Result r = file.getReplication() == 0? ecRes: replRes;
+    collectFileSummary(path, file, r, blocks);
+    if (showprogress && (replRes.totalFiles + ecRes.totalFiles) % 100 == 0) {
+      out.println();
+      out.flush();
+    }
+    collectBlocksSummary(parent, file, r, blocks);
   }
 
-  private void checkDir(String path, Result res) throws IOException {
+  private void checkDir(String path, Result replRes, Result ecRes) throws IOException {
     if (snapshottableDirs != null && snapshottableDirs.contains(path)) {
       String snapshotPath = (path.endsWith(Path.SEPARATOR) ? path : path
           + Path.SEPARATOR)
           + HdfsConstants.DOT_SNAPSHOT_DIR;
       HdfsFileStatus snapshotFileInfo = namenode.getRpcServer().getFileInfo(
           snapshotPath);
-      check(snapshotPath, snapshotFileInfo, res);
+      check(snapshotPath, snapshotFileInfo, replRes, ecRes);
     }
     byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
     DirectoryListing thisListing;
     if (showFiles) {
       out.println(path + " <dir>");
     }
-    res.totalDirs++;
+    totalDirs++;
     do {
       assert lastReturnedName != null;
       thisListing = namenode.getRpcServer().getListing(
@@ -470,7 +489,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
       }
       HdfsFileStatus[] files = thisListing.getPartialListing();
       for (int i = 0; i < files.length; i++) {
-        check(path, files[i], res);
+        check(path, files[i], replRes, ecRes);
       }
       lastReturnedName = thisListing.getLastName();
     } while (thisListing.hasMore());
@@ -518,10 +537,6 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
     } else if (showprogress) {
       out.print('.');
     }
-    if ((showprogress) && res.totalFiles % 100 == 0) {
-      out.println();
-      out.flush();
-    }
   }
 
   private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res,
@@ -542,9 +557,10 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
 
       final BlockInfo storedBlock = bm.getStoredBlock(
           block.getLocalBlock());
+      final int minReplication = bm.getMinStorageNum(storedBlock);
       // count decommissionedReplicas / decommissioningReplicas
       NumberReplicas numberReplicas = bm.countNodes(storedBlock);
-      int decommissionedReplicas = numberReplicas.decommissioned();;
+      int decommissionedReplicas = numberReplicas.decommissioned();
       int decommissioningReplicas = numberReplicas.decommissioning();
       res.decommissionedReplicas +=  decommissionedReplicas;
       res.decommissioningReplicas += decommissioningReplicas;
@@ -556,11 +572,18 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
       res.totalReplicas += totalReplicasPerBlock;
 
       // count expected replicas
-      short targetFileReplication = file.getReplication();
+      short targetFileReplication;
+      if (file.getECSchema() != null) {
+        assert storedBlock instanceof BlockInfoStriped;
+        targetFileReplication = ((BlockInfoStriped) storedBlock)
+            .getRealTotalBlockNum();
+      } else {
+        targetFileReplication = file.getReplication();
+      }
       res.numExpectedReplicas += targetFileReplication;
 
       // count under min repl'd blocks
-      if(totalReplicasPerBlock < res.minReplication){
+      if(totalReplicasPerBlock < minReplication){
         res.numUnderMinReplicatedBlocks++;
       }
 
@@ -581,7 +604,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
       }
 
       // count minimally replicated blocks
-      if (totalReplicasPerBlock >= res.minReplication)
+      if (totalReplicasPerBlock >= minReplication)
         res.numMinReplicatedBlocks++;
 
       // count missing replicas / under replicated blocks
@@ -601,7 +624,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
       }
 
       // count mis replicated blocks
-      BlockPlacementStatus blockPlacementStatus = bpPolicy
+      BlockPlacementStatus blockPlacementStatus = bpPolicies.getPolicy(false)
           .verifyBlockPlacement(path, lBlk, targetFileReplication);
       if (!blockPlacementStatus.isPlacementPolicySatisfied()) {
         res.numMisReplicatedBlocks++;
@@ -636,9 +659,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
         report.append(" Live_repl=" + liveReplicas);
         if (showLocations || showRacks || showReplicaDetails) {
           StringBuilder sb = new StringBuilder("[");
-          Iterable<DatanodeStorageInfo> storages = bm.getStorages(block.getLocalBlock());
-          for (Iterator<DatanodeStorageInfo> iterator = storages.iterator(); iterator.hasNext();) {
-            DatanodeStorageInfo storage = iterator.next();
+          DatanodeStorageInfo[] storages = bm.getStorages(storedBlock);
+          for (int i = 0; i < storages.length; i++) {
+            DatanodeStorageInfo storage = storages[i];
             DatanodeDescriptor dnDesc = storage.getDatanodeDescriptor();
             if (showRacks) {
               sb.append(NodeBase.getPath(dnDesc));
@@ -647,7 +670,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
                   .getStorageType()));
             }
             if (showReplicaDetails) {
-              LightWeightLinkedSet<Block> blocksExcess =
+              LightWeightLinkedSet<BlockInfo> blocksExcess =
                   bm.excessReplicateMap.get(dnDesc.getDatanodeUuid());
               Collection<DatanodeDescriptor> corruptReplicas =
                   bm.getCorruptReplicas(block.getLocalBlock());
@@ -668,7 +691,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
                 sb.append("LIVE)");
               }
             }
-            if (iterator.hasNext()) {
+            if (i < storages.length - 1) {
               sb.append(", ");
             }
           }
@@ -982,7 +1005,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
     long missingReplicas = 0L;
     long decommissionedReplicas = 0L;
     long decommissioningReplicas = 0L;
-    long numUnderMinReplicatedBlocks=0L;
+    long numUnderMinReplicatedBlocks = 0L;
     long numOverReplicatedBlocks = 0L;
     long numUnderReplicatedBlocks = 0L;
     long numMisReplicatedBlocks = 0L;  // blocks that do not satisfy block placement policy
@@ -992,22 +1015,10 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
     long totalOpenFilesBlocks = 0L;
     long totalFiles = 0L;
     long totalOpenFiles = 0L;
-    long totalDirs = 0L;
-    long totalSymlinks = 0L;
     long totalSize = 0L;
     long totalOpenFilesSize = 0L;
     long totalReplicas = 0L;
 
-    final short replication;
-    final int minReplication;
-
-    Result(Configuration conf) {
-      this.replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
-                                            DFSConfigKeys.DFS_REPLICATION_DEFAULT);
-      this.minReplication = (short)conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
-                                            DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
-    }
-
     /**
      * DFS is considered healthy if there are no missing blocks.
      */
@@ -1033,19 +1044,29 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
         return 0.0f;
       return (float) (totalReplicas) / (float) totalBlocks;
     }
+  }
+
+  @VisibleForTesting
+  static class ReplicationResult extends Result {
+    final short replication;
+    final short minReplication;
+
+    ReplicationResult(Configuration conf) {
+      this.replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+                                            DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+      this.minReplication = (short)conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
+                                            DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
+    }
 
     @Override
     public String toString() {
       StringBuilder res = new StringBuilder();
-      res.append("Status: ").append((isHealthy() ? "HEALTHY" : "CORRUPT"))
-          .append("\n Total size:\t").append(totalSize).append(" B");
+      res.append(" Total size:\t").append(totalSize).append(" B");
       if (totalOpenFilesSize != 0) {
         res.append(" (Total open files size: ").append(totalOpenFilesSize)
             .append(" B)");
       }
-      res.append("\n Total dirs:\t").append(totalDirs).append(
-          "\n Total files:\t").append(totalFiles);
-      res.append("\n Total symlinks:\t\t").append(totalSymlinks);
+      res.append("\n Total files:\t").append(totalFiles);
       if (totalOpenFiles != 0) {
         res.append(" (Files currently being written: ").append(totalOpenFiles)
             .append(")");
@@ -1135,4 +1156,110 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
       return res.toString();
     }
   }
+
+  @VisibleForTesting
+  static class ErasureCodingResult extends Result {
+    final String defaultSchema;
+
+    ErasureCodingResult(Configuration conf) {
+      defaultSchema = ErasureCodingSchemaManager.getSystemDefaultSchema()
+          .getSchemaName();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder res = new StringBuilder();
+      res.append(" Total size:\t").append(totalSize).append(" B");
+      if (totalOpenFilesSize != 0) {
+        res.append(" (Total open files size: ").append(totalOpenFilesSize)
+            .append(" B)");
+      }
+      res.append("\n Total files:\t").append(totalFiles);
+      if (totalOpenFiles != 0) {
+        res.append(" (Files currently being written: ").append(totalOpenFiles)
+            .append(")");
+      }
+      res.append("\n Total block groups (validated):\t").append(totalBlocks);
+      if (totalBlocks > 0) {
+        res.append(" (avg. block group size ").append((totalSize / totalBlocks))
+            .append(" B)");
+      }
+      if (totalOpenFilesBlocks != 0) {
+        res.append(" (Total open file block groups (not validated): ").append(
+            totalOpenFilesBlocks).append(")");
+      }
+      if (corruptFiles > 0 || numUnderMinReplicatedBlocks > 0) {
+        res.append("\n  ********************************");
+        if(numUnderMinReplicatedBlocks>0){
+          res.append("\n  UNRECOVERABLE BLOCK GROUPS:\t").append(numUnderMinReplicatedBlocks);
+          if(totalBlocks>0){
+            res.append(" (").append(
+                ((float) (numUnderMinReplicatedBlocks * 100) / (float) totalBlocks))
+                .append(" %)");
+          }
+        }
+        if(corruptFiles>0) {
+          res.append(
+              "\n  CORRUPT FILES:\t").append(corruptFiles);
+          if (missingSize > 0) {
+            res.append("\n  MISSING BLOCK GROUPS:\t").append(missingIds.size()).append(
+                "\n  MISSING SIZE:\t\t").append(missingSize).append(" B");
+          }
+          if (corruptBlocks > 0) {
+            res.append("\n  CORRUPT BLOCK GROUPS: \t").append(corruptBlocks).append(
+                "\n  CORRUPT SIZE:\t\t").append(corruptSize).append(" B");
+          }
+        }
+        res.append("\n  ********************************");
+      }
+      res.append("\n Minimally erasure-coded block groups:\t").append(
+          numMinReplicatedBlocks);
+      if (totalBlocks > 0) {
+        res.append(" (").append(
+            ((float) (numMinReplicatedBlocks * 100) / (float) totalBlocks))
+            .append(" %)");
+      }
+      res.append("\n Over-erasure-coded block groups:\t")
+          .append(numOverReplicatedBlocks);
+      if (totalBlocks > 0) {
+        res.append(" (").append(
+            ((float) (numOverReplicatedBlocks * 100) / (float) totalBlocks))
+            .append(" %)");
+      }
+      res.append("\n Under-erasure-coded block groups:\t").append(
+          numUnderReplicatedBlocks);
+      if (totalBlocks > 0) {
+        res.append(" (").append(
+            ((float) (numUnderReplicatedBlocks * 100) / (float) totalBlocks))
+            .append(" %)");
+      }
+      res.append("\n Unsatisfactory placement block groups:\t")
+          .append(numMisReplicatedBlocks);
+      if (totalBlocks > 0) {
+        res.append(" (").append(
+            ((float) (numMisReplicatedBlocks * 100) / (float) totalBlocks))
+            .append(" %)");
+      }
+      res.append("\n Default schema:\t\t").append(defaultSchema)
+          .append("\n Average block group size:\t").append(
+          getReplicationFactor()).append("\n Missing block groups:\t\t").append(
+          missingIds.size()).append("\n Corrupt block groups:\t\t").append(
+          corruptBlocks).append("\n Missing internal blocks:\t").append(
+          missingReplicas);
+      if (totalReplicas > 0) {
+        res.append(" (").append(
+            ((float) (missingReplicas * 100) / (float) numExpectedReplicas)).append(
+            " %)");
+      }
+      if (decommissionedReplicas > 0) {
+        res.append("\n Decommissioned internal blocks:\t").append(
+            decommissionedReplicas);
+      }
+      if (decommissioningReplicas > 0) {
+        res.append("\n Decommissioning internal blocks:\t").append(
+            decommissioningReplicas);
+      }
+      return res.toString();
+    }
+  }
 }
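
Two things change shape in fsck above: each file is now routed to one of two accumulators, and the expected "replica" count for a striped group is its internal block count rather than a replication factor. A minimal sketch of both decisions (free-standing form assumed; in the patch they are inlined in check() and collectBlocksSummary()):

    // Files in an erasure coding zone report replication 0, so they are
    // tallied under the erasure coding result.
    static Result resultFor(HdfsFileStatus file, Result replRes, Result ecRes) {
      return file.getReplication() == 0 ? ecRes : replRes;
    }

    // For a striped block group, the target is the number of internal
    // blocks (data + parity) the group should have.
    static short expectedReplicas(HdfsFileStatus file, BlockInfo storedBlock) {
      return file.getECSchema() != null
          ? ((BlockInfoStriped) storedBlock).getRealTotalBlockNum()
          : file.getReplication();
    }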

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
index a5053bc..430853a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
@@ -17,8 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.util.RwLock;
@@ -45,7 +49,16 @@ public interface Namesystem extends RwLock, SafeMode {
 
   void checkOperation(OperationCategory read) throws StandbyException;
 
-  boolean isInSnapshot(BlockInfoContiguousUnderConstruction blockUC);
+  boolean isInSnapshot(BlockCollection bc);
 
   CacheManager getCacheManager();
+
+  /**
+   * Gets the erasure coding zone for the given path
+   * @param src the filesystem path
+   * @return {@link ErasureCodingZone}
+   * @throws IOException
+   */
+  ErasureCodingZone getErasureCodingZoneForPath(String src)
+      throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
index 1428482..252844c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java
@@ -44,9 +44,10 @@ public interface SafeMode {
     
   /**
    * Increment number of blocks that reached minimal replication.
-   * @param replication current replication 
+   * @param replication current replication
+   * @param storedBlock current stored Block
    */
-  public void incrementSafeBlockCount(int replication);
+  public void incrementSafeBlockCount(int replication, BlockInfo storedBlock);
 
   /** Decrement number of blocks that reached minimal replication. */
   public void decrementSafeBlockCount(BlockInfo b);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
index 64ad1f6..e684b41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
@@ -234,19 +234,23 @@ public class FSImageFormatPBSnapshot {
               .toByteArray(), permission, acl, fileInPb.getModificationTime(),
               fileInPb.getAccessTime(), (short) fileInPb.getReplication(),
               fileInPb.getPreferredBlockSize(),
-              (byte)fileInPb.getStoragePolicyID(), xAttrs);
+              (byte)fileInPb.getStoragePolicyID(), xAttrs,
+              fileInPb.getIsStriped());
         }
 
         FileDiff diff = new FileDiff(pbf.getSnapshotId(), copy, null,
             pbf.getFileSize());
         List<BlockProto> bpl = pbf.getBlocksList();
-        BlockInfo[] blocks = new BlockInfo[bpl.size()];
+        // a file diff can only contain contiguous blocks
+        BlockInfoContiguous[] blocks = new BlockInfoContiguous[bpl.size()];
         for(int j = 0, e = bpl.size(); j < e; ++j) {
           Block blk = PBHelper.convert(bpl.get(j));
-          BlockInfo storedBlock =  fsn.getBlockManager().getStoredBlock(blk);
+          BlockInfoContiguous storedBlock =
+              (BlockInfoContiguous) fsn.getBlockManager().getStoredBlock(blk);
           if(storedBlock == null) {
-            storedBlock = fsn.getBlockManager().addBlockCollection(
-                new BlockInfoContiguous(blk, copy.getFileReplication()), file);
+            storedBlock = (BlockInfoContiguous) fsn.getBlockManager()
+                .addBlockCollectionWithCheck(new BlockInfoContiguous(blk,
+                    copy.getFileReplication()), file);
           }
           blocks[j] = storedBlock;
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java
index 6b8388e..0a64a38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileDiffList.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
@@ -55,7 +56,9 @@ public class FileDiffList extends
     final FileDiff diff =
         super.saveSelf2Snapshot(latestSnapshotId, iNodeFile, snapshotCopy);
     if (withBlocks) {  // Store blocks if this is the first update
-      diff.setBlocks(iNodeFile.getBlocks());
+      BlockInfo[] blks = iNodeFile.getBlocks();
+      assert blks != null;
+      diff.setBlocks(blks);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
new file mode 100644
index 0000000..56a1546
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import com.google.common.base.Joiner;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * A BlockECRecoveryCommand is an instruction to a DataNode to reconstruct a
+ * striped block group with missing blocks.
+ *
+ * Upon receiving this command, the DataNode pulls data from other DataNodes
+ * hosting blocks in this group and reconstructs the lost blocks through codec
+ * calculation.
+ *
+ * After the reconstruction, the DataNode pushes the reconstructed blocks to
+ * their final destinations if necessary (e.g., the destination is different
+ * from the reconstruction node, or multiple blocks in a group are to be
+ * reconstructed).
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockECRecoveryCommand extends DatanodeCommand {
+  final Collection<BlockECRecoveryInfo> ecTasks;
+
+  /**
+   * Create BlockECRecoveryCommand from a collection of
+   * {@link BlockECRecoveryInfo}, each representing a recovery task
+   */
+  public BlockECRecoveryCommand(int action,
+      Collection<BlockECRecoveryInfo> blockECRecoveryInfoList) {
+    super(action);
+    this.ecTasks = blockECRecoveryInfoList;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("BlockECRecoveryCommand(\n  ");
+    Joiner.on("\n  ").appendTo(sb, ecTasks);
+    sb.append("\n)");
+    return sb.toString();
+  }
+
+  /** Block and targets pair */
+  @InterfaceAudience.Private
+  @InterfaceStability.Evolving
+  public static class BlockECRecoveryInfo {
+    private final ExtendedBlock block;
+    private final DatanodeInfo[] sources;
+    private DatanodeInfo[] targets;
+    private String[] targetStorageIDs;
+    private StorageType[] targetStorageTypes;
+    private final short[] liveBlockIndices;
+    private final ECSchema ecSchema;
+    private final int cellSize;
+
+    public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
+        DatanodeStorageInfo[] targetDnStorageInfo, short[] liveBlockIndices,
+        ECSchema ecSchema, int cellSize) {
+      this(block, sources, DatanodeStorageInfo
+          .toDatanodeInfos(targetDnStorageInfo), DatanodeStorageInfo
+          .toStorageIDs(targetDnStorageInfo), DatanodeStorageInfo
+          .toStorageTypes(targetDnStorageInfo), liveBlockIndices, ecSchema,
+          cellSize);
+    }
+
+    public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources,
+        DatanodeInfo[] targets, String[] targetStorageIDs,
+        StorageType[] targetStorageTypes, short[] liveBlockIndices,
+        ECSchema ecSchema, int cellSize) {
+      this.block = block;
+      this.sources = sources;
+      this.targets = targets;
+      this.targetStorageIDs = targetStorageIDs;
+      this.targetStorageTypes = targetStorageTypes;
+      this.liveBlockIndices = liveBlockIndices;
+      this.ecSchema = ecSchema;
+      this.cellSize = cellSize;
+    }
+
+    public ExtendedBlock getExtendedBlock() {
+      return block;
+    }
+
+    public DatanodeInfo[] getSourceDnInfos() {
+      return sources;
+    }
+
+    public DatanodeInfo[] getTargetDnInfos() {
+      return targets;
+    }
+
+    public String[] getTargetStorageIDs() {
+      return targetStorageIDs;
+    }
+    
+    public StorageType[] getTargetStorageTypes() {
+      return targetStorageTypes;
+    }
+
+    public short[] getLiveBlockIndices() {
+      return liveBlockIndices;
+    }
+    
+    public ECSchema getECSchema() {
+      return ecSchema;
+    }
+
+    public int getCellSize() {
+      return cellSize;
+    }
+
+    @Override
+    public String toString() {
+      return new StringBuilder().append("BlockECRecoveryInfo(\n  ")
+          .append("Recovering ").append(block).append(" From: ")
+          .append(Arrays.asList(sources)).append(" To: [")
+          .append(Arrays.asList(targets)).append("]\n")
+          .append(" Block Indices: ").append(Arrays.toString(liveBlockIndices))
+          .toString();
+    }
+  }
+
+  public Collection<BlockECRecoveryInfo> getECTasks() {
+    return this.ecTasks;
+  }
+}
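
For a sense of how this command is assembled, a hedged sketch follows. Every input is assumed to come from the BlockManager's recovery scheduling (not shown), and DNA_ERASURE_CODING_RECOVERY is the DatanodeProtocol action code added later in this patch:

    // Hypothetical helper: wrap a single reconstruction task for one
    // striped block group into a command for a DataNode heartbeat response.
    static BlockECRecoveryCommand makeRecoveryCommand(ExtendedBlock blockGroup,
        DatanodeInfo[] sources, DatanodeStorageInfo[] targets,
        short[] liveBlockIndices, ECSchema schema, int cellSize) {
      BlockECRecoveryInfo task = new BlockECRecoveryInfo(
          blockGroup, sources, targets, liveBlockIndices, schema, cellSize);
      return new BlockECRecoveryCommand(
          DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
          java.util.Collections.singletonList(task));
    }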

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
index a985dbd..0507faf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.protocol;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.StorageType;
@@ -91,6 +92,30 @@ public class BlocksWithLocations {
     }
   }
 
+  public static class StripedBlockWithLocations extends BlockWithLocations {
+    final byte[] indices;
+    final short dataBlockNum;
+
+    public StripedBlockWithLocations(BlockWithLocations blk, byte[] indices,
+         short dataBlockNum) {
+      super(blk.getBlock(), blk.getDatanodeUuids(), blk.getStorageIDs(),
+          blk.getStorageTypes());
+      Preconditions.checkArgument(
+          blk.getDatanodeUuids().length == indices.length);
+      this.indices = indices;
+      this.dataBlockNum = dataBlockNum;
+
+    }
+
+    public byte[] getIndices() {
+      return indices;
+    }
+
+    public short getDataBlockNum() {
+      return dataBlockNum;
+    }
+  }
+
   private final BlockWithLocations[] blocks;
 
   /** Constructor with one parameter */
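
The indices array added above pairs positionally with the datanode location list, which is what the Preconditions check in the constructor enforces. A small illustrative consumer (hypothetical helper, not part of the patch):

    // For each location of a striped block group, report which internal
    // block of the group that datanode holds.
    static void printInternalBlockMapping(StripedBlockWithLocations sblk) {
      String[] datanodeUuids = sblk.getDatanodeUuids();
      byte[] indices = sblk.getIndices();
      for (int i = 0; i < datanodeUuids.length; i++) {
        System.out.println(datanodeUuids[i]
            + " holds internal block #" + indices[i]);
      }
    }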

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index dfe0813..add4e73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -76,6 +76,7 @@ public interface DatanodeProtocol {
   final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
   final static int DNA_CACHE = 9;      // cache blocks
   final static int DNA_UNCACHE = 10;   // uncache blocks
+  final static int DNA_ERASURE_CODING_RECOVERY = 11; // erasure coding recovery command
 
   /** 
    * Register Datanode.
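
A minimal sketch (not part of this patch) of how a datanode command processor might dispatch on the new action code. DatanodeCommand.getAction() is the existing accessor; BlockECRecoveryCommand and handleECTasks are assumed names for illustration:

    private boolean processCommand(DatanodeCommand cmd) throws IOException {
      switch (cmd.getAction()) {
      case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY:
        // Hand the block group reconstruction work to the EC worker.
        handleECTasks(((BlockECRecoveryCommand) cmd).getECTasks());
        return true;
      default:
        return false; // other actions handled as before
      }
    }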

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/erasurecode/ECCli.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/erasurecode/ECCli.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/erasurecode/ECCli.java
new file mode 100644
index 0000000..4ed9d0a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/erasurecode/ECCli.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.tools.erasurecode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.shell.CommandFactory;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * CLI for erasure coding operations.
+ */
+public class ECCli extends FsShell {
+
+  private final static String usagePrefix =
+      "Usage: hdfs erasurecode [generic options]";
+
+  @Override
+  protected String getUsagePrefix() {
+    return usagePrefix;
+  }
+
+  @Override
+  protected void registerCommands(CommandFactory factory) {
+    factory.registerCommands(ECCommand.class);
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    int res = ToolRunner.run(conf, new ECCli(), args);
+    System.exit(res);
+  }
+}
\ No newline at end of file
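
A minimal sketch (not part of this patch) of driving the new CLI programmatically, equivalent to running "hdfs erasurecode ..." from the shell. The schema name RS-6-3 and the target path are assumptions for illustration; "64k" is parsed to 65536 by the TraditionalBinaryPrefix handling in CreateECZoneCommand:

    Configuration conf = new HdfsConfiguration();
    // List the schemas the cluster supports.
    ToolRunner.run(conf, new ECCli(), new String[] {"-listSchemas"});
    // Create a zone under /ecdir with an explicit schema and cell size.
    int res = ToolRunner.run(conf, new ECCli(),
        new String[] {"-createZone", "-s", "RS-6-3", "-c", "64k", "/ecdir"});
    System.exit(res);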

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/erasurecode/ECCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/erasurecode/ECCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/erasurecode/ECCommand.java
new file mode 100644
index 0000000..03026d8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/erasurecode/ECCommand.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdfs.tools.erasurecode;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.shell.Command;
+import org.apache.hadoop.fs.shell.CommandFactory;
+import org.apache.hadoop.fs.shell.PathData;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Erasure Coding CLI commands
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class ECCommand extends Command {
+
+  public static void registerCommands(CommandFactory factory) {
+    // Register all erasure coding commands, each with '-' prefixed to the
+    // command name.
+    factory.addClass(CreateECZoneCommand.class, "-" + CreateECZoneCommand.NAME);
+    factory.addClass(GetECZoneCommand.class, "-" + GetECZoneCommand.NAME);
+    factory.addClass(ListECSchemas.class, "-" + ListECSchemas.NAME);
+  }
+
+  @Override
+  public String getCommandName() {
+    return getName();
+  }
+
+  @Override
+  protected void run(Path path) throws IOException {
+    throw new RuntimeException("Not supposed to get here");
+  }
+
+  @Deprecated
+  @Override
+  public int runAll() {
+    return run(args);
+  }
+
+  @Override
+  protected void processPath(PathData item) throws IOException {
+    if (!(item.fs instanceof DistributedFileSystem)) {
+      throw new UnsupportedActionException(
+          "Erasure commands are only supported for the HDFS paths");
+    }
+  }
+
+  /**
+   * Command to create an EC zone. A zone applies a specific EC schema,
+   * possibly different from the default, to all files written under a
+   * given directory.
+   */
+  static class CreateECZoneCommand extends ECCommand {
+    public static final String NAME = "createZone";
+    public static final String USAGE = "[-s <schemaName>] [-c <cellSize>] <path>";
+    public static final String DESCRIPTION = 
+        "Create a zone to encode files using a specified schema\n"
+        + "Options :\n"
+        + "  -s <schemaName> : EC schema name to encode files. "
+        + "If not passed default schema will be used\n"
+        + "  -c <cellSize> : cell size to use for striped encoding files."
+        + " If not passed default cellsize of "
+        + HdfsConstants.BLOCK_STRIPED_CELL_SIZE + " will be used\n"
+        + "  <path>  : Path to an empty directory. Under this directory "
+        + "files will be encoded using specified schema";
+    private String schemaName;
+    private int cellSize = 0;
+    private ECSchema schema = null;
+
+    @Override
+    protected void processOptions(LinkedList<String> args) throws IOException {
+      schemaName = StringUtils.popOptionWithArgument("-s", args);
+      String cellSizeStr = StringUtils.popOptionWithArgument("-c", args);
+      if (cellSizeStr != null) {
+        cellSize = (int) StringUtils.TraditionalBinaryPrefix
+            .string2long(cellSizeStr);
+      }
+      if (args.isEmpty()) {
+        throw new HadoopIllegalArgumentException("<path> is missing");
+      }
+      if (args.size() > 1) {
+        throw new HadoopIllegalArgumentException("Too many arguments");
+      }
+    }
+
+    @Override
+    protected void processPath(PathData item) throws IOException {
+      super.processPath(item);
+      DistributedFileSystem dfs = (DistributedFileSystem) item.fs;
+      try {
+        if (schemaName != null) {
+          ECSchema[] ecSchemas = dfs.getClient().getECSchemas();
+          for (ECSchema ecSchema : ecSchemas) {
+            if (schemaName.equals(ecSchema.getSchemaName())) {
+              schema = ecSchema;
+              break;
+            }
+          }
+          if (schema == null) {
+            StringBuilder sb = new StringBuilder();
+            sb.append("Schema '");
+            sb.append(schemaName);
+            sb.append("' does not match any of the supported schemas.");
+            sb.append(" Please select any one of ");
+            List<String> schemaNames = new ArrayList<String>();
+            for (ECSchema ecSchema : ecSchemas) {
+              schemaNames.add(ecSchema.getSchemaName());
+            }
+            sb.append(schemaNames);
+            throw new HadoopIllegalArgumentException(sb.toString());
+          }
+        }
+        dfs.createErasureCodingZone(item.path, schema, cellSize);
+        out.println("EC Zone created successfully at " + item.path);
+      } catch (IOException e) {
+        throw new IOException("Unable to create EC zone for the path "
+            + item.path + ". " + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Get information about an EC zone.
+   */
+  static class GetECZoneCommand extends ECCommand {
+    public static final String NAME = "getZone";
+    public static final String USAGE = "<path>";
+    public static final String DESCRIPTION =
+        "Get information about the EC zone at specified path\n";
+
+    @Override
+    protected void processOptions(LinkedList<String> args) throws IOException {
+      if (args.isEmpty()) {
+        throw new HadoopIllegalArgumentException("<path> is missing");
+      }
+      if (args.size() > 1) {
+        throw new HadoopIllegalArgumentException("Too many arguments");
+      }
+    }
+
+    @Override
+    protected void processPath(PathData item) throws IOException {
+      super.processPath(item);
+      DistributedFileSystem dfs = (DistributedFileSystem) item.fs;
+      try {
+        ErasureCodingZone ecZone = dfs.getErasureCodingZone(item.path);
+        if (ecZone != null) {
+          out.println(ecZone.toString());
+        } else {
+          out.println("Path " + item.path + " is not in EC zone");
+        }
+      } catch (IOException e) {
+        throw new IOException("Unable to get EC zone for the path "
+            + item.path + ". " + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * List all supported EC Schemas
+   */
+  static class ListECSchemas extends ECCommand {
+    public static final String NAME = "listSchemas";
+    public static final String USAGE = "";
+    public static final String DESCRIPTION = 
+        "Get the list of ECSchemas supported\n";
+
+    @Override
+    protected void processOptions(LinkedList<String> args) throws IOException {
+      if (!args.isEmpty()) {
+        throw new HadoopIllegalArgumentException("Too many arguments");
+      }
+
+      FileSystem fs = FileSystem.get(getConf());
+      if (!(fs instanceof DistributedFileSystem)) {
+        throw new UnsupportedActionException(
+            "Erasure coding commands are only supported on HDFS");
+      }
+      DistributedFileSystem dfs = (DistributedFileSystem) fs;
+
+      ECSchema[] ecSchemas = dfs.getClient().getECSchemas();
+      StringBuilder sb = new StringBuilder();
+      // Join the schema names with ", " separators.
+      for (int i = 0; i < ecSchemas.length; i++) {
+        if (i > 0) {
+          sb.append(", ");
+        }
+        sb.append(ecSchemas[i].getSchemaName());
+      }
+      out.println(sb.toString());
+    }
+  }
+}
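
A minimal sketch (not part of this patch) of the DistributedFileSystem calls these shell commands wrap, using the same signatures the command bodies above invoke. Passing a null schema and a cell size of 0 requests the defaults, mirroring CreateECZoneCommand's unset options; the path is an assumption for illustration:

    DistributedFileSystem dfs =
        (DistributedFileSystem) FileSystem.get(new HdfsConfiguration());
    Path dir = new Path("/ec/archive");        // must be an empty directory
    dfs.createErasureCodingZone(dir, null, 0); // default schema and cell size
    ErasureCodingZone zone = dfs.getErasureCodingZone(dir);
    System.out.println(zone);                  // null if dir is not in a zone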

