Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EC4FA18ACB for ; Tue, 11 Aug 2015 17:44:56 +0000 (UTC) Received: (qmail 84394 invoked by uid 500); 11 Aug 2015 17:44:52 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 84248 invoked by uid 500); 11 Aug 2015 17:44:52 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 82652 invoked by uid 99); 11 Aug 2015 17:44:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Aug 2015 17:44:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 96A37E024E; Tue, 11 Aug 2015 17:44:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhz@apache.org To: common-commits@hadoop.apache.org Date: Tue, 11 Aug 2015 17:45:06 -0000 Message-Id: <7e77658fd59f4753b893baf59d2bba2d@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [16/21] hadoop git commit: HDFS-7285. Erasure Coding Support inside HDFS. http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java index 23e8f57..4701538 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSUtilClient; +import org.apache.hadoop.io.erasurecode.ECSchema; /** * Interface that represents the over the wire information @@ -58,10 +59,11 @@ public class HdfsLocatedFileStatus extends HdfsFileStatus { int block_replication, long blocksize, long modification_time, long access_time, FsPermission permission, String owner, String group, byte[] symlink, byte[] path, long fileId, LocatedBlocks locations, - int childrenNum, FileEncryptionInfo feInfo, byte storagePolicy) { + int childrenNum, FileEncryptionInfo feInfo, byte storagePolicy, + ECSchema schema, int stripeCellSize) { super(length, isdir, block_replication, blocksize, modification_time, access_time, permission, owner, group, symlink, path, fileId, - childrenNum, feInfo, storagePolicy); + childrenNum, feInfo, storagePolicy, schema, stripeCellSize); this.locations = locations; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 8e81fdc..f988ae3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.CachePoolEntry; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -201,6 +202,12 @@ import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathR import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesResponseProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECSchemasRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECSchemasResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingZoneRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingZoneResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CreateErasureCodingZoneRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CreateErasureCodingZoneResponseProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; @@ -216,6 +223,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto; import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto; import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto; @@ -422,7 +430,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements req.getClientName(), flags); AppendResponseProto.Builder builder = AppendResponseProto.newBuilder(); if (result.getLastBlock() != null) { - builder.setBlock(PBHelper.convert(result.getLastBlock())); + builder.setBlock(PBHelper.convertLocatedBlock(result.getLastBlock())); } if (result.getFileStatus() != null) { builder.setStat(PBHelper.convert(result.getFileStatus())); @@ -498,7 +506,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements (favor == null || favor.size() == 0) ? null : favor .toArray(new String[favor.size()])); return AddBlockResponseProto.newBuilder() - .setBlock(PBHelper.convert(result)).build(); + .setBlock(PBHelper.convertLocatedBlock(result)).build(); } catch (IOException e) { throw new ServiceException(e); } @@ -522,7 +530,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements new DatanodeInfoProto[excludesList.size()])), req.getNumAdditionalNodes(), req.getClientName()); return GetAdditionalDatanodeResponseProto.newBuilder().setBlock( - PBHelper.convert(result)) + PBHelper.convertLocatedBlock(result)) .build(); } catch (IOException e) { throw new ServiceException(e); @@ -548,8 +556,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements ReportBadBlocksRequestProto req) throws ServiceException { try { List bl = req.getBlocksList(); - server.reportBadBlocks(PBHelper.convertLocatedBlock( - bl.toArray(new LocatedBlockProto[bl.size()]))); + server.reportBadBlocks(PBHelper.convertLocatedBlocks( + bl.toArray(new LocatedBlockProto[bl.size()]))); } catch (IOException e) { throw new ServiceException(e); } @@ -953,8 +961,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements RpcController controller, UpdateBlockForPipelineRequestProto req) throws ServiceException { try { - LocatedBlockProto result = PBHelper.convert(server - .updateBlockForPipeline(PBHelper.convert(req.getBlock()), + LocatedBlockProto result = PBHelper.convertLocatedBlock( + server.updateBlockForPipeline(PBHelper.convert(req.getBlock()), req.getClientName())); return UpdateBlockForPipelineResponseProto.newBuilder().setBlock(result) .build(); @@ -1394,6 +1402,21 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements } @Override + public CreateErasureCodingZoneResponseProto createErasureCodingZone( + RpcController controller, CreateErasureCodingZoneRequestProto req) + throws ServiceException { + try { + ECSchema schema = req.hasSchema() ? PBHelper.convertECSchema(req + .getSchema()) : null; + int cellSize = req.hasCellSize() ? req.getCellSize() : 0; + server.createErasureCodingZone(req.getSrc(), schema, cellSize); + return CreateErasureCodingZoneResponseProto.newBuilder().build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override public SetXAttrResponseProto setXAttr(RpcController controller, SetXAttrRequestProto req) throws ServiceException { try { @@ -1514,4 +1537,35 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements throw new ServiceException(e); } } + + @Override + public GetECSchemasResponseProto getECSchemas(RpcController controller, + GetECSchemasRequestProto request) throws ServiceException { + try { + ECSchema[] ecSchemas = server.getECSchemas(); + GetECSchemasResponseProto.Builder resBuilder = GetECSchemasResponseProto + .newBuilder(); + for (ECSchema ecSchema : ecSchemas) { + resBuilder.addSchemas(PBHelper.convertECSchema(ecSchema)); + } + return resBuilder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetErasureCodingZoneResponseProto getErasureCodingZone(RpcController controller, + GetErasureCodingZoneRequestProto request) throws ServiceException { + try { + ErasureCodingZone ecZone = server.getErasureCodingZone(request.getSrc()); + GetErasureCodingZoneResponseProto.Builder builder = GetErasureCodingZoneResponseProto.newBuilder(); + if (ecZone != null) { + builder.setECZone(PBHelper.convertErasureCodingZone(ecZone)); + } + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index d6afa6e..342da0c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; @@ -161,10 +162,16 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Trunca import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto; -import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos; +import org.apache.hadoop.hdfs.protocol.proto.*; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto; import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECSchemasRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetECSchemasResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingZoneRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingZoneResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.CreateErasureCodingZoneRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto; @@ -176,6 +183,7 @@ import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.ProtocolTranslator; @@ -233,6 +241,10 @@ public class ClientNamenodeProtocolTranslatorPB implements VOID_GET_STORAGE_POLICIES_REQUEST = GetStoragePoliciesRequestProto.newBuilder().build(); + private final static GetECSchemasRequestProto + VOID_GET_ECSCHEMAS_REQUEST = GetECSchemasRequestProto + .newBuilder().build(); + public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) { rpcProxy = proxy; } @@ -328,7 +340,7 @@ public class ClientNamenodeProtocolTranslatorPB implements try { AppendResponseProto res = rpcProxy.append(null, req); LocatedBlock lastBlock = res.hasBlock() ? PBHelper - .convert(res.getBlock()) : null; + .convertLocatedBlockProto(res.getBlock()) : null; HdfsFileStatus stat = (res.hasStat()) ? PBHelper.convert(res.getStat()) : null; return new LastBlockWithStatus(lastBlock, stat); @@ -416,7 +428,8 @@ public class ClientNamenodeProtocolTranslatorPB implements req.addAllFavoredNodes(Arrays.asList(favoredNodes)); } try { - return PBHelper.convert(rpcProxy.addBlock(null, req.build()).getBlock()); + return PBHelper.convertLocatedBlockProto( + rpcProxy.addBlock(null, req.build()).getBlock()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -441,8 +454,8 @@ public class ClientNamenodeProtocolTranslatorPB implements .setClientName(clientName) .build(); try { - return PBHelper.convert(rpcProxy.getAdditionalDatanode(null, req) - .getBlock()); + return PBHelper.convertLocatedBlockProto( + rpcProxy.getAdditionalDatanode(null, req).getBlock()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -469,7 +482,7 @@ public class ClientNamenodeProtocolTranslatorPB implements @Override public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { ReportBadBlocksRequestProto req = ReportBadBlocksRequestProto.newBuilder() - .addAllBlocks(Arrays.asList(PBHelper.convertLocatedBlock(blocks))) + .addAllBlocks(Arrays.asList(PBHelper.convertLocatedBlocks(blocks))) .build(); try { rpcProxy.reportBadBlocks(null, req); @@ -901,7 +914,7 @@ public class ClientNamenodeProtocolTranslatorPB implements .setClientName(clientName) .build(); try { - return PBHelper.convert( + return PBHelper.convertLocatedBlockProto( rpcProxy.updateBlockForPipeline(null, req).getBlock()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); @@ -1407,6 +1420,26 @@ public class ClientNamenodeProtocolTranslatorPB implements } @Override + public void createErasureCodingZone(String src, ECSchema schema, int cellSize) + throws IOException { + final CreateErasureCodingZoneRequestProto.Builder builder = + CreateErasureCodingZoneRequestProto.newBuilder(); + builder.setSrc(src); + if (schema != null) { + builder.setSchema(PBHelper.convertECSchema(schema)); + } + if (cellSize > 0) { + builder.setCellSize(cellSize); + } + CreateErasureCodingZoneRequestProto req = builder.build(); + try { + rpcProxy.createErasureCodingZone(null, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override public void setXAttr(String src, XAttr xAttr, EnumSet flag) throws IOException { SetXAttrRequestProto req = SetXAttrRequestProto.newBuilder() @@ -1528,4 +1561,36 @@ public class ClientNamenodeProtocolTranslatorPB implements throw ProtobufHelper.getRemoteException(e); } } + + @Override + public ECSchema[] getECSchemas() throws IOException { + try { + GetECSchemasResponseProto response = rpcProxy.getECSchemas(null, + VOID_GET_ECSCHEMAS_REQUEST); + ECSchema[] schemas = new ECSchema[response.getSchemasCount()]; + int i = 0; + for (ECSchemaProto schemaProto : response.getSchemasList()) { + schemas[i++] = PBHelper.convertECSchema(schemaProto); + } + return schemas; + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public ErasureCodingZone getErasureCodingZone(String src) throws IOException { + GetErasureCodingZoneRequestProto req = GetErasureCodingZoneRequestProto.newBuilder() + .setSrc(src).build(); + try { + GetErasureCodingZoneResponseProto response = rpcProxy.getErasureCodingZone( + null, req); + if (response.hasECZone()) { + return PBHelper.convertErasureCodingZone(response.getECZone()); + } + return null; + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 94028a2..e71e24c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -281,7 +281,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements ReportBadBlocksRequestProto.Builder builder = ReportBadBlocksRequestProto .newBuilder(); for (int i = 0; i < blocks.length; i++) { - builder.addBlocks(i, PBHelper.convert(blocks[i])); + builder.addBlocks(i, PBHelper.convertLocatedBlock(blocks[i])); } ReportBadBlocksRequestProto req = builder.build(); try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index e133ec7..09ba564 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@ -259,7 +259,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements List lbps = request.getBlocksList(); LocatedBlock [] blocks = new LocatedBlock [lbps.size()]; for(int i=0; i datanodeUuids = b.getDatanodeUuidsList(); final List storageUuids = b.getStorageUuidsList(); final List storageTypes = b.getStorageTypesList(); - return new BlockWithLocations(convert(b.getBlock()), + BlockWithLocations blk = new BlockWithLocations(convert(b.getBlock()), datanodeUuids.toArray(new String[datanodeUuids.size()]), storageUuids.toArray(new String[storageUuids.size()]), convertStorageTypes(storageTypes, storageUuids.size())); + if (b.hasIndices()) { + blk = new StripedBlockWithLocations(blk, b.getIndices().toByteArray(), + (short) b.getDataBlockNum()); + } + return blk; } public static BlocksWithLocationsProto convert(BlocksWithLocations blks) { @@ -609,7 +633,7 @@ public class PBHelper { if (b == null) { return null; } - LocatedBlockProto lb = PBHelper.convert((LocatedBlock)b); + LocatedBlockProto lb = PBHelper.convertLocatedBlock(b); RecoveringBlockProto.Builder builder = RecoveringBlockProto.newBuilder(); builder.setBlock(lb).setNewGenStamp(b.getNewGenerationStamp()); if(b.getNewBlock() != null) @@ -759,7 +783,7 @@ public class PBHelper { } } - public static LocatedBlockProto convert(LocatedBlock b) { + public static LocatedBlockProto convertLocatedBlock(LocatedBlock b) { if (b == null) return null; Builder builder = LocatedBlockProto.newBuilder(); DatanodeInfo[] locs = b.getLocations(); @@ -780,21 +804,30 @@ public class PBHelper { StorageType[] storageTypes = b.getStorageTypes(); if (storageTypes != null) { - for (int i = 0; i < storageTypes.length; ++i) { - builder.addStorageTypes(PBHelper.convertStorageType(storageTypes[i])); + for (StorageType storageType : storageTypes) { + builder.addStorageTypes(PBHelper.convertStorageType(storageType)); } } final String[] storageIDs = b.getStorageIDs(); if (storageIDs != null) { builder.addAllStorageIDs(Arrays.asList(storageIDs)); } + if (b instanceof LocatedStripedBlock) { + LocatedStripedBlock sb = (LocatedStripedBlock) b; + int[] indices = sb.getBlockIndices(); + Token[] blockTokens = sb.getBlockTokens(); + for (int i = 0; i < indices.length; i++) { + builder.addBlockIndex(indices[i]); + builder.addBlockTokens(PBHelper.convert(blockTokens[i])); + } + } return builder.setB(PBHelper.convert(b.getBlock())) .setBlockToken(PBHelper.convert(b.getBlockToken())) .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build(); } - public static LocatedBlock convert(LocatedBlockProto proto) { + public static LocatedBlock convertLocatedBlockProto(LocatedBlockProto proto) { if (proto == null) return null; List locs = proto.getLocsList(); DatanodeInfo[] targets = new DatanodeInfo[locs.size()]; @@ -814,6 +847,15 @@ public class PBHelper { storageIDs = proto.getStorageIDsList().toArray(new String[storageIDsCount]); } + int[] indices = null; + final int indexCount = proto.getBlockIndexCount(); + if (indexCount > 0) { + indices = new int[indexCount]; + for (int i = 0; i < indexCount; i++) { + indices[i] = proto.getBlockIndex(i); + } + } + // Set values from the isCached list, re-using references from loc List cachedLocs = new ArrayList(locs.size()); List isCachedList = proto.getIsCachedList(); @@ -823,9 +865,23 @@ public class PBHelper { } } - LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets, - storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(), - cachedLocs.toArray(new DatanodeInfo[0])); + final LocatedBlock lb; + if (indices == null) { + lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets, storageIDs, + storageTypes, proto.getOffset(), proto.getCorrupt(), + cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()])); + } else { + lb = new LocatedStripedBlock(PBHelper.convert(proto.getB()), targets, + storageIDs, storageTypes, indices, proto.getOffset(), + proto.getCorrupt(), + cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()])); + List tokenProtos = proto.getBlockTokensList(); + Token[] blockTokens = new Token[indices.length]; + for (int i = 0; i < indices.length; i++) { + blockTokens[i] = PBHelper.convert(tokenProtos.get(i)); + } + ((LocatedStripedBlock) lb).setBlockTokens(blockTokens); + } lb.setBlockToken(PBHelper.convert(proto.getBlockToken())); return lb; @@ -918,6 +974,8 @@ public class PBHelper { return REG_CMD; case BlockIdCommand: return PBHelper.convert(proto.getBlkIdCmd()); + case BlockECRecoveryCommand: + return PBHelper.convert(proto.getBlkECRecoveryCmd()); default: return null; } @@ -1068,6 +1126,11 @@ public class PBHelper { builder.setCmdType(DatanodeCommandProto.Type.BlockIdCommand). setBlkIdCmd(PBHelper.convert((BlockIdCommand) datanodeCommand)); break; + case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY: + builder.setCmdType(DatanodeCommandProto.Type.BlockECRecoveryCommand) + .setBlkECRecoveryCmd( + convert((BlockECRecoveryCommand) datanodeCommand)); + break; case DatanodeProtocol.DNA_UNKNOWN: //Not expected default: builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand); @@ -1241,36 +1304,36 @@ public class PBHelper { } // Located Block Arrays and Lists - public static LocatedBlockProto[] convertLocatedBlock(LocatedBlock[] lb) { + public static LocatedBlockProto[] convertLocatedBlocks(LocatedBlock[] lb) { if (lb == null) return null; - return convertLocatedBlock2(Arrays.asList(lb)).toArray( - new LocatedBlockProto[lb.length]); + return convertLocatedBlocks2(Arrays.asList(lb)) + .toArray(new LocatedBlockProto[lb.length]); } - public static LocatedBlock[] convertLocatedBlock(LocatedBlockProto[] lb) { + public static LocatedBlock[] convertLocatedBlocks(LocatedBlockProto[] lb) { if (lb == null) return null; - return convertLocatedBlock(Arrays.asList(lb)).toArray( - new LocatedBlock[lb.length]); + return convertLocatedBlocks(Arrays.asList(lb)) + .toArray(new LocatedBlock[lb.length]); } - public static List convertLocatedBlock( + public static List convertLocatedBlocks( List lb) { if (lb == null) return null; final int len = lb.size(); - List result = - new ArrayList(len); - for (int i = 0; i < len; ++i) { - result.add(PBHelper.convert(lb.get(i))); + List result = new ArrayList<>(len); + for (LocatedBlockProto aLb : lb) { + result.add(PBHelper.convertLocatedBlockProto(aLb)); } return result; } - public static List convertLocatedBlock2(List lb) { + public static List convertLocatedBlocks2( + List lb) { if (lb == null) return null; final int len = lb.size(); - List result = new ArrayList(len); - for (int i = 0; i < len; ++i) { - result.add(PBHelper.convert(lb.get(i))); + List result = new ArrayList<>(len); + for (LocatedBlock aLb : lb) { + result.add(PBHelper.convertLocatedBlock(aLb)); } return result; } @@ -1280,11 +1343,13 @@ public class PBHelper { public static LocatedBlocks convert(LocatedBlocksProto lb) { return new LocatedBlocks( lb.getFileLength(), lb.getUnderConstruction(), - PBHelper.convertLocatedBlock(lb.getBlocksList()), - lb.hasLastBlock() ? PBHelper.convert(lb.getLastBlock()) : null, + PBHelper.convertLocatedBlocks(lb.getBlocksList()), + lb.hasLastBlock() ? + PBHelper.convertLocatedBlockProto(lb.getLastBlock()) : null, lb.getIsLastBlockComplete(), - lb.hasFileEncryptionInfo() ? convert(lb.getFileEncryptionInfo()) : - null); + lb.hasFileEncryptionInfo() ? convert(lb.getFileEncryptionInfo()) : null, + lb.hasECSchema() ? convertECSchema(lb.getECSchema()) : null, + lb.hasStripeCellSize() ? lb.getStripeCellSize() : 0); } public static LocatedBlocksProto convert(LocatedBlocks lb) { @@ -1294,14 +1359,21 @@ public class PBHelper { LocatedBlocksProto.Builder builder = LocatedBlocksProto.newBuilder(); if (lb.getLastLocatedBlock() != null) { - builder.setLastBlock(PBHelper.convert(lb.getLastLocatedBlock())); + builder.setLastBlock( + PBHelper.convertLocatedBlock(lb.getLastLocatedBlock())); } if (lb.getFileEncryptionInfo() != null) { builder.setFileEncryptionInfo(convert(lb.getFileEncryptionInfo())); } + if (lb.getECSchema() != null) { + builder.setECSchema(convertECSchema(lb.getECSchema())); + } + if (lb.getStripeCellSize() != 0) { + builder.setStripeCellSize(lb.getStripeCellSize()); + } return builder.setFileLength(lb.getFileLength()) .setUnderConstruction(lb.isUnderConstruction()) - .addAllBlocks(PBHelper.convertLocatedBlock2(lb.getLocatedBlocks())) + .addAllBlocks(PBHelper.convertLocatedBlocks2(lb.getLocatedBlocks())) .setIsLastBlockComplete(lb.isLastBlockComplete()).build(); } @@ -1441,7 +1513,9 @@ public class PBHelper { fs.hasChildrenNum() ? fs.getChildrenNum() : -1, fs.hasFileEncryptionInfo() ? convert(fs.getFileEncryptionInfo()) : null, fs.hasStoragePolicy() ? (byte) fs.getStoragePolicy() - : HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED); + : HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED, + fs.hasEcSchema() ? PBHelper.convertECSchema(fs.getEcSchema()) : null, + fs.hasStripeCellSize() ? fs.getStripeCellSize() : 0); } public static SnapshottableDirectoryStatus convert( @@ -1502,6 +1576,10 @@ public class PBHelper { builder.setLocations(PBHelper.convert(locations)); } } + if(fs.getECSchema() != null) { + builder.setEcSchema(PBHelper.convertECSchema(fs.getECSchema())); + } + builder.setStripeCellSize(fs.getStripeCellSize()); return builder.build(); } @@ -3070,4 +3148,176 @@ public class PBHelper { setLeaseId(context.getLeaseId()). build(); } + + public static ECSchema convertECSchema(ECSchemaProto schema) { + List optionsList = schema.getOptionsList(); + Map options = new HashMap<>(optionsList.size()); + for (ECSchemaOptionEntryProto option : optionsList) { + options.put(option.getKey(), option.getValue()); + } + return new ECSchema(schema.getSchemaName(), schema.getCodecName(), + schema.getDataUnits(), schema.getParityUnits(), options); + } + + public static ECSchemaProto convertECSchema(ECSchema schema) { + ECSchemaProto.Builder builder = ECSchemaProto.newBuilder() + .setSchemaName(schema.getSchemaName()) + .setCodecName(schema.getCodecName()) + .setDataUnits(schema.getNumDataUnits()) + .setParityUnits(schema.getNumParityUnits()); + Set> entrySet = schema.getExtraOptions().entrySet(); + for (Entry entry : entrySet) { + builder.addOptions(ECSchemaOptionEntryProto.newBuilder() + .setKey(entry.getKey()).setValue(entry.getValue()).build()); + } + return builder.build(); + } + + public static ErasureCodingZoneProto convertErasureCodingZone( + ErasureCodingZone ecZone) { + return ErasureCodingZoneProto.newBuilder().setDir(ecZone.getDir()) + .setSchema(convertECSchema(ecZone.getSchema())) + .setCellSize(ecZone.getCellSize()).build(); + } + + public static ErasureCodingZone convertErasureCodingZone( + ErasureCodingZoneProto ecZoneProto) { + return new ErasureCodingZone(ecZoneProto.getDir(), + convertECSchema(ecZoneProto.getSchema()), ecZoneProto.getCellSize()); + } + + public static BlockECRecoveryInfo convertBlockECRecoveryInfo( + BlockECRecoveryInfoProto blockEcRecoveryInfoProto) { + ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock(); + ExtendedBlock block = convert(blockProto); + + DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto + .getSourceDnInfos(); + DatanodeInfo[] sourceDnInfos = convert(sourceDnInfosProto); + + DatanodeInfosProto targetDnInfosProto = blockEcRecoveryInfoProto + .getTargetDnInfos(); + DatanodeInfo[] targetDnInfos = convert(targetDnInfosProto); + + StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto + .getTargetStorageUuids(); + String[] targetStorageUuids = convert(targetStorageUuidsProto); + + StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto + .getTargetStorageTypes(); + StorageType[] convertStorageTypes = convertStorageTypes( + targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto + .getStorageTypesList().size()); + + List liveBlockIndicesList = blockEcRecoveryInfoProto + .getLiveBlockIndicesList(); + short[] liveBlkIndices = new short[liveBlockIndicesList.size()]; + for (int i = 0; i < liveBlockIndicesList.size(); i++) { + liveBlkIndices[i] = liveBlockIndicesList.get(i).shortValue(); + } + + ECSchema ecSchema = convertECSchema(blockEcRecoveryInfoProto.getEcSchema()); + int cellSize = blockEcRecoveryInfoProto.getCellSize(); + + return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos, + targetStorageUuids, convertStorageTypes, liveBlkIndices, ecSchema, + cellSize); + } + + public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo( + BlockECRecoveryInfo blockEcRecoveryInfo) { + BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto + .newBuilder(); + builder.setBlock(convert(blockEcRecoveryInfo.getExtendedBlock())); + + DatanodeInfo[] sourceDnInfos = blockEcRecoveryInfo.getSourceDnInfos(); + builder.setSourceDnInfos(convertToDnInfosProto(sourceDnInfos)); + + DatanodeInfo[] targetDnInfos = blockEcRecoveryInfo.getTargetDnInfos(); + builder.setTargetDnInfos(convertToDnInfosProto(targetDnInfos)); + + String[] targetStorageIDs = blockEcRecoveryInfo.getTargetStorageIDs(); + builder.setTargetStorageUuids(convertStorageIDs(targetStorageIDs)); + + StorageType[] targetStorageTypes = blockEcRecoveryInfo + .getTargetStorageTypes(); + builder.setTargetStorageTypes(convertStorageTypesProto(targetStorageTypes)); + + short[] liveBlockIndices = blockEcRecoveryInfo.getLiveBlockIndices(); + builder.addAllLiveBlockIndices(convertIntArray(liveBlockIndices)); + + builder.setEcSchema(convertECSchema(blockEcRecoveryInfo.getECSchema())); + builder.setCellSize(blockEcRecoveryInfo.getCellSize()); + + return builder.build(); + } + + private static List convertIntArray(short[] liveBlockIndices) { + List liveBlockIndicesList = new ArrayList(); + for (short s : liveBlockIndices) { + liveBlockIndicesList.add((int) s); + } + return liveBlockIndicesList; + } + + private static StorageTypesProto convertStorageTypesProto( + StorageType[] targetStorageTypes) { + StorageTypesProto.Builder builder = StorageTypesProto.newBuilder(); + for (StorageType storageType : targetStorageTypes) { + builder.addStorageTypes(convertStorageType(storageType)); + } + return builder.build(); + } + + private static StorageUuidsProto convertStorageIDs(String[] targetStorageIDs) { + StorageUuidsProto.Builder builder = StorageUuidsProto.newBuilder(); + for (String storageUuid : targetStorageIDs) { + builder.addStorageUuids(storageUuid); + } + return builder.build(); + } + + private static DatanodeInfosProto convertToDnInfosProto(DatanodeInfo[] dnInfos) { + DatanodeInfosProto.Builder builder = DatanodeInfosProto.newBuilder(); + for (DatanodeInfo datanodeInfo : dnInfos) { + builder.addDatanodes(convert(datanodeInfo)); + } + return builder.build(); + } + + private static String[] convert(StorageUuidsProto targetStorageUuidsProto) { + List storageUuidsList = targetStorageUuidsProto + .getStorageUuidsList(); + String[] storageUuids = new String[storageUuidsList.size()]; + for (int i = 0; i < storageUuidsList.size(); i++) { + storageUuids[i] = storageUuidsList.get(i); + } + return storageUuids; + } + + public static BlockECRecoveryCommandProto convert( + BlockECRecoveryCommand blkECRecoveryCmd) { + BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto + .newBuilder(); + Collection blockECRecoveryInfos = blkECRecoveryCmd + .getECTasks(); + for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) { + builder + .addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo)); + } + return builder.build(); + } + + public static BlockECRecoveryCommand convert( + BlockECRecoveryCommandProto blkECRecoveryCmdProto) { + Collection blkECRecoveryInfos = new ArrayList(); + List blockECRecoveryinfoList = blkECRecoveryCmdProto + .getBlockECRecoveryinfoList(); + for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) { + blkECRecoveryInfos + .add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto)); + } + return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, + blkECRecoveryInfos); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 742a300..33fdc43 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -48,8 +48,8 @@ import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task; import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicies; import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -205,7 +205,9 @@ public class Balancer { */ private static void checkReplicationPolicyCompatibility(Configuration conf ) throws UnsupportedActionException { - if (!(BlockPlacementPolicy.getInstance(conf, null, null, null) instanceof + BlockPlacementPolicies placementPolicies = + new BlockPlacementPolicies(conf, null, null, null); + if (!(placementPolicies.getPolicy(false) instanceof BlockPlacementPolicyDefault)) { throw new UnsupportedActionException( "Balancer without BlockPlacementPolicyDefault"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index b4b06ee..a9e4f41 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.balancer; import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; @@ -65,6 +67,7 @@ import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; @@ -149,18 +152,17 @@ public class Dispatcher { private final Map map = new HashMap(); /** - * Get the block from the map; - * if the block is not found, create a new block and put it in the map. + * Put block in the map if it's not found + * @return the block which be put in the map the first time */ - private DBlock get(Block b) { - DBlock block = map.get(b); - if (block == null) { - block = new DBlock(b); - map.put(b, block); + private DBlock putIfAbsent(Block blk, DBlock dblk) { + if (!map.containsKey(blk)) { + map.put(blk, dblk); + return dblk; } - return block; + return map.get(blk); } - + /** Remove all blocks except for the moved blocks. */ private void removeAllButRetain(MovedBlocks movedBlocks) { for (Iterator i = map.keySet().iterator(); i.hasNext();) { @@ -201,9 +203,9 @@ public class Dispatcher { } } - /** This class keeps track of a scheduled block move */ + /** This class keeps track of a scheduled reportedBlock move */ public class PendingMove { - private DBlock block; + private DBlock reportedBlock; private Source source; private DDatanode proxySource; private StorageGroup target; @@ -215,7 +217,7 @@ public class Dispatcher { @Override public String toString() { - final Block b = block != null ? block.getBlock() : null; + final Block b = reportedBlock != null ? reportedBlock.getBlock() : null; String bStr = b != null ? (b + " with size=" + b.getNumBytes() + " ") : " "; return bStr + "from " + source.getDisplayName() + " to " + target @@ -224,8 +226,8 @@ public class Dispatcher { } /** - * Choose a block & a proxy source for this pendingMove whose source & - * target have already been chosen. + * Choose a good block/blockGroup from source & Get reportedBlock from + * the block & Choose a proxy source for the reportedBlock. * * @return true if a block and its proxy are chosen; false otherwise */ @@ -249,7 +251,11 @@ public class Dispatcher { synchronized (block) { synchronized (movedBlocks) { if (isGoodBlockCandidate(source, target, targetStorageType, block)) { - this.block = block; + if (block instanceof DBlockStriped) { + reportedBlock = ((DBlockStriped) block).getInternalBlock(source); + } else { + reportedBlock = block; + } if (chooseProxySource()) { movedBlocks.put(block); if (LOG.isDebugEnabled()) { @@ -276,7 +282,7 @@ public class Dispatcher { } // if node group is supported, first try add nodes in the same node group if (cluster.isNodeGroupAware()) { - for (StorageGroup loc : block.getLocations()) { + for (StorageGroup loc : reportedBlock.getLocations()) { if (cluster.isOnSameNodeGroup(loc.getDatanodeInfo(), targetDN) && addTo(loc)) { return true; @@ -284,13 +290,13 @@ public class Dispatcher { } } // check if there is replica which is on the same rack with the target - for (StorageGroup loc : block.getLocations()) { + for (StorageGroup loc : reportedBlock.getLocations()) { if (cluster.isOnSameRack(loc.getDatanodeInfo(), targetDN) && addTo(loc)) { return true; } } // find out a non-busy replica - for (StorageGroup loc : block.getLocations()) { + for (StorageGroup loc : reportedBlock.getLocations()) { if (addTo(loc)) { return true; } @@ -298,7 +304,7 @@ public class Dispatcher { return false; } - /** add to a proxy source for specific block movement */ + /** add to a proxy source for specific reportedBlock movement */ private boolean addTo(StorageGroup g) { final DDatanode dn = g.getDDatanode(); if (dn.addPendingBlock(this)) { @@ -311,6 +317,7 @@ public class Dispatcher { /** Dispatch the move to the proxy source & wait for the response. */ private void dispatch() { LOG.info("Start moving " + this); + assert !(reportedBlock instanceof DBlockStriped); Socket sock = new Socket(); DataOutputStream out = null; @@ -325,7 +332,7 @@ public class Dispatcher { OutputStream unbufOut = sock.getOutputStream(); InputStream unbufIn = sock.getInputStream(); ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), - block.getBlock()); + reportedBlock.getBlock()); final KeyManager km = nnc.getKeyManager(); Token accessToken = km.getAccessToken(eb); IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, @@ -339,7 +346,7 @@ public class Dispatcher { sendRequest(out, eb, accessToken); receiveResponse(in); - nnc.getBytesMoved().addAndGet(block.getNumBytes()); + nnc.getBytesMoved().addAndGet(reportedBlock.getNumBytes()); target.getDDatanode().setHasSuccess(); LOG.info("Successfully moved " + this); } catch (IOException e) { @@ -368,14 +375,14 @@ public class Dispatcher { } } - /** Send a block replace request to the output stream */ + /** Send a reportedBlock replace request to the output stream */ private void sendRequest(DataOutputStream out, ExtendedBlock eb, Token accessToken) throws IOException { new Sender(out).replaceBlock(eb, target.storageType, accessToken, source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode); } - /** Receive a block copy response from the input stream */ + /** Receive a reportedBlock copy response from the input stream */ private void receiveResponse(DataInputStream in) throws IOException { BlockOpResponseProto response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); @@ -383,13 +390,13 @@ public class Dispatcher { // read intermediate responses response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); } - String logInfo = "block move is failed"; + String logInfo = "reportedBlock move is failed"; DataTransferProtoUtil.checkBlockOpStatus(response, logInfo); } /** reset the object */ private void reset() { - block = null; + reportedBlock = null; source = null; proxySource = null; target = null; @@ -401,6 +408,44 @@ public class Dispatcher { public DBlock(Block block) { super(block); } + + public long getNumBytes(StorageGroup storage) { + return super.getNumBytes(); + } + } + + public static class DBlockStriped extends DBlock { + + final byte[] indices; + final short dataBlockNum; + + public DBlockStriped(Block block, byte[] indices, short dataBlockNum) { + super(block); + this.indices = indices; + this.dataBlockNum = dataBlockNum; + } + + public DBlock getInternalBlock(StorageGroup storage) { + int idxInLocs = locations.indexOf(storage); + if (idxInLocs == -1) { + return null; + } + byte idxInGroup = indices[idxInLocs]; + long blkId = getBlock().getBlockId() + idxInGroup; + long numBytes = getInternalBlockLength(getNumBytes(), + HdfsConstants.BLOCK_STRIPED_CELL_SIZE, dataBlockNum, idxInGroup); + Block blk = new Block(getBlock()); + blk.setBlockId(blkId); + blk.setNumBytes(numBytes); + DBlock dblk = new DBlock(blk); + dblk.addLocation(storage); + return dblk; + } + + @Override + public long getNumBytes(StorageGroup storage) { + return getInternalBlock(storage).getNumBytes(); + } } /** The class represents a desired move. */ @@ -476,7 +521,7 @@ public class Dispatcher { private PendingMove addPendingMove(DBlock block, final PendingMove pm) { if (getDDatanode().addPendingBlock(pm)) { if (pm.markMovedIfGoodBlock(block, getStorageType())) { - incScheduledSize(pm.block.getNumBytes()); + incScheduledSize(pm.reportedBlock.getNumBytes()); return pm; } else { getDDatanode().removePendingBlock(pm); @@ -651,24 +696,39 @@ public class Dispatcher { */ private long getBlockList() throws IOException { final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); - final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size); + final BlocksWithLocations newBlksLocs = + nnc.getBlocks(getDatanodeInfo(), size); if (LOG.isTraceEnabled()) { LOG.trace("getBlocks(" + getDatanodeInfo() + ", " + StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2) - + ") returns " + newBlocks.getBlocks().length + " blocks."); + + ") returns " + newBlksLocs.getBlocks().length + " blocks."); } long bytesReceived = 0; - for (BlockWithLocations blk : newBlocks.getBlocks()) { - bytesReceived += blk.getBlock().getNumBytes(); + for (BlockWithLocations blkLocs : newBlksLocs.getBlocks()) { + + DBlock block; + if (blkLocs instanceof StripedBlockWithLocations) { + StripedBlockWithLocations sblkLocs = + (StripedBlockWithLocations) blkLocs; + // approximate size + bytesReceived += sblkLocs.getBlock().getNumBytes() / + sblkLocs.getDataBlockNum(); + block = new DBlockStriped(sblkLocs.getBlock(), sblkLocs.getIndices(), + sblkLocs.getDataBlockNum()); + } else{ + bytesReceived += blkLocs.getBlock().getNumBytes(); + block = new DBlock(blkLocs.getBlock()); + } + synchronized (globalBlocks) { - final DBlock block = globalBlocks.get(blk.getBlock()); + block = globalBlocks.putIfAbsent(blkLocs.getBlock(), block); synchronized (block) { block.clearLocations(); // update locations - final String[] datanodeUuids = blk.getDatanodeUuids(); - final StorageType[] storageTypes = blk.getStorageTypes(); + final String[] datanodeUuids = blkLocs.getDatanodeUuids(); + final StorageType[] storageTypes = blkLocs.getStorageTypes(); for (int i = 0; i < datanodeUuids.length; i++) { final StorageGroup g = storageGroupMap.get( datanodeUuids[i], storageTypes[i]); @@ -707,6 +767,8 @@ public class Dispatcher { * target throttling has been considered. They are chosen only when they * have the capacity to support this block move. The block should be * dispatched immediately after this method is returned. + * If the block is a block group. Only the internal block on this source + * will be dispatched. * * @return a move that's good for the source to dispatch immediately. */ @@ -718,7 +780,7 @@ public class Dispatcher { if (target.addPendingBlock(pendingBlock)) { // target is not busy, so do a tentative block allocation if (pendingBlock.chooseBlockAndProxy()) { - long blockSize = pendingBlock.block.getNumBytes(); + long blockSize = pendingBlock.reportedBlock.getNumBytes(this); incScheduledSize(-blockSize); task.size -= blockSize; if (task.size == 0) { @@ -793,7 +855,7 @@ public class Dispatcher { blocksToReceive -= getBlockList(); continue; } catch (IOException e) { - LOG.warn("Exception while getting block list", e); + LOG.warn("Exception while getting reportedBlock list", e); return; } } else { @@ -937,7 +999,7 @@ public class Dispatcher { public void executePendingMove(final PendingMove p) { - // move the block + // move the reportedBlock final DDatanode targetDn = p.target.getDDatanode(); ExecutorService moveExecutor = targetDn.getMoveExecutor(); if (moveExecutor == null) { @@ -995,17 +1057,17 @@ public class Dispatcher { } } - // wait for all block moving to be done + // wait for all reportedBlock moving to be done waitForMoveCompletion(targets); return getBytesMoved() - bytesLastMoved; } - /** The sleeping period before checking if block move is completed again */ + /** The sleeping period before checking if reportedBlock move is completed again */ static private long blockMoveWaitTime = 30000L; /** - * Wait for all block move confirmations. + * Wait for all reportedBlock move confirmations. * @return true if there is failed move execution */ public static boolean waitForMoveCompletion( @@ -1044,10 +1106,10 @@ public class Dispatcher { } /** - * Decide if the block is a good candidate to be moved from source to target. - * A block is a good candidate if + * Decide if the block/blockGroup is a good candidate to be moved from source + * to target. A block is a good candidate if * 1. the block is not in the process of being moved/has not been moved; - * 2. the block does not have a replica on the target; + * 2. the block does not have a replica/internalBlock on the target; * 3. doing the move does not reduce the number of racks that the block has */ private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target, @@ -1064,7 +1126,7 @@ public class Dispatcher { } final DatanodeInfo targetDatanode = target.getDatanodeInfo(); if (source.getDatanodeInfo().equals(targetDatanode)) { - // the block is moved inside same DN + // the reportedBlock is moved inside same DN return true; } @@ -1152,7 +1214,7 @@ public class Dispatcher { movedBlocks.cleanup(); } - /** set the sleeping period for block move completion check */ + /** set the sleeping period for reportedBlock move completion check */ @VisibleForTesting public static void setBlockMoveWaitTime(long time) { blockMoveWaitTime = time; http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java index 02a1d05..928424b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java @@ -39,12 +39,12 @@ public interface BlockCollection { public ContentSummary computeContentSummary(BlockStoragePolicySuite bsps); /** - * @return the number of blocks + * @return the number of blocks or block groups */ public int numBlocks(); /** - * Get the blocks. + * Get the blocks (striped or contiguous). */ public BlockInfo[] getBlocks(); @@ -55,8 +55,8 @@ public interface BlockCollection { public long getPreferredBlockSize(); /** - * Get block replication for the collection - * @return block replication value + * Get block replication for the collection. + * @return block replication value. Return 0 if the file is erasure coded. */ public short getPreferredBlockReplication(); @@ -71,7 +71,7 @@ public interface BlockCollection { public String getName(); /** - * Set the block at the given index. + * Set the block (contiguous or striped) at the given index. */ public void setBlock(int index, BlockInfo blk); @@ -79,11 +79,16 @@ public interface BlockCollection { * Convert the last block of the collection to an under-construction block * and set the locations. */ - public BlockInfoContiguousUnderConstruction setLastBlock(BlockInfo lastBlock, + public void convertLastBlockToUC(BlockInfo lastBlock, DatanodeStorageInfo[] targets) throws IOException; /** * @return whether the block collection is under construction. */ public boolean isUnderConstruction(); + + /** + * @return whether the block collection is in striping format + */ + public boolean isStriped(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java index feebd87..685cfcb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java @@ -54,10 +54,12 @@ public class BlockIdManager { * The global block ID space for this file system. */ private final SequentialBlockIdGenerator blockIdGenerator; + private final SequentialBlockGroupIdGenerator blockGroupIdGenerator; public BlockIdManager(BlockManager blockManager) { this.generationStampV1Limit = HdfsConstants.GRANDFATHER_GENERATION_STAMP; this.blockIdGenerator = new SequentialBlockIdGenerator(blockManager); + this.blockGroupIdGenerator = new SequentialBlockGroupIdGenerator(blockManager); } /** @@ -102,21 +104,38 @@ public class BlockIdManager { } /** - * Sets the maximum allocated block ID for this filesystem. This is + * Sets the maximum allocated contiguous block ID for this filesystem. This is * the basis for allocating new block IDs. */ - public void setLastAllocatedBlockId(long blockId) { + public void setLastAllocatedContiguousBlockId(long blockId) { blockIdGenerator.skipTo(blockId); } /** - * Gets the maximum sequentially allocated block ID for this filesystem + * Gets the maximum sequentially allocated contiguous block ID for this + * filesystem */ - public long getLastAllocatedBlockId() { + public long getLastAllocatedContiguousBlockId() { return blockIdGenerator.getCurrentValue(); } /** + * Sets the maximum allocated striped block ID for this filesystem. This is + * the basis for allocating new block IDs. + */ + public void setLastAllocatedStripedBlockId(long blockId) { + blockGroupIdGenerator.skipTo(blockId); + } + + /** + * Gets the maximum sequentially allocated striped block ID for this + * filesystem + */ + public long getLastAllocatedStripedBlockId() { + return blockGroupIdGenerator.getCurrentValue(); + } + + /** * Sets the current generation stamp for legacy blocks */ public void setGenerationStampV1(long stamp) { @@ -187,10 +206,14 @@ public class BlockIdManager { /** * Increments, logs and then returns the block ID */ - public long nextBlockId() { + public long nextContiguousBlockId() { return blockIdGenerator.nextValue(); } + public long nextStripedBlockId() { + return blockGroupIdGenerator.nextValue(); + } + public boolean isGenStampInFuture(Block block) { if (isLegacyBlock(block)) { return block.getGenerationStamp() > getGenerationStampV1(); @@ -206,4 +229,27 @@ public class BlockIdManager { .LAST_RESERVED_BLOCK_ID); generationStampV1Limit = HdfsConstants.GRANDFATHER_GENERATION_STAMP; } -} \ No newline at end of file + + public static boolean isStripedBlockID(long id) { + return id < 0; + } + + /** + * The last 4 bits of HdfsConstants.BLOCK_GROUP_INDEX_MASK(15) is 1111, + * so the last 4 bits of (~HdfsConstants.BLOCK_GROUP_INDEX_MASK) is 0000 + * and the other 60 bits are 1. Group ID is the first 60 bits of any + * data/parity block id in the same striped block group. + */ + public static long convertToStripedID(long id) { + return id & (~HdfsServerConstants.BLOCK_GROUP_INDEX_MASK); + } + + public static int getBlockIndex(Block reportedBlock) { + return (int) (reportedBlock.getBlockId() & + HdfsServerConstants.BLOCK_GROUP_INDEX_MASK); + } + + SequentialBlockGroupIdGenerator getBlockGroupIdGenerator() { + return blockGroupIdGenerator; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java index dea31c4..a913535 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@ -55,16 +55,17 @@ public abstract class BlockInfo extends Block /** * Construct an entry for blocksmap - * @param replication the block's replication factor + * @param size the block's replication factor, or the total number of blocks + * in the block group */ - public BlockInfo(short replication) { - this.triplets = new Object[3*replication]; + public BlockInfo(short size) { + this.triplets = new Object[3*size]; this.bc = null; } - public BlockInfo(Block blk, short replication) { + public BlockInfo(Block blk, short size) { super(blk); - this.triplets = new Object[3*replication]; + this.triplets = new Object[3 * size]; this.bc = null; } @@ -108,7 +109,7 @@ public abstract class BlockInfo extends Block BlockInfo info = (BlockInfo)triplets[index*3+1]; assert info == null || info.getClass().getName().startsWith(BlockInfo.class.getName()) : - "BlockInfo is expected at " + index*3; + "BlockInfo is expected at " + index*3; return info; } @@ -150,7 +151,7 @@ public abstract class BlockInfo extends Block * * @param index - the datanode index * @param to - block to be set to next on the list of blocks - * * @return current next block on the list of blocks + * @return current next block on the list of blocks */ BlockInfo setNext(int index, BlockInfo to) { assert this.triplets != null : "BlockInfo is not initialized"; @@ -167,14 +168,20 @@ public abstract class BlockInfo extends Block } /** - * Count the number of data-nodes the block belongs to. + * Count the number of data-nodes the block currently belongs to (i.e., NN + * has received block reports from the DN). */ public abstract int numNodes(); /** - * Add a {@link DatanodeStorageInfo} location for a block. + * Add a {@link DatanodeStorageInfo} location for a block + * @param storage The storage to add + * @param reportedBlock The block reported from the datanode. This is only + * used by erasure coded blocks, this block's id contains + * information indicating the index of the block in the + * corresponding block group. */ - abstract boolean addStorage(DatanodeStorageInfo storage); + abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock); /** * Remove {@link DatanodeStorageInfo} location for a block @@ -188,6 +195,11 @@ public abstract class BlockInfo extends Block */ abstract void replaceBlock(BlockInfo newBlock); + public abstract boolean isStriped(); + + /** @return true if there is no datanode storage associated with the block */ + abstract boolean hasNoStorage(); + /** * Find specified DatanodeStorageInfo. * @return DatanodeStorageInfo or null if not found. @@ -196,10 +208,9 @@ public abstract class BlockInfo extends Block int len = getCapacity(); for(int idx = 0; idx < len; idx++) { DatanodeStorageInfo cur = getStorageInfo(idx); - if(cur == null) - break; - if(cur.getDatanodeDescriptor() == dn) + if(cur != null && cur.getDatanodeDescriptor() == dn) { return cur; + } } return null; } @@ -215,9 +226,6 @@ public abstract class BlockInfo extends Block if (cur == storageInfo) { return idx; } - if (cur == null) { - break; - } } return -1; } @@ -261,12 +269,15 @@ public abstract class BlockInfo extends Block BlockInfo prev = this.getPrevious(dnIndex); this.setNext(dnIndex, null); this.setPrevious(dnIndex, null); - if(prev != null) + if (prev != null) { prev.setNext(prev.findStorageInfo(storage), next); - if(next != null) + } + if (next != null) { next.setPrevious(next.findStorageInfo(storage), prev); - if(this == head) // removing the head + } + if (this == head) { // removing the head head = next; + } return head; } @@ -295,7 +306,7 @@ public abstract class BlockInfo extends Block /** * BlockInfo represents a block that is not being constructed. * In order to start modifying the block, the BlockInfo should be converted - * to {@link BlockInfoContiguousUnderConstruction}. + * to {@link BlockInfoUnderConstruction}. * @return {@link BlockUCState#COMPLETE} */ public BlockUCState getBlockUCState() { @@ -311,28 +322,6 @@ public abstract class BlockInfo extends Block return getBlockUCState().equals(BlockUCState.COMPLETE); } - /** - * Convert a complete block to an under construction block. - * @return BlockInfoUnderConstruction - an under construction block. - */ - public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction( - BlockUCState s, DatanodeStorageInfo[] targets) { - if(isComplete()) { - BlockInfoContiguousUnderConstruction ucBlock = - new BlockInfoContiguousUnderConstruction(this, - getBlockCollection().getPreferredBlockReplication(), s, targets); - ucBlock.setBlockCollection(getBlockCollection()); - return ucBlock; - } - // the block is already under construction - BlockInfoContiguousUnderConstruction ucBlock = - (BlockInfoContiguousUnderConstruction)this; - ucBlock.setBlockUCState(s); - ucBlock.setExpectedLocations(targets); - ucBlock.setBlockCollection(getBlockCollection()); - return ucBlock; - } - @Override public int hashCode() { // Super implementation is sufficient http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc0a6173/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java index eff89a8..7e03ca5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; /** * Subclass of {@link BlockInfo}, used for a block with replication scheme. @@ -37,11 +38,12 @@ public class BlockInfoContiguous extends BlockInfo { /** * Copy construction. - * This is used to convert BlockReplicationInfoUnderConstruction - * @param from BlockReplicationInfo to copy from. + * This is used to convert BlockInfoContiguous + * @param from BlockInfoContiguous to copy from. */ protected BlockInfoContiguous(BlockInfoContiguous from) { - super(from); + this(from, (short) (from.triplets.length / 3)); + this.setBlockCollection(from.getBlockCollection()); } /** @@ -63,7 +65,7 @@ public class BlockInfoContiguous extends BlockInfo { } @Override - boolean addStorage(DatanodeStorageInfo storage) { + boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) { // find the last null node int lastNode = ensureCapacity(1); setStorageInfo(lastNode, storage); @@ -115,9 +117,42 @@ public class BlockInfoContiguous extends BlockInfo { assert removed : "currentBlock not found."; final DatanodeStorageInfo.AddBlockResult result = storage.addBlock( - newBlock); + newBlock, newBlock); assert result == DatanodeStorageInfo.AddBlockResult.ADDED : "newBlock already exists."; } } + + /** + * Convert a complete block to an under construction block. + * + * @return BlockInfoUnderConstruction - an under construction block. + */ + public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction( + BlockUCState s, DatanodeStorageInfo[] targets) { + if (isComplete()) { + BlockInfoContiguousUnderConstruction ucBlock = + new BlockInfoContiguousUnderConstruction(this, + getBlockCollection().getPreferredBlockReplication(), s, targets); + ucBlock.setBlockCollection(getBlockCollection()); + return ucBlock; + } + // the block is already under construction + BlockInfoContiguousUnderConstruction ucBlock = + (BlockInfoContiguousUnderConstruction) this; + ucBlock.setBlockUCState(s); + ucBlock.setExpectedLocations(targets); + ucBlock.setBlockCollection(getBlockCollection()); + return ucBlock; + } + + @Override + public final boolean isStriped() { + return false; + } + + @Override + final boolean hasNoStorage() { + return getStorageInfo(0) == null; + } }