hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From z..@apache.org
Subject [16/21] hadoop git commit: HDFS-7285. Erasure Coding Support inside HDFS.
Date Tue, 11 Aug 2015 17:45:06 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
index 23e8f57..4701538 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 
 /** 
  * Interface that represents the over the wire information
@@ -58,10 +59,11 @@ public class HdfsLocatedFileStatus extends HdfsFileStatus {
       int block_replication, long blocksize, long modification_time,
       long access_time, FsPermission permission, String owner, String group,
       byte[] symlink, byte[] path, long fileId, LocatedBlocks locations,
-      int childrenNum, FileEncryptionInfo feInfo, byte storagePolicy) {
+      int childrenNum, FileEncryptionInfo feInfo, byte storagePolicy,
+      ECSchema schema, int stripeCellSize) {
     super(length, isdir, block_replication, blocksize, modification_time,
         access_time, permission, owner, group, symlink, path, fileId,
-        childrenNum, feInfo, storagePolicy);
+        childrenNum, feInfo, storagePolicy, schema, stripeCellSize);
     this.locations = locations;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index 8e81fdc..f988ae3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -201,6 +202,12 @@ import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathR
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECSchemasRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECSchemasResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingZoneRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingZoneResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CreateErasureCodingZoneRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CreateErasureCodingZoneResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
@@ -216,6 +223,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
 import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
@@ -422,7 +430,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
           req.getClientName(), flags);
       AppendResponseProto.Builder builder = AppendResponseProto.newBuilder();
       if (result.getLastBlock() != null) {
-        builder.setBlock(PBHelper.convert(result.getLastBlock()));
+        builder.setBlock(PBHelper.convertLocatedBlock(result.getLastBlock()));
       }
       if (result.getFileStatus() != null) {
         builder.setStat(PBHelper.convert(result.getFileStatus()));
@@ -498,7 +506,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
           (favor == null || favor.size() == 0) ? null : favor
               .toArray(new String[favor.size()]));
       return AddBlockResponseProto.newBuilder()
-          .setBlock(PBHelper.convert(result)).build();
+          .setBlock(PBHelper.convertLocatedBlock(result)).build();
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -522,7 +530,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
               new DatanodeInfoProto[excludesList.size()])), 
           req.getNumAdditionalNodes(), req.getClientName());
       return GetAdditionalDatanodeResponseProto.newBuilder().setBlock(
-          PBHelper.convert(result))
+          PBHelper.convertLocatedBlock(result))
           .build();
     } catch (IOException e) {
       throw new ServiceException(e);
@@ -548,8 +556,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       ReportBadBlocksRequestProto req) throws ServiceException {
     try {
       List<LocatedBlockProto> bl = req.getBlocksList();
-      server.reportBadBlocks(PBHelper.convertLocatedBlock(
-              bl.toArray(new LocatedBlockProto[bl.size()])));
+      server.reportBadBlocks(PBHelper.convertLocatedBlocks(
+          bl.toArray(new LocatedBlockProto[bl.size()])));
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -953,8 +961,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       RpcController controller, UpdateBlockForPipelineRequestProto req)
       throws ServiceException {
     try {
-      LocatedBlockProto result = PBHelper.convert(server
-          .updateBlockForPipeline(PBHelper.convert(req.getBlock()),
+      LocatedBlockProto result = PBHelper.convertLocatedBlock(
+          server.updateBlockForPipeline(PBHelper.convert(req.getBlock()),
               req.getClientName()));
       return UpdateBlockForPipelineResponseProto.newBuilder().setBlock(result)
           .build();
@@ -1394,6 +1402,21 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
   }
 
   @Override
+  public CreateErasureCodingZoneResponseProto createErasureCodingZone(
+      RpcController controller, CreateErasureCodingZoneRequestProto req)
+      throws ServiceException {
+    try {
+      ECSchema schema = req.hasSchema() ? PBHelper.convertECSchema(req
+          .getSchema()) : null;
+      int cellSize = req.hasCellSize() ? req.getCellSize() : 0;
+      server.createErasureCodingZone(req.getSrc(), schema, cellSize);
+      return CreateErasureCodingZoneResponseProto.newBuilder().build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
   public SetXAttrResponseProto setXAttr(RpcController controller,
       SetXAttrRequestProto req) throws ServiceException {
     try {
@@ -1514,4 +1537,35 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public GetECSchemasResponseProto getECSchemas(RpcController controller,
+      GetECSchemasRequestProto request) throws ServiceException {
+    try {
+      ECSchema[] ecSchemas = server.getECSchemas();
+      GetECSchemasResponseProto.Builder resBuilder = GetECSchemasResponseProto
+          .newBuilder();
+      for (ECSchema ecSchema : ecSchemas) {
+        resBuilder.addSchemas(PBHelper.convertECSchema(ecSchema));
+      }
+      return resBuilder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public GetErasureCodingZoneResponseProto getErasureCodingZone(RpcController controller,
+      GetErasureCodingZoneRequestProto request) throws ServiceException {
+    try {
+      ErasureCodingZone ecZone = server.getErasureCodingZone(request.getSrc());
+      GetErasureCodingZoneResponseProto.Builder builder = GetErasureCodingZoneResponseProto.newBuilder();
+      if (ecZone != null) {
+        builder.setECZone(PBHelper.convertErasureCodingZone(ecZone));
+      }
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index d6afa6e..342da0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@@ -161,10 +162,16 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Trunca
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
+import org.apache.hadoop.hdfs.protocol.proto.*;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECSchemasRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECSchemasResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingZoneRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingZoneResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CreateErasureCodingZoneRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
@@ -176,6 +183,7 @@ import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.ProtocolTranslator;
@@ -233,6 +241,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
   VOID_GET_STORAGE_POLICIES_REQUEST =
       GetStoragePoliciesRequestProto.newBuilder().build();
 
+  private final static GetECSchemasRequestProto
+  VOID_GET_ECSCHEMAS_REQUEST = GetECSchemasRequestProto
+      .newBuilder().build();
+
   public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) {
     rpcProxy = proxy;
   }
@@ -328,7 +340,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
     try {
       AppendResponseProto res = rpcProxy.append(null, req);
       LocatedBlock lastBlock = res.hasBlock() ? PBHelper
-          .convert(res.getBlock()) : null;
+          .convertLocatedBlockProto(res.getBlock()) : null;
       HdfsFileStatus stat = (res.hasStat()) ? PBHelper.convert(res.getStat())
           : null;
       return new LastBlockWithStatus(lastBlock, stat);
@@ -416,7 +428,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
       req.addAllFavoredNodes(Arrays.asList(favoredNodes));
     }
     try {
-      return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock());
+      return PBHelper.convertLocatedBlockProto(
+          rpcProxy.addBlock(null, req.build()).getBlock());
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -441,8 +454,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .setClientName(clientName)
         .build();
     try {
-      return PBHelper.convert(rpcProxy.getAdditionalDatanode(null, req)
-          .getBlock());
+      return PBHelper.convertLocatedBlockProto(
+          rpcProxy.getAdditionalDatanode(null, req).getBlock());
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -469,7 +482,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
   @Override
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
     ReportBadBlocksRequestProto req = ReportBadBlocksRequestProto.newBuilder()
-        .addAllBlocks(Arrays.asList(PBHelper.convertLocatedBlock(blocks)))
+        .addAllBlocks(Arrays.asList(PBHelper.convertLocatedBlocks(blocks)))
         .build();
     try {
       rpcProxy.reportBadBlocks(null, req);
@@ -901,7 +914,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
         .setClientName(clientName)
         .build();
     try {
-      return PBHelper.convert(
+      return PBHelper.convertLocatedBlockProto(
           rpcProxy.updateBlockForPipeline(null, req).getBlock());
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
@@ -1407,6 +1420,26 @@ public class ClientNamenodeProtocolTranslatorPB implements
   }
 
   @Override
+  public void createErasureCodingZone(String src, ECSchema schema, int cellSize)
+      throws IOException {
+    final CreateErasureCodingZoneRequestProto.Builder builder =
+        CreateErasureCodingZoneRequestProto.newBuilder();
+    builder.setSrc(src);
+    if (schema != null) {
+      builder.setSchema(PBHelper.convertECSchema(schema));
+    }
+    if (cellSize > 0) {
+      builder.setCellSize(cellSize);
+    }
+    CreateErasureCodingZoneRequestProto req = builder.build();
+    try {
+      rpcProxy.createErasureCodingZone(null, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
   public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
       throws IOException {
     SetXAttrRequestProto req = SetXAttrRequestProto.newBuilder()
@@ -1528,4 +1561,36 @@ public class ClientNamenodeProtocolTranslatorPB implements
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public ECSchema[] getECSchemas() throws IOException {
+    try {
+      GetECSchemasResponseProto response = rpcProxy.getECSchemas(null,
+          VOID_GET_ECSCHEMAS_REQUEST);
+      ECSchema[] schemas = new ECSchema[response.getSchemasCount()];
+      int i = 0;
+      for (ECSchemaProto schemaProto : response.getSchemasList()) {
+        schemas[i++] = PBHelper.convertECSchema(schemaProto);
+      }
+      return schemas;
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public ErasureCodingZone getErasureCodingZone(String src) throws IOException {
+    GetErasureCodingZoneRequestProto req = GetErasureCodingZoneRequestProto.newBuilder()
+        .setSrc(src).build();
+    try {
+      GetErasureCodingZoneResponseProto response = rpcProxy.getErasureCodingZone(
+          null, req);
+      if (response.hasECZone()) {
+        return PBHelper.convertErasureCodingZone(response.getECZone());
+      }
+      return null;
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 94028a2..e71e24c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -281,7 +281,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
     ReportBadBlocksRequestProto.Builder builder = ReportBadBlocksRequestProto
         .newBuilder();
     for (int i = 0; i < blocks.length; i++) {
-      builder.addBlocks(i, PBHelper.convert(blocks[i]));
+      builder.addBlocks(i, PBHelper.convertLocatedBlock(blocks[i]));
     }
     ReportBadBlocksRequestProto req = builder.build();
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index e133ec7..09ba564 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -259,7 +259,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
     List<LocatedBlockProto> lbps = request.getBlocksList();
     LocatedBlock [] blocks = new LocatedBlock [lbps.size()];
     for(int i=0; i<lbps.size(); i++) {
-      blocks[i] = PBHelper.convert(lbps.get(i));
+      blocks[i] = PBHelper.convertLocatedBlockProto(lbps.get(i));
     }
     try {
       impl.reportBadBlocks(blocks);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 4ca5b26..a97e2ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -28,8 +28,13 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
@@ -72,6 +77,7 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingZone;
 import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@@ -81,6 +87,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -93,7 +100,7 @@ import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.AclEntryTyp
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclEntryProto.FsActionProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.AclStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.AclProtos.GetAclStatusResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
+import org.apache.hadoop.hdfs.protocol.proto.*;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
@@ -114,6 +121,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmI
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
@@ -125,7 +133,10 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDele
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.ErasureCodingZoneProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaOptionEntryProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
@@ -172,7 +183,6 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
-import org.apache.hadoop.hdfs.protocol.proto.InotifyProtos;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsResponseProto;
@@ -191,12 +201,15 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -223,6 +236,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
 import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -429,22 +443,32 @@ public class PBHelper {
   }
 
   public static BlockWithLocationsProto convert(BlockWithLocations blk) {
-    return BlockWithLocationsProto.newBuilder()
-        .setBlock(convert(blk.getBlock()))
+    BlockWithLocationsProto.Builder builder = BlockWithLocationsProto
+        .newBuilder().setBlock(convert(blk.getBlock()))
         .addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
         .addAllStorageUuids(Arrays.asList(blk.getStorageIDs()))
-        .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()))
-        .build();
+        .addAllStorageTypes(convertStorageTypes(blk.getStorageTypes()));
+    if (blk instanceof StripedBlockWithLocations) {
+      StripedBlockWithLocations sblk = (StripedBlockWithLocations) blk;
+      builder.setIndices(getByteString(sblk.getIndices()));
+      builder.setDataBlockNum(sblk.getDataBlockNum());
+    }
+    return builder.build();
   }
 
   public static BlockWithLocations convert(BlockWithLocationsProto b) {
     final List<String> datanodeUuids = b.getDatanodeUuidsList();
     final List<String> storageUuids = b.getStorageUuidsList();
     final List<StorageTypeProto> storageTypes = b.getStorageTypesList();
-    return new BlockWithLocations(convert(b.getBlock()),
+    BlockWithLocations blk = new BlockWithLocations(convert(b.getBlock()),
         datanodeUuids.toArray(new String[datanodeUuids.size()]),
         storageUuids.toArray(new String[storageUuids.size()]),
         convertStorageTypes(storageTypes, storageUuids.size()));
+    if (b.hasIndices()) {
+      blk = new StripedBlockWithLocations(blk, b.getIndices().toByteArray(),
+          (short) b.getDataBlockNum());
+    }
+    return blk;
   }
 
   public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {
@@ -609,7 +633,7 @@ public class PBHelper {
     if (b == null) {
       return null;
     }
-    LocatedBlockProto lb = PBHelper.convert((LocatedBlock)b);
+    LocatedBlockProto lb = PBHelper.convertLocatedBlock(b);
     RecoveringBlockProto.Builder builder = RecoveringBlockProto.newBuilder();
     builder.setBlock(lb).setNewGenStamp(b.getNewGenerationStamp());
     if(b.getNewBlock() != null)
@@ -759,7 +783,7 @@ public class PBHelper {
     }
   }
   
-  public static LocatedBlockProto convert(LocatedBlock b) {
+  public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) {
     if (b == null) return null;
     Builder builder = LocatedBlockProto.newBuilder();
     DatanodeInfo[] locs = b.getLocations();
@@ -780,21 +804,30 @@ public class PBHelper {
 
     StorageType[] storageTypes = b.getStorageTypes();
     if (storageTypes != null) {
-      for (int i = 0; i < storageTypes.length; ++i) {
-        builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i]));
+      for (StorageType storageType : storageTypes) {
+        builder.addStorageTypes(PBHelper.convertStorageType(storageType));
       }
     }
     final String[] storageIDs = b.getStorageIDs();
     if (storageIDs != null) {
       builder.addAllStorageIDs(Arrays.asList(storageIDs));
     }
+    if (b instanceof LocatedStripedBlock) {
+      LocatedStripedBlock sb = (LocatedStripedBlock) b;
+      int[] indices = sb.getBlockIndices();
+      Token<BlockTokenIdentifier>[] blockTokens = sb.getBlockTokens();
+      for (int i = 0; i < indices.length; i++) {
+        builder.addBlockIndex(indices[i]);
+        builder.addBlockTokens(PBHelper.convert(blockTokens[i]));
+      }
+    }
 
     return builder.setB(PBHelper.convert(b.getBlock()))
         .setBlockToken(PBHelper.convert(b.getBlockToken()))
         .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
   }
   
-  public static LocatedBlock convert(LocatedBlockProto proto) {
+  public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) {
     if (proto == null) return null;
     List<DatanodeInfoProto> locs = proto.getLocsList();
     DatanodeInfo[] targets = new DatanodeInfo[locs.size()];
@@ -814,6 +847,15 @@ public class PBHelper {
       storageIDs = proto.getStorageIDsList().toArray(new String[storageIDsCount]);
     }
 
+    int[] indices = null;
+    final int indexCount = proto.getBlockIndexCount();
+    if (indexCount > 0) {
+      indices = new int[indexCount];
+      for (int i = 0; i < indexCount; i++) {
+        indices[i] = proto.getBlockIndex(i);
+      }
+    }
+
     // Set values from the isCached list, re-using references from loc
     List<DatanodeInfo> cachedLocs = new ArrayList<DatanodeInfo>(locs.size());
     List<Boolean> isCachedList = proto.getIsCachedList();
@@ -823,9 +865,23 @@ public class PBHelper {
       }
     }
 
-    LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets,
-        storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(),
-        cachedLocs.toArray(new DatanodeInfo[0]));
+    final LocatedBlock lb;
+    if (indices == null) {
+      lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets, storageIDs,
+          storageTypes, proto.getOffset(), proto.getCorrupt(),
+          cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
+    } else {
+      lb = new LocatedStripedBlock(PBHelper.convert(proto.getB()), targets,
+          storageIDs, storageTypes, indices, proto.getOffset(),
+          proto.getCorrupt(),
+          cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()]));
+      List<TokenProto> tokenProtos = proto.getBlockTokensList();
+      Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length];
+      for (int i = 0; i < indices.length; i++) {
+        blockTokens[i] = PBHelper.convert(tokenProtos.get(i));
+      }
+      ((LocatedStripedBlock) lb).setBlockTokens(blockTokens);
+    }
     lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
 
     return lb;
@@ -918,6 +974,8 @@ public class PBHelper {
       return REG_CMD;
     case BlockIdCommand:
       return PBHelper.convert(proto.getBlkIdCmd());
+    case BlockECRecoveryCommand:
+      return PBHelper.convert(proto.getBlkECRecoveryCmd());
     default:
       return null;
     }
@@ -1068,6 +1126,11 @@ public class PBHelper {
       builder.setCmdType(DatanodeCommandProto.Type.BlockIdCommand).
         setBlkIdCmd(PBHelper.convert((BlockIdCommand) datanodeCommand));
       break;
+    case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY:
+      builder.setCmdType(DatanodeCommandProto.Type.BlockECRecoveryCommand)
+          .setBlkECRecoveryCmd(
+              convert((BlockECRecoveryCommand) datanodeCommand));
+      break;
     case DatanodeProtocol.DNA_UNKNOWN: //Not expected
     default:
       builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);
@@ -1241,36 +1304,36 @@ public class PBHelper {
   }
   
   // Located Block Arrays and Lists
-  public static LocatedBlockProto[] convertLocatedBlock(LocatedBlock[] lb) {
+  public static LocatedBlockProto[] convertLocatedBlocks(LocatedBlock[] lb) {
     if (lb == null) return null;
-    return convertLocatedBlock2(Arrays.asList(lb)).toArray(
-        new LocatedBlockProto[lb.length]);
+    return convertLocatedBlocks2(Arrays.asList(lb))
+        .toArray(new LocatedBlockProto[lb.length]);
   }
   
-  public static LocatedBlock[] convertLocatedBlock(LocatedBlockProto[] lb) {
+  public static LocatedBlock[] convertLocatedBlocks(LocatedBlockProto[] lb) {
     if (lb == null) return null;
-    return convertLocatedBlock(Arrays.asList(lb)).toArray(
-        new LocatedBlock[lb.length]);
+    return convertLocatedBlocks(Arrays.asList(lb))
+        .toArray(new LocatedBlock[lb.length]);
   }
   
-  public static List<LocatedBlock> convertLocatedBlock(
+  public static List<LocatedBlock> convertLocatedBlocks(
       List<LocatedBlockProto> lb) {
     if (lb == null) return null;
     final int len = lb.size();
-    List<LocatedBlock> result = 
-        new ArrayList<LocatedBlock>(len);
-    for (int i = 0; i < len; ++i) {
-      result.add(PBHelper.convert(lb.get(i)));
+    List<LocatedBlock> result = new ArrayList<>(len);
+    for (LocatedBlockProto aLb : lb) {
+      result.add(PBHelper.convertLocatedBlockProto(aLb));
     }
     return result;
   }
   
-  public static List<LocatedBlockProto> convertLocatedBlock2(List<LocatedBlock> lb) {
+  public static List<LocatedBlockProto> convertLocatedBlocks2(
+      List<LocatedBlock> lb) {
     if (lb == null) return null;
     final int len = lb.size();
-    List<LocatedBlockProto> result = new ArrayList<LocatedBlockProto>(len);
-    for (int i = 0; i < len; ++i) {
-      result.add(PBHelper.convert(lb.get(i)));
+    List<LocatedBlockProto> result = new ArrayList<>(len);
+    for (LocatedBlock aLb : lb) {
+      result.add(PBHelper.convertLocatedBlock(aLb));
     }
     return result;
   }
@@ -1280,11 +1343,13 @@ public class PBHelper {
   public static LocatedBlocks convert(LocatedBlocksProto lb) {
     return new LocatedBlocks(
         lb.getFileLength(), lb.getUnderConstruction(),
-        PBHelper.convertLocatedBlock(lb.getBlocksList()),
-        lb.hasLastBlock() ? PBHelper.convert(lb.getLastBlock()) : null,
+        PBHelper.convertLocatedBlocks(lb.getBlocksList()),
+        lb.hasLastBlock() ?
+            PBHelper.convertLocatedBlockProto(lb.getLastBlock()) : null,
         lb.getIsLastBlockComplete(),
-        lb.hasFileEncryptionInfo() ? convert(lb.getFileEncryptionInfo()) :
-            null);
+        lb.hasFileEncryptionInfo() ? convert(lb.getFileEncryptionInfo()) : null,
+        lb.hasECSchema() ? convertECSchema(lb.getECSchema()) : null,
+        lb.hasStripeCellSize() ? lb.getStripeCellSize() : 0);
   }
   
   public static LocatedBlocksProto convert(LocatedBlocks lb) {
@@ -1294,14 +1359,21 @@ public class PBHelper {
     LocatedBlocksProto.Builder builder = 
         LocatedBlocksProto.newBuilder();
     if (lb.getLastLocatedBlock() != null) {
-      builder.setLastBlock(PBHelper.convert(lb.getLastLocatedBlock()));
+      builder.setLastBlock(
+          PBHelper.convertLocatedBlock(lb.getLastLocatedBlock()));
     }
     if (lb.getFileEncryptionInfo() != null) {
       builder.setFileEncryptionInfo(convert(lb.getFileEncryptionInfo()));
     }
+    if (lb.getECSchema() != null) {
+      builder.setECSchema(convertECSchema(lb.getECSchema()));
+    }
+    if (lb.getStripeCellSize() != 0) {
+      builder.setStripeCellSize(lb.getStripeCellSize());
+    }
     return builder.setFileLength(lb.getFileLength())
         .setUnderConstruction(lb.isUnderConstruction())
-        .addAllBlocks(PBHelper.convertLocatedBlock2(lb.getLocatedBlocks()))
+        .addAllBlocks(PBHelper.convertLocatedBlocks2(lb.getLocatedBlocks()))
         .setIsLastBlockComplete(lb.isLastBlockComplete()).build();
   }
   
@@ -1441,7 +1513,9 @@ public class PBHelper {
         fs.hasChildrenNum() ? fs.getChildrenNum() : -1,
         fs.hasFileEncryptionInfo() ? convert(fs.getFileEncryptionInfo()) : null,
         fs.hasStoragePolicy() ? (byte) fs.getStoragePolicy()
-            : HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED);
+            : HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED,
+        fs.hasEcSchema() ? PBHelper.convertECSchema(fs.getEcSchema()) : null,
+        fs.hasStripeCellSize() ? fs.getStripeCellSize() : 0);
   }
 
   public static SnapshottableDirectoryStatus convert(
@@ -1502,6 +1576,10 @@ public class PBHelper {
         builder.setLocations(PBHelper.convert(locations));
       }
     }
+    if(fs.getECSchema() != null) {
+      builder.setEcSchema(PBHelper.convertECSchema(fs.getECSchema()));
+    }
+    builder.setStripeCellSize(fs.getStripeCellSize());
     return builder.build();
   }
   
@@ -3070,4 +3148,176 @@ public class PBHelper {
         setLeaseId(context.getLeaseId()).
         build();
   }
+
+  public static ECSchema convertECSchema(ECSchemaProto schema) {
+    List<ECSchemaOptionEntryProto> optionsList = schema.getOptionsList();
+    Map<String, String> options = new HashMap<>(optionsList.size());
+    for (ECSchemaOptionEntryProto option : optionsList) {
+      options.put(option.getKey(), option.getValue());
+    }
+    return new ECSchema(schema.getSchemaName(), schema.getCodecName(),
+        schema.getDataUnits(), schema.getParityUnits(), options);
+  }
+
+  public static ECSchemaProto convertECSchema(ECSchema schema) {
+    ECSchemaProto.Builder builder = ECSchemaProto.newBuilder()
+        .setSchemaName(schema.getSchemaName())
+        .setCodecName(schema.getCodecName())
+        .setDataUnits(schema.getNumDataUnits())
+        .setParityUnits(schema.getNumParityUnits());
+    Set<Entry<String, String>> entrySet = schema.getExtraOptions().entrySet();
+    for (Entry<String, String> entry : entrySet) {
+      builder.addOptions(ECSchemaOptionEntryProto.newBuilder()
+          .setKey(entry.getKey()).setValue(entry.getValue()).build());
+    }
+    return builder.build();
+  }
+
+  public static ErasureCodingZoneProto convertErasureCodingZone(
+      ErasureCodingZone ecZone) {
+    return ErasureCodingZoneProto.newBuilder().setDir(ecZone.getDir())
+        .setSchema(convertECSchema(ecZone.getSchema()))
+        .setCellSize(ecZone.getCellSize()).build();
+  }
+
+  public static ErasureCodingZone convertErasureCodingZone(
+      ErasureCodingZoneProto ecZoneProto) {
+    return new ErasureCodingZone(ecZoneProto.getDir(),
+        convertECSchema(ecZoneProto.getSchema()), ecZoneProto.getCellSize());
+  }
+  
+  public static BlockECRecoveryInfo convertBlockECRecoveryInfo(
+      BlockECRecoveryInfoProto blockEcRecoveryInfoProto) {
+    ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock();
+    ExtendedBlock block = convert(blockProto);
+
+    DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto
+        .getSourceDnInfos();
+    DatanodeInfo[] sourceDnInfos = convert(sourceDnInfosProto);
+
+    DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto
+        .getTargetDnInfos();
+    DatanodeInfo[] targetDnInfos = convert(targetDnInfosProto);
+
+    StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto
+        .getTargetStorageUuids();
+    String[] targetStorageUuids = convert(targetStorageUuidsProto);
+
+    StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto
+        .getTargetStorageTypes();
+    StorageType[] convertStorageTypes = convertStorageTypes(
+        targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto
+            .getStorageTypesList().size());
+
+    List<Integer> liveBlockIndicesList = blockEcRecoveryInfoProto
+        .getLiveBlockIndicesList();
+    short[] liveBlkIndices = new short[liveBlockIndicesList.size()];
+    for (int i = 0; i < liveBlockIndicesList.size(); i++) {
+      liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue();
+    }
+
+    ECSchema ecSchema = convertECSchema(blockEcRecoveryInfoProto.getEcSchema());
+    int cellSize = blockEcRecoveryInfoProto.getCellSize();
+
+    return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos,
+        targetStorageUuids, convertStorageTypes, liveBlkIndices, ecSchema,
+        cellSize);
+  }
+
+  public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo(
+      BlockECRecoveryInfo blockEcRecoveryInfo) {
+    BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto
+        .newBuilder();
+    builder.setBlock(convert(blockEcRecoveryInfo.getExtendedBlock()));
+
+    DatanodeInfo[] sourceDnInfos = blockEcRecoveryInfo.getSourceDnInfos();
+    builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos));
+
+    DatanodeInfo[] targetDnInfos = blockEcRecoveryInfo.getTargetDnInfos();
+    builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos));
+
+    String[] targetStorageIDs = blockEcRecoveryInfo.getTargetStorageIDs();
+    builder.setTargetStorageUuids(convertStorageIDs(targetStorageIDs));
+
+    StorageType[] targetStorageTypes = blockEcRecoveryInfo
+        .getTargetStorageTypes();
+    builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes));
+
+    short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices();
+    builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices));
+
+    builder.setEcSchema(convertECSchema(blockEcRecoveryInfo.getECSchema()));
+    builder.setCellSize(blockEcRecoveryInfo.getCellSize());
+
+    return builder.build();
+  }
+
+  private static List<Integer> convertIntArray(short[] liveBlockIndices) {
+    List<Integer> liveBlockIndicesList = new ArrayList<Integer>();
+    for (short s : liveBlockIndices) {
+      liveBlockIndicesList.add((int) s);
+    }
+    return liveBlockIndicesList;
+  }
+
+  private static StorageTypesProto convertStorageTypesProto(
+      StorageType[] targetStorageTypes) {
+    StorageTypesProto.Builder builder = StorageTypesProto.newBuilder();
+    for (StorageType storageType : targetStorageTypes) {
+      builder.addStorageTypes(convertStorageType(storageType));
+    }
+    return builder.build();
+  }
+
+  private static StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) {
+    StorageUuidsProto.Builder builder = StorageUuidsProto.newBuilder();
+    for (String storageUuid : targetStorageIDs) {
+      builder.addStorageUuids(storageUuid);
+    }
+    return builder.build();
+  }
+
+  private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) {
+    DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder();
+    for (DatanodeInfo datanodeInfo : dnInfos) {
+      builder.addDatanodes(convert(datanodeInfo));
+    }
+    return builder.build();
+  }
+
+  private static String[] convert(StorageUuidsProto targetStorageUuidsProto) {
+    List<String> storageUuidsList = targetStorageUuidsProto
+        .getStorageUuidsList();
+    String[] storageUuids = new String[storageUuidsList.size()];
+    for (int i = 0; i < storageUuidsList.size(); i++) {
+      storageUuids[i] = storageUuidsList.get(i);
+    }
+    return storageUuids;
+  }
+  
+  public static BlockECRecoveryCommandProto convert(
+      BlockECRecoveryCommand blkECRecoveryCmd) {
+    BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto
+        .newBuilder();
+    Collection<BlockECRecoveryInfo> blockECRecoveryInfos = blkECRecoveryCmd
+        .getECTasks();
+    for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) {
+      builder
+          .addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo));
+    }
+    return builder.build();
+  }
+  
+  public static BlockECRecoveryCommand convert(
+      BlockECRecoveryCommandProto blkECRecoveryCmdProto) {
+    Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<BlockECRecoveryInfo>();
+    List<BlockECRecoveryInfoProto> blockECRecoveryinfoList = blkECRecoveryCmdProto
+        .getBlockECRecoveryinfoList();
+    for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) {
+      blkECRecoveryInfos
+          .add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto));
+    }
+    return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
+        blkECRecoveryInfos);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 742a300..33fdc43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -48,8 +48,8 @@ import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicies;
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -205,7 +205,9 @@ public class Balancer {
    */
   private static void checkReplicationPolicyCompatibility(Configuration conf
       ) throws UnsupportedActionException {
-    if (!(BlockPlacementPolicy.getInstance(conf, null, null, null) instanceof 
+    BlockPlacementPolicies placementPolicies =
+        new BlockPlacementPolicies(conf, null, null, null);
+    if (!(placementPolicies.getPolicy(false) instanceof
         BlockPlacementPolicyDefault)) {
       throw new UnsupportedActionException(
           "Balancer without BlockPlacementPolicyDefault");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index b4b06ee..a9e4f41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -65,6 +67,7 @@ import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
@@ -149,18 +152,17 @@ public class Dispatcher {
     private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
 
     /**
-     * Get the block from the map;
-     * if the block is not found, create a new block and put it in the map.
+     * Put block in the map if it's not found
+     * @return the block which be put in the map the first time
      */
-    private DBlock get(Block b) {
-      DBlock block = map.get(b);
-      if (block == null) {
-        block = new DBlock(b);
-        map.put(b, block);
+    private DBlock putIfAbsent(Block blk, DBlock dblk) {
+      if (!map.containsKey(blk)) {
+        map.put(blk, dblk);
+        return dblk;
       }
-      return block;
+      return map.get(blk);
     }
-    
+
     /** Remove all blocks except for the moved blocks. */
     private void removeAllButRetain(MovedBlocks<StorageGroup> movedBlocks) {
       for (Iterator<Block> i = map.keySet().iterator(); i.hasNext();) {
@@ -201,9 +203,9 @@ public class Dispatcher {
     }
   }
 
-  /** This class keeps track of a scheduled block move */
+  /** This class keeps track of a scheduled reportedBlock move */
   public class PendingMove {
-    private DBlock block;
+    private DBlock reportedBlock;
     private Source source;
     private DDatanode proxySource;
     private StorageGroup target;
@@ -215,7 +217,7 @@ public class Dispatcher {
 
     @Override
     public String toString() {
-      final Block b = block != null ? block.getBlock() : null;
+      final Block b = reportedBlock != null ? reportedBlock.getBlock() : null;
       String bStr = b != null ? (b + " with size=" + b.getNumBytes() + " ")
           : " ";
       return bStr + "from " + source.getDisplayName() + " to " + target
@@ -224,8 +226,8 @@ public class Dispatcher {
     }
 
     /**
-     * Choose a block & a proxy source for this pendingMove whose source &
-     * target have already been chosen.
+     * Choose a good block/blockGroup from source & Get reportedBlock from
+     * the block & Choose a proxy source for the reportedBlock.
      * 
      * @return true if a block and its proxy are chosen; false otherwise
      */
@@ -249,7 +251,11 @@ public class Dispatcher {
       synchronized (block) {
         synchronized (movedBlocks) {
           if (isGoodBlockCandidate(source, target, targetStorageType, block)) {
-            this.block = block;
+            if (block instanceof DBlockStriped) {
+              reportedBlock = ((DBlockStriped) block).getInternalBlock(source);
+            } else {
+              reportedBlock = block;
+            }
             if (chooseProxySource()) {
               movedBlocks.put(block);
               if (LOG.isDebugEnabled()) {
@@ -276,7 +282,7 @@ public class Dispatcher {
       }
       // if node group is supported, first try add nodes in the same node group
       if (cluster.isNodeGroupAware()) {
-        for (StorageGroup loc : block.getLocations()) {
+        for (StorageGroup loc : reportedBlock.getLocations()) {
           if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN)
               && addTo(loc)) {
             return true;
@@ -284,13 +290,13 @@ public class Dispatcher {
         }
       }
       // check if there is replica which is on the same rack with the target
-      for (StorageGroup loc : block.getLocations()) {
+      for (StorageGroup loc : reportedBlock.getLocations()) {
         if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) {
           return true;
         }
       }
       // find out a non-busy replica
-      for (StorageGroup loc : block.getLocations()) {
+      for (StorageGroup loc : reportedBlock.getLocations()) {
         if (addTo(loc)) {
           return true;
         }
@@ -298,7 +304,7 @@ public class Dispatcher {
       return false;
     }
 
-    /** add to a proxy source for specific block movement */
+    /** add to a proxy source for specific reportedBlock movement */
     private boolean addTo(StorageGroup g) {
       final DDatanode dn = g.getDDatanode();
       if (dn.addPendingBlock(this)) {
@@ -311,6 +317,7 @@ public class Dispatcher {
     /** Dispatch the move to the proxy source & wait for the response. */
     private void dispatch() {
       LOG.info("Start moving " + this);
+      assert !(reportedBlock instanceof DBlockStriped);
 
       Socket sock = new Socket();
       DataOutputStream out = null;
@@ -325,7 +332,7 @@ public class Dispatcher {
         OutputStream unbufOut = sock.getOutputStream();
         InputStream unbufIn = sock.getInputStream();
         ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
-            block.getBlock());
+            reportedBlock.getBlock());
         final KeyManager km = nnc.getKeyManager(); 
         Token<BlockTokenIdentifier> accessToken = km.getAccessToken(eb);
         IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
@@ -339,7 +346,7 @@ public class Dispatcher {
 
         sendRequest(out, eb, accessToken);
         receiveResponse(in);
-        nnc.getBytesMoved().addAndGet(block.getNumBytes());
+        nnc.getBytesMoved().addAndGet(reportedBlock.getNumBytes());
         target.getDDatanode().setHasSuccess();
         LOG.info("Successfully moved " + this);
       } catch (IOException e) {
@@ -368,14 +375,14 @@ public class Dispatcher {
       }
     }
 
-    /** Send a block replace request to the output stream */
+    /** Send a reportedBlock replace request to the output stream */
     private void sendRequest(DataOutputStream out, ExtendedBlock eb,
         Token<BlockTokenIdentifier> accessToken) throws IOException {
       new Sender(out).replaceBlock(eb, target.storageType, accessToken,
           source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode);
     }
 
-    /** Receive a block copy response from the input stream */
+    /** Receive a reportedBlock copy response from the input stream */
     private void receiveResponse(DataInputStream in) throws IOException {
       BlockOpResponseProto response =
           BlockOpResponseProto.parseFrom(vintPrefixed(in));
@@ -383,13 +390,13 @@ public class Dispatcher {
         // read intermediate responses
         response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
       }
-      String logInfo = "block move is failed";
+      String logInfo = "reportedBlock move is failed";
       DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
     }
 
     /** reset the object */
     private void reset() {
-      block = null;
+      reportedBlock = null;
       source = null;
       proxySource = null;
       target = null;
@@ -401,6 +408,44 @@ public class Dispatcher {
     public DBlock(Block block) {
       super(block);
     }
+
+    public long getNumBytes(StorageGroup storage) {
+      return super.getNumBytes();
+    }
+  }
+
+  public static class DBlockStriped extends DBlock {
+
+    final byte[] indices;
+    final short dataBlockNum;
+
+    public DBlockStriped(Block block, byte[] indices, short dataBlockNum) {
+      super(block);
+      this.indices = indices;
+      this.dataBlockNum = dataBlockNum;
+    }
+
+    public DBlock getInternalBlock(StorageGroup storage) {
+      int idxInLocs = locations.indexOf(storage);
+      if (idxInLocs == -1) {
+        return null;
+      }
+      byte idxInGroup = indices[idxInLocs];
+      long blkId = getBlock().getBlockId() + idxInGroup;
+      long numBytes = getInternalBlockLength(getNumBytes(),
+          HdfsConstants.BLOCK_STRIPED_CELL_SIZE, dataBlockNum, idxInGroup);
+      Block blk = new Block(getBlock());
+      blk.setBlockId(blkId);
+      blk.setNumBytes(numBytes);
+      DBlock dblk = new DBlock(blk);
+      dblk.addLocation(storage);
+      return dblk;
+    }
+
+    @Override
+    public long getNumBytes(StorageGroup storage) {
+      return getInternalBlock(storage).getNumBytes();
+    }
   }
 
   /** The class represents a desired move. */
@@ -476,7 +521,7 @@ public class Dispatcher {
       private PendingMove addPendingMove(DBlock block, final PendingMove pm) {
         if (getDDatanode().addPendingBlock(pm)) {
           if (pm.markMovedIfGoodBlock(block, getStorageType())) {
-            incScheduledSize(pm.block.getNumBytes());
+            incScheduledSize(pm.reportedBlock.getNumBytes());
             return pm;
           } else {
             getDDatanode().removePendingBlock(pm);
@@ -651,24 +696,39 @@ public class Dispatcher {
      */
     private long getBlockList() throws IOException {
       final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
-      final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
+      final BlocksWithLocations newBlksLocs =
+          nnc.getBlocks(getDatanodeInfo(), size);
       if (LOG.isTraceEnabled()) {
         LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
             + StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2)
-            + ") returns " + newBlocks.getBlocks().length + " blocks.");
+            + ") returns " + newBlksLocs.getBlocks().length + " blocks.");
       }
 
       long bytesReceived = 0;
-      for (BlockWithLocations blk : newBlocks.getBlocks()) {
-        bytesReceived += blk.getBlock().getNumBytes();
+      for (BlockWithLocations blkLocs : newBlksLocs.getBlocks()) {
+
+        DBlock block;
+        if (blkLocs instanceof StripedBlockWithLocations) {
+          StripedBlockWithLocations sblkLocs =
+              (StripedBlockWithLocations) blkLocs;
+          // approximate size
+          bytesReceived += sblkLocs.getBlock().getNumBytes() /
+              sblkLocs.getDataBlockNum();
+          block = new DBlockStriped(sblkLocs.getBlock(), sblkLocs.getIndices(),
+              sblkLocs.getDataBlockNum());
+        } else{
+          bytesReceived += blkLocs.getBlock().getNumBytes();
+          block = new DBlock(blkLocs.getBlock());
+        }
+
         synchronized (globalBlocks) {
-          final DBlock block = globalBlocks.get(blk.getBlock());
+          block = globalBlocks.putIfAbsent(blkLocs.getBlock(), block);
           synchronized (block) {
             block.clearLocations();
 
             // update locations
-            final String[] datanodeUuids = blk.getDatanodeUuids();
-            final StorageType[] storageTypes = blk.getStorageTypes();
+            final String[] datanodeUuids = blkLocs.getDatanodeUuids();
+            final StorageType[] storageTypes = blkLocs.getStorageTypes();
             for (int i = 0; i < datanodeUuids.length; i++) {
               final StorageGroup g = storageGroupMap.get(
                   datanodeUuids[i], storageTypes[i]);
@@ -707,6 +767,8 @@ public class Dispatcher {
      * target throttling has been considered. They are chosen only when they
      * have the capacity to support this block move. The block should be
      * dispatched immediately after this method is returned.
+     * If the block is a block group. Only the internal block on this source
+     * will be dispatched.
      * 
      * @return a move that's good for the source to dispatch immediately.
      */
@@ -718,7 +780,7 @@ public class Dispatcher {
         if (target.addPendingBlock(pendingBlock)) {
           // target is not busy, so do a tentative block allocation
           if (pendingBlock.chooseBlockAndProxy()) {
-            long blockSize = pendingBlock.block.getNumBytes();
+            long blockSize = pendingBlock.reportedBlock.getNumBytes(this);
             incScheduledSize(-blockSize);
             task.size -= blockSize;
             if (task.size == 0) {
@@ -793,7 +855,7 @@ public class Dispatcher {
             blocksToReceive -= getBlockList();
             continue;
           } catch (IOException e) {
-            LOG.warn("Exception while getting block list", e);
+            LOG.warn("Exception while getting reportedBlock list", e);
             return;
           }
         } else {
@@ -937,7 +999,7 @@ public class Dispatcher {
 
 
   public void executePendingMove(final PendingMove p) {
-    // move the block
+    // move the reportedBlock
     final DDatanode targetDn = p.target.getDDatanode();
     ExecutorService moveExecutor = targetDn.getMoveExecutor();
     if (moveExecutor == null) {
@@ -995,17 +1057,17 @@ public class Dispatcher {
       }
     }
 
-    // wait for all block moving to be done
+    // wait for all reportedBlock moving to be done
     waitForMoveCompletion(targets);
 
     return getBytesMoved() - bytesLastMoved;
   }
 
-  /** The sleeping period before checking if block move is completed again */
+  /** The sleeping period before checking if reportedBlock move is completed again */
   static private long blockMoveWaitTime = 30000L;
 
   /**
-   * Wait for all block move confirmations.
+   * Wait for all reportedBlock move confirmations.
    * @return true if there is failed move execution
    */
   public static boolean waitForMoveCompletion(
@@ -1044,10 +1106,10 @@ public class Dispatcher {
   }
 
   /**
-   * Decide if the block is a good candidate to be moved from source to target.
-   * A block is a good candidate if
+   * Decide if the block/blockGroup is a good candidate to be moved from source
+   * to target. A block is a good candidate if
    * 1. the block is not in the process of being moved/has not been moved;
-   * 2. the block does not have a replica on the target;
+   * 2. the block does not have a replica/internalBlock on the target;
    * 3. doing the move does not reduce the number of racks that the block has
    */
   private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
@@ -1064,7 +1126,7 @@ public class Dispatcher {
     }
     final DatanodeInfo targetDatanode = target.getDatanodeInfo();
     if (source.getDatanodeInfo().equals(targetDatanode)) {
-      // the block is moved inside same DN
+      // the reportedBlock is moved inside same DN
       return true;
     }
 
@@ -1152,7 +1214,7 @@ public class Dispatcher {
     movedBlocks.cleanup();
   }
 
-  /** set the sleeping period for block move completion check */
+  /** set the sleeping period for reportedBlock move completion check */
   @VisibleForTesting
   public static void setBlockMoveWaitTime(long time) {
     blockMoveWaitTime = time;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
index 02a1d05..928424b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
@@ -39,12 +39,12 @@ public interface BlockCollection {
   public ContentSummary computeContentSummary(BlockStoragePolicySuite bsps);
 
   /**
-   * @return the number of blocks
+   * @return the number of blocks or block groups
    */ 
   public int numBlocks();
 
   /**
-   * Get the blocks.
+   * Get the blocks (striped or contiguous).
    */
   public BlockInfo[] getBlocks();
 
@@ -55,8 +55,8 @@ public interface BlockCollection {
   public long getPreferredBlockSize();
 
   /**
-   * Get block replication for the collection 
-   * @return block replication value
+   * Get block replication for the collection.
+   * @return block replication value. Return 0 if the file is erasure coded.
    */
   public short getPreferredBlockReplication();
 
@@ -71,7 +71,7 @@ public interface BlockCollection {
   public String getName();
 
   /**
-   * Set the block at the given index.
+   * Set the block (contiguous or striped) at the given index.
    */
   public void setBlock(int index, BlockInfo blk);
 
@@ -79,11 +79,16 @@ public interface BlockCollection {
    * Convert the last block of the collection to an under-construction block
    * and set the locations.
    */
-  public BlockInfoContiguousUnderConstruction setLastBlock(BlockInfo lastBlock,
+  public void convertLastBlockToUC(BlockInfo lastBlock,
       DatanodeStorageInfo[] targets) throws IOException;
 
   /**
    * @return whether the block collection is under construction.
    */
   public boolean isUnderConstruction();
+
+  /**
+   * @return whether the block collection is in striping format
+   */
+  public boolean isStriped();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
index feebd87..685cfcb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java
@@ -54,10 +54,12 @@ public class BlockIdManager {
    * The global block ID space for this file system.
    */
   private final SequentialBlockIdGenerator blockIdGenerator;
+  private final SequentialBlockGroupIdGenerator blockGroupIdGenerator;
 
   public BlockIdManager(BlockManager blockManager) {
     this.generationStampV1Limit = HdfsConstants.GRANDFATHER_GENERATION_STAMP;
     this.blockIdGenerator = new SequentialBlockIdGenerator(blockManager);
+    this.blockGroupIdGenerator = new SequentialBlockGroupIdGenerator(blockManager);
   }
 
   /**
@@ -102,21 +104,38 @@ public class BlockIdManager {
   }
 
   /**
-   * Sets the maximum allocated block ID for this filesystem. This is
+   * Sets the maximum allocated contiguous block ID for this filesystem. This is
    * the basis for allocating new block IDs.
    */
-  public void setLastAllocatedBlockId(long blockId) {
+  public void setLastAllocatedContiguousBlockId(long blockId) {
     blockIdGenerator.skipTo(blockId);
   }
 
   /**
-   * Gets the maximum sequentially allocated block ID for this filesystem
+   * Gets the maximum sequentially allocated contiguous block ID for this
+   * filesystem
    */
-  public long getLastAllocatedBlockId() {
+  public long getLastAllocatedContiguousBlockId() {
     return blockIdGenerator.getCurrentValue();
   }
 
   /**
+   * Sets the maximum allocated striped block ID for this filesystem. This is
+   * the basis for allocating new block IDs.
+   */
+  public void setLastAllocatedStripedBlockId(long blockId) {
+    blockGroupIdGenerator.skipTo(blockId);
+  }
+
+  /**
+   * Gets the maximum sequentially allocated striped block ID for this
+   * filesystem
+   */
+  public long getLastAllocatedStripedBlockId() {
+    return blockGroupIdGenerator.getCurrentValue();
+  }
+
+  /**
    * Sets the current generation stamp for legacy blocks
    */
   public void setGenerationStampV1(long stamp) {
@@ -187,10 +206,14 @@ public class BlockIdManager {
   /**
    * Increments, logs and then returns the block ID
    */
-  public long nextBlockId() {
+  public long nextContiguousBlockId() {
     return blockIdGenerator.nextValue();
   }
 
+  public long nextStripedBlockId() {
+    return blockGroupIdGenerator.nextValue();
+  }
+
   public boolean isGenStampInFuture(Block block) {
     if (isLegacyBlock(block)) {
       return block.getGenerationStamp() > getGenerationStampV1();
@@ -206,4 +229,27 @@ public class BlockIdManager {
       .LAST_RESERVED_BLOCK_ID);
     generationStampV1Limit = HdfsConstants.GRANDFATHER_GENERATION_STAMP;
   }
-}
\ No newline at end of file
+
+  public static boolean isStripedBlockID(long id) {
+    return id < 0;
+  }
+
+  /**
+   * The last 4 bits of HdfsConstants.BLOCK_GROUP_INDEX_MASK(15) is 1111,
+   * so the last 4 bits of (~HdfsConstants.BLOCK_GROUP_INDEX_MASK) is 0000
+   * and the other 60 bits are 1. Group ID is the first 60 bits of any
+   * data/parity block id in the same striped block group.
+   */
+  public static long convertToStripedID(long id) {
+    return id & (~HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
+  }
+
+  public static int getBlockIndex(Block reportedBlock) {
+    return (int) (reportedBlock.getBlockId() &
+        HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
+  }
+
+  SequentialBlockGroupIdGenerator getBlockGroupIdGenerator() {
+    return blockGroupIdGenerator;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index dea31c4..a913535 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -55,16 +55,17 @@ public abstract class  BlockInfo extends Block
 
   /**
    * Construct an entry for blocksmap
-   * @param replication the block's replication factor
+   * @param size the block's replication factor, or the total number of blocks
+   *             in the block group
    */
-  public BlockInfo(short replication) {
-    this.triplets = new Object[3*replication];
+  public BlockInfo(short size) {
+    this.triplets = new Object[3*size];
     this.bc = null;
   }
 
-  public BlockInfo(Block blk, short replication) {
+  public BlockInfo(Block blk, short size) {
     super(blk);
-    this.triplets = new Object[3*replication];
+    this.triplets = new Object[3 * size];
     this.bc = null;
   }
 
@@ -108,7 +109,7 @@ public abstract class  BlockInfo extends Block
     BlockInfo info = (BlockInfo)triplets[index*3+1];
     assert info == null ||
         info.getClass().getName().startsWith(BlockInfo.class.getName()) :
-              "BlockInfo is expected at " + index*3;
+        "BlockInfo is expected at " + index*3;
     return info;
   }
 
@@ -150,7 +151,7 @@ public abstract class  BlockInfo extends Block
    *
    * @param index - the datanode index
    * @param to - block to be set to next on the list of blocks
-   *    * @return current next block on the list of blocks
+   * @return current next block on the list of blocks
    */
   BlockInfo setNext(int index, BlockInfo to) {
     assert this.triplets != null : "BlockInfo is not initialized";
@@ -167,14 +168,20 @@ public abstract class  BlockInfo extends Block
   }
 
   /**
-   * Count the number of data-nodes the block belongs to.
+   * Count the number of data-nodes the block currently belongs to (i.e., NN
+   * has received block reports from the DN).
    */
   public abstract int numNodes();
 
   /**
-   * Add a {@link DatanodeStorageInfo} location for a block.
+   * Add a {@link DatanodeStorageInfo} location for a block
+   * @param storage The storage to add
+   * @param reportedBlock The block reported from the datanode. This is only
+   *                      used by erasure coded blocks, this block's id contains
+   *                      information indicating the index of the block in the
+   *                      corresponding block group.
    */
-  abstract boolean addStorage(DatanodeStorageInfo storage);
+  abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock);
 
   /**
    * Remove {@link DatanodeStorageInfo} location for a block
@@ -188,6 +195,11 @@ public abstract class  BlockInfo extends Block
    */
   abstract void replaceBlock(BlockInfo newBlock);
 
+  public abstract boolean isStriped();
+
+  /** @return true if there is no datanode storage associated with the block */
+  abstract boolean hasNoStorage();
+
   /**
    * Find specified DatanodeStorageInfo.
    * @return DatanodeStorageInfo or null if not found.
@@ -196,10 +208,9 @@ public abstract class  BlockInfo extends Block
     int len = getCapacity();
     for(int idx = 0; idx < len; idx++) {
       DatanodeStorageInfo cur = getStorageInfo(idx);
-      if(cur == null)
-        break;
-      if(cur.getDatanodeDescriptor() == dn)
+      if(cur != null && cur.getDatanodeDescriptor() == dn) {
         return cur;
+      }
     }
     return null;
   }
@@ -215,9 +226,6 @@ public abstract class  BlockInfo extends Block
       if (cur == storageInfo) {
         return idx;
       }
-      if (cur == null) {
-        break;
-      }
     }
     return -1;
   }
@@ -261,12 +269,15 @@ public abstract class  BlockInfo extends Block
     BlockInfo prev = this.getPrevious(dnIndex);
     this.setNext(dnIndex, null);
     this.setPrevious(dnIndex, null);
-    if(prev != null)
+    if (prev != null) {
       prev.setNext(prev.findStorageInfo(storage), next);
-    if(next != null)
+    }
+    if (next != null) {
       next.setPrevious(next.findStorageInfo(storage), prev);
-    if(this == head)  // removing the head
+    }
+    if (this == head) { // removing the head
       head = next;
+    }
     return head;
   }
 
@@ -295,7 +306,7 @@ public abstract class  BlockInfo extends Block
   /**
    * BlockInfo represents a block that is not being constructed.
    * In order to start modifying the block, the BlockInfo should be converted
-   * to {@link BlockInfoContiguousUnderConstruction}.
+   * to {@link BlockInfoUnderConstruction}.
    * @return {@link BlockUCState#COMPLETE}
    */
   public BlockUCState getBlockUCState() {
@@ -311,28 +322,6 @@ public abstract class  BlockInfo extends Block
     return getBlockUCState().equals(BlockUCState.COMPLETE);
   }
 
-  /**
-   * Convert a complete block to an under construction block.
-   * @return BlockInfoUnderConstruction -  an under construction block.
-   */
-  public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction(
-      BlockUCState s, DatanodeStorageInfo[] targets) {
-    if(isComplete()) {
-      BlockInfoContiguousUnderConstruction ucBlock =
-          new BlockInfoContiguousUnderConstruction(this,
-          getBlockCollection().getPreferredBlockReplication(), s, targets);
-      ucBlock.setBlockCollection(getBlockCollection());
-      return ucBlock;
-    }
-    // the block is already under construction
-    BlockInfoContiguousUnderConstruction ucBlock =
-        (BlockInfoContiguousUnderConstruction)this;
-    ucBlock.setBlockUCState(s);
-    ucBlock.setExpectedLocations(targets);
-    ucBlock.setBlockCollection(getBlockCollection());
-    return ucBlock;
-  }
-
   @Override
   public int hashCode() {
     // Super implementation is sufficient

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index eff89a8..7e03ca5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 
 /**
  * Subclass of {@link BlockInfo}, used for a block with replication scheme.
@@ -37,11 +38,12 @@ public class BlockInfoContiguous extends BlockInfo {
 
   /**
    * Copy construction.
-   * This is used to convert BlockReplicationInfoUnderConstruction
-   * @param from BlockReplicationInfo to copy from.
+   * This is used to convert BlockInfoContiguous
+   * @param from BlockInfoContiguous to copy from.
    */
   protected BlockInfoContiguous(BlockInfoContiguous from) {
-    super(from);
+    this(from, (short) (from.triplets.length / 3));
+    this.setBlockCollection(from.getBlockCollection());
   }
 
   /**
@@ -63,7 +65,7 @@ public class BlockInfoContiguous extends BlockInfo {
   }
 
   @Override
-  boolean addStorage(DatanodeStorageInfo storage) {
+  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
     // find the last null node
     int lastNode = ensureCapacity(1);
     setStorageInfo(lastNode, storage);
@@ -115,9 +117,42 @@ public class BlockInfoContiguous extends BlockInfo {
       assert removed : "currentBlock not found.";
 
       final DatanodeStorageInfo.AddBlockResult result = storage.addBlock(
-          newBlock);
+          newBlock, newBlock);
       assert result == DatanodeStorageInfo.AddBlockResult.ADDED :
           "newBlock already exists.";
     }
   }
+
+  /**
+   * Convert a complete block to an under construction block.
+   *
+   * @return BlockInfoUnderConstruction -  an under construction block.
+   */
+  public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction(
+      BlockUCState s, DatanodeStorageInfo[] targets) {
+    if (isComplete()) {
+      BlockInfoContiguousUnderConstruction ucBlock =
+          new BlockInfoContiguousUnderConstruction(this,
+              getBlockCollection().getPreferredBlockReplication(), s, targets);
+      ucBlock.setBlockCollection(getBlockCollection());
+      return ucBlock;
+    }
+    // the block is already under construction
+    BlockInfoContiguousUnderConstruction ucBlock =
+        (BlockInfoContiguousUnderConstruction) this;
+    ucBlock.setBlockUCState(s);
+    ucBlock.setExpectedLocations(targets);
+    ucBlock.setBlockCollection(getBlockCollection());
+    return ucBlock;
+  }
+
+  @Override
+  public final boolean isStriped() {
+    return false;
+  }
+
+  @Override
+  final boolean hasNoStorage() {
+    return getStorageInfo(0) == null;
+  }
 }


Mime
View raw message