Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DA842175AF for ; Tue, 24 Mar 2015 18:41:08 +0000 (UTC) Received: (qmail 68878 invoked by uid 500); 24 Mar 2015 18:40:51 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 68380 invoked by uid 500); 24 Mar 2015 18:40:51 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 66350 invoked by uid 99); 24 Mar 2015 18:40:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Mar 2015 18:40:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2F84FE18F8; Tue, 24 Mar 2015 18:40:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhz@apache.org To: common-commits@hadoop.apache.org Date: Tue, 24 Mar 2015 18:41:18 -0000 Message-Id: <4d5e4da60a8640fe84fbb685e1cd0eb8@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [30/50] [abbrv] hadoop git commit: HDFS-7716. Erasure Coding: extend BlockInfo to handle EC info. Contributed by Jing Zhao. HDFS-7716. Erasure Coding: extend BlockInfo to handle EC info. Contributed by Jing Zhao. 
Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/249ca621 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/249ca621 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/249ca621 Branch: refs/heads/HDFS-7285 Commit: 249ca621cfbc8c23d2da36eb9b7336349aef69bd Parents: 97ce9b9 Author: Jing Zhao Authored: Tue Feb 10 17:54:10 2015 -0800 Committer: Zhe Zhang Committed: Tue Mar 24 11:16:32 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hdfs/protocol/HdfsConstants.java | 1 + .../server/blockmanagement/BlockCollection.java | 13 +- .../server/blockmanagement/BlockIdManager.java | 7 +- .../hdfs/server/blockmanagement/BlockInfo.java | 339 +++++++++++++++++ .../blockmanagement/BlockInfoContiguous.java | 363 +++---------------- .../BlockInfoContiguousUnderConstruction.java | 137 +------ .../blockmanagement/BlockInfoStriped.java | 179 +++++++++ .../server/blockmanagement/BlockManager.java | 188 +++++----- .../hdfs/server/blockmanagement/BlocksMap.java | 46 +-- .../CacheReplicationMonitor.java | 10 +- .../blockmanagement/DatanodeDescriptor.java | 22 +- .../blockmanagement/DatanodeStorageInfo.java | 38 +- .../ReplicaUnderConstruction.java | 119 ++++++ .../hdfs/server/namenode/FSDirectory.java | 4 +- .../hdfs/server/namenode/FSNamesystem.java | 24 +- .../hdfs/server/namenode/NamenodeFsck.java | 3 +- .../snapshot/FSImageFormatPBSnapshot.java | 4 +- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 4 +- .../server/blockmanagement/TestBlockInfo.java | 6 +- .../blockmanagement/TestBlockInfoStriped.java | 219 +++++++++++ .../blockmanagement/TestBlockManager.java | 4 +- .../blockmanagement/TestReplicationPolicy.java | 2 +- 22 files changed, 1125 insertions(+), 607 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/249ca621/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java index de60b6e..245b630 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java @@ -184,5 +184,6 @@ public class HdfsConstants { public static final byte NUM_DATA_BLOCKS = 3; public static final byte NUM_PARITY_BLOCKS = 2; + public static final long BLOCK_GROUP_INDEX_MASK = 15; public static final byte MAX_BLOCKS_IN_GROUP = 16; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/249ca621/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java index 1547611..974cac3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java @@ -39,12 +39,12 @@ public interface BlockCollection { public ContentSummary computeContentSummary(); /** - * @return the number of blocks + * @return the number of blocks or block groups */ public int numBlocks(); /** - * Get the blocks. 
+ * Get the blocks or block groups. */ public BlockInfoContiguous[] getBlocks(); @@ -55,8 +55,8 @@ public interface BlockCollection { public long getPreferredBlockSize(); /** - * Get block replication for the collection - * @return block replication value + * Get block replication for the collection. + * @return block replication value. Return 0 if the file is erasure coded. */ public short getBlockReplication(); @@ -71,7 +71,7 @@ public interface BlockCollection { public String getName(); /** - * Set the block at the given index. + * Set the block/block-group at the given index. */ public void setBlock(int index, BlockInfoContiguous blk); @@ -79,7 +79,8 @@ public interface BlockCollection { * Convert the last block of the collection to an under-construction block * and set the locations. */ - public BlockInfoContiguousUnderConstruction setLastBlock(BlockInfoContiguous lastBlock, + public BlockInfoContiguousUnderConstruction setLastBlock( + BlockInfoContiguous lastBlock, DatanodeStorageInfo[] targets) throws IOException; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/249ca621/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java index e7f8a05..3ae54ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockIdManager.java @@ -217,6 +217,11 @@ public class BlockIdManager { } public static long convertToGroupID(long id) { - return id & (~(HdfsConstants.MAX_BLOCKS_IN_GROUP - 1)); + return id & 
(~HdfsConstants.BLOCK_GROUP_INDEX_MASK); + } + + public static int getBlockIndex(Block reportedBlock) { + return (int) (reportedBlock.getBlockId() & + HdfsConstants.BLOCK_GROUP_INDEX_MASK); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/249ca621/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java new file mode 100644 index 0000000..f19ad32 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.util.LightWeightGSet; + +import java.util.LinkedList; + +/** + * For a given block (or an erasure coding block group), BlockInfo class + * maintains 1) the {@link BlockCollection} it is part of, and 2) datanodes + * where the replicas of the block, or blocks belonging to the erasure coding + * block group, are stored. + */ +public abstract class BlockInfo extends Block + implements LightWeightGSet.LinkedElement { + private BlockCollection bc; + + /** For implementing {@link LightWeightGSet.LinkedElement} interface */ + private LightWeightGSet.LinkedElement nextLinkedElement; + + /** + * This array contains triplets of references. For each i-th storage, the + * block belongs to triplets[3*i] is the reference to the + * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are + * references to the previous and the next blocks, respectively, in the list + * of blocks belonging to this storage. + * + * Using previous and next in Object triplets is done instead of a + * {@link LinkedList} list to efficiently use memory. With LinkedList the cost + * per replica is 42 bytes (LinkedList#Entry object per replica) versus 16 + * bytes using the triplets. 
+ */ + protected Object[] triplets; + + /** + * Construct an entry for blocksmap + * @param size the block's replication factor, or the total number of blocks + * in the block group + */ + public BlockInfo(short size) { + this.triplets = new Object[3 * size]; + this.bc = null; + } + + public BlockInfo(Block blk, short size) { + super(blk); + this.triplets = new Object[3 * size]; + this.bc = null; + } + + public BlockCollection getBlockCollection() { + return bc; + } + + public void setBlockCollection(BlockCollection bc) { + this.bc = bc; + } + + public DatanodeDescriptor getDatanode(int index) { + DatanodeStorageInfo storage = getStorageInfo(index); + return storage == null ? null : storage.getDatanodeDescriptor(); + } + + DatanodeStorageInfo getStorageInfo(int index) { + assert this.triplets != null : "BlockInfo is not initialized"; + assert index >= 0 && index*3 < triplets.length : "Index is out of bound"; + return (DatanodeStorageInfo)triplets[index*3]; + } + + BlockInfo getPrevious(int index) { + assert this.triplets != null : "BlockInfo is not initialized"; + assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound"; + return (BlockInfo) triplets[index*3+1]; + } + + BlockInfo getNext(int index) { + assert this.triplets != null : "BlockInfo is not initialized"; + assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound"; + return (BlockInfo) triplets[index*3+2]; + } + + void setStorageInfo(int index, DatanodeStorageInfo storage) { + assert this.triplets != null : "BlockInfo is not initialized"; + assert index >= 0 && index*3 < triplets.length : "Index is out of bound"; + triplets[index*3] = storage; + } + + /** + * Return the previous block on the block list for the datanode at + * position index. Set the previous block on the list to "to". 
+ * + * @param index - the datanode index + * @param to - block to be set to previous on the list of blocks + * @return current previous block on the list of blocks + */ + BlockInfo setPrevious(int index, BlockInfo to) { + assert this.triplets != null : "BlockInfo is not initialized"; + assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound"; + BlockInfo info = (BlockInfo) triplets[index*3+1]; + triplets[index*3+1] = to; + return info; + } + + /** + * Return the next block on the block list for the datanode at + * position index. Set the next block on the list to "to". + * + * @param index - the datanode index + * @param to - block to be set to next on the list of blocks + * @return current next block on the list of blocks + */ + BlockInfo setNext(int index, BlockInfo to) { + assert this.triplets != null : "BlockInfo is not initialized"; + assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound"; + BlockInfo info = (BlockInfo) triplets[index*3+2]; + triplets[index*3+2] = to; + return info; + } + + public int getCapacity() { + assert this.triplets != null : "BlockInfo is not initialized"; + assert triplets.length % 3 == 0 : "Malformed BlockInfo"; + return triplets.length / 3; + } + + /** + * Count the number of data-nodes the block currently belongs to (i.e., NN + * has received block reports from the DN). + */ + public abstract int numNodes(); + + /** + * Add a {@link DatanodeStorageInfo} location for a block + * @param storage The storage to add + * @param reportedBlock The block reported from the datanode. This is only + * used by erasure coded blocks, this block's id contains + * information indicating the index of the block in the + * corresponding block group. 
+ */ + abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock); + + /** + * Remove {@link DatanodeStorageInfo} location for a block + */ + abstract boolean removeStorage(DatanodeStorageInfo storage); + + /** + * Replace the current BlockInfo with the new one in corresponding + * DatanodeStorageInfo's linked list + */ + abstract void replaceBlock(BlockInfo newBlock); + + /** + * Find specified DatanodeDescriptor. + * @return true if found, false otherwise. + */ + boolean findDatanode(DatanodeDescriptor dn) { + int len = getCapacity(); + for (int idx = 0; idx < len; idx++) { + DatanodeDescriptor cur = getDatanode(idx); + if(cur == dn) { + return true; + } + } + return false; + } + + /** + * Find specified DatanodeStorageInfo. + * @return DatanodeStorageInfo or null if not found. + */ + DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) { + int len = getCapacity(); + for(int idx = 0; idx < len; idx++) { + DatanodeStorageInfo cur = getStorageInfo(idx); + if(cur != null && cur.getDatanodeDescriptor() == dn) { + return cur; + } + } + return null; + } + + /** + * Find specified DatanodeStorageInfo. + * @return index or -1 if not found. + */ + int findStorageInfo(DatanodeStorageInfo storageInfo) { + int len = getCapacity(); + for(int idx = 0; idx < len; idx++) { + DatanodeStorageInfo cur = getStorageInfo(idx); + if (cur == storageInfo) { + return idx; + } + } + return -1; + } + + /** + * Insert this block into the head of the list of blocks + * related to the specified DatanodeStorageInfo. + * If the head is null then form a new list. + * @return current block as the new head of the list.
+ */ + BlockInfo listInsert(BlockInfo head, DatanodeStorageInfo storage) { + int dnIndex = this.findStorageInfo(storage); + assert dnIndex >= 0 : "Data node is not found: current"; + assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : + "Block is already in the list and cannot be inserted."; + this.setPrevious(dnIndex, null); + this.setNext(dnIndex, head); + if (head != null) { + head.setPrevious(head.findStorageInfo(storage), this); + } + return this; + } + + /** + * Remove this block from the list of blocks + * related to the specified DatanodeStorageInfo. + * If this block is the head of the list then return the next block as + * the new head. + * @return the new head of the list or null if the list becomes + * empty after deletion. + */ + BlockInfo listRemove(BlockInfo head, DatanodeStorageInfo storage) { + if (head == null) { + return null; + } + int dnIndex = this.findStorageInfo(storage); + if (dnIndex < 0) { // this block is not on the data-node list + return head; + } + + BlockInfo next = this.getNext(dnIndex); + BlockInfo prev = this.getPrevious(dnIndex); + this.setNext(dnIndex, null); + this.setPrevious(dnIndex, null); + if (prev != null) { + prev.setNext(prev.findStorageInfo(storage), next); + } + if (next != null) { + next.setPrevious(next.findStorageInfo(storage), prev); + } + if (this == head) { // removing the head + head = next; + } + return head; + } + + /** + * Remove this block from the list of blocks related to the specified + * DatanodeStorageInfo. Insert it into the head of the list of blocks. + * + * @return the new head of the list.
+ */ + public BlockInfo moveBlockToHead(BlockInfo head, DatanodeStorageInfo storage, + int curIndex, int headIndex) { + if (head == this) { + return this; + } + BlockInfo next = this.setNext(curIndex, head); + BlockInfo prev = this.setPrevious(curIndex, null); + + head.setPrevious(headIndex, this); + prev.setNext(prev.findStorageInfo(storage), next); + if (next != null) { + next.setPrevious(next.findStorageInfo(storage), prev); + } + return this; + } + + /** + * BlockInfo represents a block that is not being constructed. + * In order to start modifying the block, the BlockInfo should be converted + * to {@link BlockInfoContiguousUnderConstruction}. + * @return {@link HdfsServerConstants.BlockUCState#COMPLETE} + */ + public HdfsServerConstants.BlockUCState getBlockUCState() { + return HdfsServerConstants.BlockUCState.COMPLETE; + } + + /** + * Is this block complete? + * + * @return true if the state of the block is + * {@link HdfsServerConstants.BlockUCState#COMPLETE} + */ + public boolean isComplete() { + return getBlockUCState().equals(HdfsServerConstants.BlockUCState.COMPLETE); + } + + @Override + public int hashCode() { + // Super implementation is sufficient + return super.hashCode(); + } + + @Override + public boolean equals(Object obj) { + // Sufficient to rely on super's implementation + return (this == obj) || super.equals(obj); + } + + @Override + public LightWeightGSet.LinkedElement getNext() { + return nextLinkedElement; + } + + @Override + public void setNext(LightWeightGSet.LinkedElement next) { + this.nextLinkedElement = next; + } + + static BlockInfo copyOf(BlockInfo b) { + if (b instanceof BlockInfoContiguous) { + return new BlockInfoContiguous((BlockInfoContiguous) b); + } else { + return new BlockInfoStriped((BlockInfoStriped) b); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/249ca621/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java index 48069c1..e54cba3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java @@ -17,148 +17,33 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; -import java.util.LinkedList; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; -import org.apache.hadoop.util.LightWeightGSet; /** - * BlockInfo class maintains for a given block - * the {@link BlockCollection} it is part of and datanodes where the replicas of - * the block are stored. + * Subclass of {@link BlockInfo}, used for a block with replication scheme. */ @InterfaceAudience.Private -public class BlockInfoContiguous extends Block - implements LightWeightGSet.LinkedElement { +public class BlockInfoContiguous extends BlockInfo { public static final BlockInfoContiguous[] EMPTY_ARRAY = {}; - private BlockCollection bc; - - /** For implementing {@link LightWeightGSet.LinkedElement} interface */ - private LightWeightGSet.LinkedElement nextLinkedElement; - - /** - * This array contains triplets of references. For each i-th storage, the - * block belongs to triplets[3*i] is the reference to the - * {@link DatanodeStorageInfo} and triplets[3*i+1] and triplets[3*i+2] are - * references to the previous and the next blocks, respectively, in the list - * of blocks belonging to this storage. 
- * - * Using previous and next in Object triplets is done instead of a - * {@link LinkedList} list to efficiently use memory. With LinkedList the cost - * per replica is 42 bytes (LinkedList#Entry object per replica) versus 16 - * bytes using the triplets. - */ - private Object[] triplets; - - /** - * Construct an entry for blocksmap - * @param replication the block's replication factor - */ - public BlockInfoContiguous(short replication) { - this.triplets = new Object[3*replication]; - this.bc = null; + public BlockInfoContiguous(short size) { + super(size); } - - public BlockInfoContiguous(Block blk, short replication) { - super(blk); - this.triplets = new Object[3*replication]; - this.bc = null; + + public BlockInfoContiguous(Block blk, short size) { + super(blk, size); } /** * Copy construction. - * This is used to convert BlockInfoUnderConstruction - * @param from BlockInfo to copy from. + * This is used to convert BlockReplicationInfoUnderConstruction + * @param from BlockReplicationInfo to copy from. */ protected BlockInfoContiguous(BlockInfoContiguous from) { - this(from, from.bc.getBlockReplication()); - this.bc = from.bc; - } - - public BlockCollection getBlockCollection() { - return bc; - } - - public void setBlockCollection(BlockCollection bc) { - this.bc = bc; - } - - public DatanodeDescriptor getDatanode(int index) { - DatanodeStorageInfo storage = getStorageInfo(index); - return storage == null ? 
null : storage.getDatanodeDescriptor(); - } - - DatanodeStorageInfo getStorageInfo(int index) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3 < triplets.length : "Index is out of bound"; - return (DatanodeStorageInfo)triplets[index*3]; - } - - private BlockInfoContiguous getPrevious(int index) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound"; - BlockInfoContiguous info = (BlockInfoContiguous)triplets[index*3+1]; - assert info == null || - info.getClass().getName().startsWith(BlockInfoContiguous.class.getName()) : - "BlockInfo is expected at " + index*3; - return info; - } - - BlockInfoContiguous getNext(int index) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound"; - BlockInfoContiguous info = (BlockInfoContiguous)triplets[index*3+2]; - assert info == null || info.getClass().getName().startsWith( - BlockInfoContiguous.class.getName()) : - "BlockInfo is expected at " + index*3; - return info; - } - - private void setStorageInfo(int index, DatanodeStorageInfo storage) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3 < triplets.length : "Index is out of bound"; - triplets[index*3] = storage; - } - - /** - * Return the previous block on the block list for the datanode at - * position index. Set the previous block on the list to "to". 
- * - * @param index - the datanode index - * @param to - block to be set to previous on the list of blocks - * @return current previous block on the list of blocks - */ - private BlockInfoContiguous setPrevious(int index, BlockInfoContiguous to) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound"; - BlockInfoContiguous info = (BlockInfoContiguous)triplets[index*3+1]; - triplets[index*3+1] = to; - return info; - } - - /** - * Return the next block on the block list for the datanode at - * position index. Set the next block on the list to "to". - * - * @param index - the datanode index - * @param to - block to be set to next on the list of blocks - * * @return current next block on the list of blocks - */ - private BlockInfoContiguous setNext(int index, BlockInfoContiguous to) { - assert this.triplets != null : "BlockInfo is not initialized"; - assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound"; - BlockInfoContiguous info = (BlockInfoContiguous)triplets[index*3+2]; - triplets[index*3+2] = to; - return info; - } - - public int getCapacity() { - assert this.triplets != null : "BlockInfo is not initialized"; - assert triplets.length % 3 == 0 : "Malformed BlockInfo"; - return triplets.length / 3; + this(from, from.getBlockCollection().getBlockReplication()); + this.setBlockCollection(from.getBlockCollection()); } /** @@ -168,9 +53,10 @@ public class BlockInfoContiguous extends Block private int ensureCapacity(int num) { assert this.triplets != null : "BlockInfo is not initialized"; int last = numNodes(); - if(triplets.length >= (last+num)*3) + if (triplets.length >= (last+num)*3) { return last; - /* Not enough space left. Create a new array. Should normally + } + /* Not enough space left. Create a new array. Should normally * happen only when replication is manually increased by the user. 
*/ Object[] old = triplets; triplets = new Object[(last+num)*3]; @@ -178,23 +64,8 @@ public class BlockInfoContiguous extends Block return last; } - /** - * Count the number of data-nodes the block belongs to. - */ - public int numNodes() { - assert this.triplets != null : "BlockInfo is not initialized"; - assert triplets.length % 3 == 0 : "Malformed BlockInfo"; - for(int idx = getCapacity()-1; idx >= 0; idx--) { - if(getDatanode(idx) != null) - return idx+1; - } - return 0; - } - - /** - * Add a {@link DatanodeStorageInfo} location for a block - */ - boolean addStorage(DatanodeStorageInfo storage) { + @Override + boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) { // find the last null node int lastNode = ensureCapacity(1); setStorageInfo(lastNode, storage); @@ -203,167 +74,53 @@ public class BlockInfoContiguous extends Block return true; } - /** - * Remove {@link DatanodeStorageInfo} location for a block - */ + @Override boolean removeStorage(DatanodeStorageInfo storage) { int dnIndex = findStorageInfo(storage); - if(dnIndex < 0) // the node is not found + if (dnIndex < 0) { // the node is not found return false; - assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : - "Block is still in the list and must be removed first."; + } + assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : + "Block is still in the list and must be removed first."; // find the last not null node - int lastNode = numNodes()-1; - // replace current node triplet by the lastNode one + int lastNode = numNodes()-1; + // replace current node triplet by the lastNode one setStorageInfo(dnIndex, getStorageInfo(lastNode)); - setNext(dnIndex, getNext(lastNode)); - setPrevious(dnIndex, getPrevious(lastNode)); + setNext(dnIndex, getNext(lastNode)); + setPrevious(dnIndex, getPrevious(lastNode)); // set the last triplet to null setStorageInfo(lastNode, null); - setNext(lastNode, null); - setPrevious(lastNode, null); + setNext(lastNode, null); + 
setPrevious(lastNode, null); return true; } - /** - * Find specified DatanodeDescriptor. - * @return index or -1 if not found. - */ - boolean findDatanode(DatanodeDescriptor dn) { - int len = getCapacity(); - for(int idx = 0; idx < len; idx++) { - DatanodeDescriptor cur = getDatanode(idx); - if(cur == dn) { - return true; - } - if(cur == null) { - break; - } - } - return false; - } + @Override + public int numNodes() { + assert this.triplets != null : "BlockInfo is not initialized"; + assert triplets.length % 3 == 0 : "Malformed BlockInfo"; - /** - * Find specified DatanodeStorageInfo. - * @return DatanodeStorageInfo or null if not found. - */ - DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) { - int len = getCapacity(); - for(int idx = 0; idx < len; idx++) { - DatanodeStorageInfo cur = getStorageInfo(idx); - if(cur == null) - break; - if(cur.getDatanodeDescriptor() == dn) - return cur; - } - return null; - } - - /** - * Find specified DatanodeStorageInfo. - * @return index or -1 if not found. - */ - int findStorageInfo(DatanodeStorageInfo storageInfo) { - int len = getCapacity(); - for(int idx = 0; idx < len; idx++) { - DatanodeStorageInfo cur = getStorageInfo(idx); - if (cur == storageInfo) { - return idx; - } - if (cur == null) { - break; + for (int idx = getCapacity()-1; idx >= 0; idx--) { + if (getDatanode(idx) != null) { + return idx + 1; } } - return -1; - } - - /** - * Insert this block into the head of the list of blocks - * related to the specified DatanodeStorageInfo. - * If the head is null then form a new list. - * @return current block as the new head of the list. 
- */ - BlockInfoContiguous listInsert(BlockInfoContiguous head, - DatanodeStorageInfo storage) { - int dnIndex = this.findStorageInfo(storage); - assert dnIndex >= 0 : "Data node is not found: current"; - assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : - "Block is already in the list and cannot be inserted."; - this.setPrevious(dnIndex, null); - this.setNext(dnIndex, head); - if(head != null) - head.setPrevious(head.findStorageInfo(storage), this); - return this; - } - - /** - * Remove this block from the list of blocks - * related to the specified DatanodeStorageInfo. - * If this block is the head of the list then return the next block as - * the new head. - * @return the new head of the list or null if the list becomes - * empy after deletion. - */ - BlockInfoContiguous listRemove(BlockInfoContiguous head, - DatanodeStorageInfo storage) { - if(head == null) - return null; - int dnIndex = this.findStorageInfo(storage); - if(dnIndex < 0) // this block is not on the data-node list - return head; - - BlockInfoContiguous next = this.getNext(dnIndex); - BlockInfoContiguous prev = this.getPrevious(dnIndex); - this.setNext(dnIndex, null); - this.setPrevious(dnIndex, null); - if(prev != null) - prev.setNext(prev.findStorageInfo(storage), next); - if(next != null) - next.setPrevious(next.findStorageInfo(storage), prev); - if(this == head) // removing the head - head = next; - return head; + return 0; } - /** - * Remove this block from the list of blocks related to the specified - * DatanodeDescriptor. Insert it into the head of the list of blocks. - * - * @return the new head of the list. 
- */ - public BlockInfoContiguous moveBlockToHead(BlockInfoContiguous head, - DatanodeStorageInfo storage, int curIndex, int headIndex) { - if (head == this) { - return this; - } - BlockInfoContiguous next = this.setNext(curIndex, head); - BlockInfoContiguous prev = this.setPrevious(curIndex, null); - - head.setPrevious(headIndex, this); - prev.setNext(prev.findStorageInfo(storage), next); - if (next != null) { - next.setPrevious(next.findStorageInfo(storage), prev); + @Override + void replaceBlock(BlockInfo newBlock) { + assert newBlock instanceof BlockInfoContiguous; + for (int i = this.numNodes() - 1; i >= 0; i--) { + final DatanodeStorageInfo storage = this.getStorageInfo(i); + final boolean removed = storage.removeBlock(this); + assert removed : "currentBlock not found."; + + final DatanodeStorageInfo.AddBlockResult result = storage.addBlock( + newBlock, newBlock); + assert result == DatanodeStorageInfo.AddBlockResult.ADDED : + "newBlock already exists."; } - return this; - } - - /** - * BlockInfo represents a block that is not being constructed. - * In order to start modifying the block, the BlockInfo should be converted - * to {@link BlockInfoContiguousUnderConstruction}. - * @return {@link BlockUCState#COMPLETE} - */ - public BlockUCState getBlockUCState() { - return BlockUCState.COMPLETE; - } - - /** - * Is this block complete? 
- * - * @return true if the state of the block is {@link BlockUCState#COMPLETE} - */ - public boolean isComplete() { - return getBlockUCState().equals(BlockUCState.COMPLETE); } /** @@ -375,38 +132,16 @@ public class BlockInfoContiguous extends Block if(isComplete()) { BlockInfoContiguousUnderConstruction ucBlock = new BlockInfoContiguousUnderConstruction(this, - getBlockCollection().getBlockReplication(), s, targets); + getBlockCollection().getBlockReplication(), s, targets); ucBlock.setBlockCollection(getBlockCollection()); return ucBlock; } // the block is already under construction BlockInfoContiguousUnderConstruction ucBlock = - (BlockInfoContiguousUnderConstruction)this; + (BlockInfoContiguousUnderConstruction) this; ucBlock.setBlockUCState(s); ucBlock.setExpectedLocations(targets); ucBlock.setBlockCollection(getBlockCollection()); return ucBlock; } - - @Override - public int hashCode() { - // Super implementation is sufficient - return super.hashCode(); - } - - @Override - public boolean equals(Object obj) { - // Sufficient to rely on super's implementation - return (this == obj) || super.equals(obj); - } - - @Override - public LightWeightGSet.LinkedElement getNext() { - return nextLinkedElement; - } - - @Override - public void setNext(LightWeightGSet.LinkedElement next) { - this.nextLinkedElement = next; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/249ca621/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java index 92153ab..c78c9e2 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java @@ -60,101 +60,6 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { private Block truncateBlock; /** - * ReplicaUnderConstruction contains information about replicas while - * they are under construction. - * The GS, the length and the state of the replica is as reported by - * the data-node. - * It is not guaranteed, but expected, that data-nodes actually have - * corresponding replicas. - */ - static class ReplicaUnderConstruction extends Block { - private final DatanodeStorageInfo expectedLocation; - private ReplicaState state; - private boolean chosenAsPrimary; - - ReplicaUnderConstruction(Block block, - DatanodeStorageInfo target, - ReplicaState state) { - super(block); - this.expectedLocation = target; - this.state = state; - this.chosenAsPrimary = false; - } - - /** - * Expected block replica location as assigned when the block was allocated. - * This defines the pipeline order. - * It is not guaranteed, but expected, that the data-node actually has - * the replica. - */ - private DatanodeStorageInfo getExpectedStorageLocation() { - return expectedLocation; - } - - /** - * Get replica state as reported by the data-node. - */ - ReplicaState getState() { - return state; - } - - /** - * Whether the replica was chosen for recovery. - */ - boolean getChosenAsPrimary() { - return chosenAsPrimary; - } - - /** - * Set replica state. - */ - void setState(ReplicaState s) { - state = s; - } - - /** - * Set whether this replica was chosen for recovery. - */ - void setChosenAsPrimary(boolean chosenAsPrimary) { - this.chosenAsPrimary = chosenAsPrimary; - } - - /** - * Is data-node the replica belongs to alive. 
- */ - boolean isAlive() { - return expectedLocation.getDatanodeDescriptor().isAlive; - } - - @Override // Block - public int hashCode() { - return super.hashCode(); - } - - @Override // Block - public boolean equals(Object obj) { - // Sufficient to rely on super's implementation - return (this == obj) || super.equals(obj); - } - - @Override - public String toString() { - final StringBuilder b = new StringBuilder(50); - appendStringTo(b); - return b.toString(); - } - - @Override - public void appendStringTo(StringBuilder sb) { - sb.append("ReplicaUC[") - .append(expectedLocation) - .append("|") - .append(state) - .append("]"); - } - } - - /** * Create block and set its state to * {@link BlockUCState#UNDER_CONSTRUCTION}. */ @@ -165,7 +70,8 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { /** * Create a block that is currently being constructed. */ - public BlockInfoContiguousUnderConstruction(Block blk, short replication, BlockUCState state, DatanodeStorageInfo[] targets) { + public BlockInfoContiguousUnderConstruction(Block blk, short replication, + BlockUCState state, DatanodeStorageInfo[] targets) { super(blk, replication); assert getBlockUCState() != BlockUCState.COMPLETE : "BlockInfoUnderConstruction cannot be in COMPLETE state"; @@ -191,10 +97,11 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { /** Set expected locations */ public void setExpectedLocations(DatanodeStorageInfo[] targets) { int numLocations = targets == null ? 
0 : targets.length; - this.replicas = new ArrayList(numLocations); - for(int i = 0; i < numLocations; i++) - replicas.add( - new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW)); + this.replicas = new ArrayList<>(numLocations); + for(int i = 0; i < numLocations; i++) { + replicas.add(new ReplicaUnderConstruction(this, targets[i], + ReplicaState.RBW)); + } } /** @@ -204,8 +111,9 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { public DatanodeStorageInfo[] getExpectedStorageLocations() { int numLocations = replicas == null ? 0 : replicas.size(); DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; - for(int i = 0; i < numLocations; i++) + for (int i = 0; i < numLocations; i++) { storages[i] = replicas.get(i).getExpectedStorageLocation(); + } return storages; } @@ -293,17 +201,17 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { + " No blocks found, lease removed."); } boolean allLiveReplicasTriedAsPrimary = true; - for (int i = 0; i < replicas.size(); i++) { + for (ReplicaUnderConstruction replica : replicas) { // Check if all replicas have been tried or not. - if (replicas.get(i).isAlive()) { - allLiveReplicasTriedAsPrimary = - (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary()); + if (replica.isAlive()) { + allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary && + replica.getChosenAsPrimary()); } } if (allLiveReplicasTriedAsPrimary) { // Just set all the replicas to be chosen whether they are alive or not. 
- for (int i = 0; i < replicas.size(); i++) { - replicas.get(i).setChosenAsPrimary(false); + for (ReplicaUnderConstruction replica : replicas) { + replica.setChosenAsPrimary(false); } } long mostRecentLastUpdate = 0; @@ -324,7 +232,8 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { } } if (primary != null) { - primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this); + primary.getExpectedStorageLocation().getDatanodeDescriptor() + .addBlockToBeRecovered(this); primary.setChosenAsPrimary(true); NameNode.blockStateChangeLog.info( "BLOCK* {} recovery started, primary={}", this, primary); @@ -357,18 +266,6 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { replicas.add(new ReplicaUnderConstruction(block, storage, rState)); } - @Override // BlockInfo - // BlockInfoUnderConstruction participates in maps the same way as BlockInfo - public int hashCode() { - return super.hashCode(); - } - - @Override // BlockInfo - public boolean equals(Object obj) { - // Sufficient to rely on super's implementation - return (this == obj) || super.equals(obj); - } - @Override public String toString() { final StringBuilder b = new StringBuilder(100); http://git-wip-us.apache.org/repos/asf/hadoop/blob/249ca621/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java new file mode 100644 index 0000000..5fff41e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.hdfs.protocol.Block; + +/** + * Subclass of {@link BlockInfo}, presenting a block group in erasure coding. + * + * We still use triplets to store DatanodeStorageInfo for each block in the + * block group, as well as the previous/next block in the corresponding + * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units + * are sorted and strictly mapped to the corresponding block. + * + * Normally each block belonging to group is stored in only one DataNode. + * However, it is possible that some block is over-replicated. Thus the triplet + * array's size can be larger than (m+k). Thus currently we use an extra byte + * array to record the block index for each triplet. + */ +public class BlockInfoStriped extends BlockInfo { + private final short dataBlockNum; + private final short parityBlockNum; + /** + * Always the same size with triplets. Record the block index for each triplet + * TODO: actually this is only necessary for over-replicated block. Thus can + * be further optimized to save memory usage. 
+ */ + private byte[] indices; + + public BlockInfoStriped(Block blk, short dataBlockNum, short parityBlockNum) { + super(blk, (short) (dataBlockNum + parityBlockNum)); + indices = new byte[dataBlockNum + parityBlockNum]; + initIndices(); + this.dataBlockNum = dataBlockNum; + this.parityBlockNum = parityBlockNum; + } + + BlockInfoStriped(BlockInfoStriped b) { + this(b, b.dataBlockNum, b.parityBlockNum); + this.setBlockCollection(b.getBlockCollection()); + } + + private short getTotalBlockNum() { + return (short) (dataBlockNum + parityBlockNum); + } + + private void initIndices() { + for (int i = 0; i < indices.length; i++) { + indices[i] = -1; + } + } + + private int findSlot() { + int i = getTotalBlockNum(); + for (; i < getCapacity(); i++) { + if (getStorageInfo(i) == null) { + return i; + } + } + // need to expand the triplet size + ensureCapacity(i + 1, true); + return i; + } + + @Override + boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) { + int blockIndex = BlockIdManager.getBlockIndex(reportedBlock); + int index = blockIndex; + DatanodeStorageInfo old = getStorageInfo(index); + if (old != null && !old.equals(storage)) { // over replicated + // check if the storage has been stored + int i = findStorageInfo(storage); + if (i == -1) { + index = findSlot(); + } else { + return true; + } + } + addStorage(storage, index, blockIndex); + return true; + } + + private void addStorage(DatanodeStorageInfo storage, int index, + int blockIndex) { + setStorageInfo(index, storage); + setNext(index, null); + setPrevious(index, null); + indices[index] = (byte) blockIndex; + } + + private int findStorageInfoFromEnd(DatanodeStorageInfo storage) { + final int len = getCapacity(); + for(int idx = len - 1; idx >= 0; idx--) { + DatanodeStorageInfo cur = getStorageInfo(idx); + if (storage.equals(cur)) { + return idx; + } + } + return -1; + } + + @Override + boolean removeStorage(DatanodeStorageInfo storage) { + int dnIndex = findStorageInfoFromEnd(storage); + if 
(dnIndex < 0) { // the node is not found + return false; + } + assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : + "Block is still in the list and must be removed first."; + // set the triplet to null + setStorageInfo(dnIndex, null); + setNext(dnIndex, null); + setPrevious(dnIndex, null); + indices[dnIndex] = -1; + return true; + } + + private void ensureCapacity(int totalSize, boolean keepOld) { + if (getCapacity() < totalSize) { + Object[] old = triplets; + byte[] oldIndices = indices; + triplets = new Object[totalSize * 3]; + indices = new byte[totalSize]; + initIndices(); + + if (keepOld) { + System.arraycopy(old, 0, triplets, 0, old.length); + System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length); + } + } + } + + @Override + void replaceBlock(BlockInfo newBlock) { + assert newBlock instanceof BlockInfoStriped; + BlockInfoStriped newBlockGroup = (BlockInfoStriped) newBlock; + final int size = getCapacity(); + newBlockGroup.ensureCapacity(size, false); + for (int i = 0; i < size; i++) { + final DatanodeStorageInfo storage = this.getStorageInfo(i); + if (storage != null) { + final int blockIndex = indices[i]; + final boolean removed = storage.removeBlock(this); + assert removed : "currentBlock not found."; + + newBlockGroup.addStorage(storage, i, blockIndex); + storage.insertToList(newBlockGroup); + } + } + } + + @Override + public int numNodes() { + assert this.triplets != null : "BlockInfo is not initialized"; + assert triplets.length % 3 == 0 : "Malformed BlockInfo"; + int num = 0; + for (int idx = getCapacity()-1; idx >= 0; idx--) { + if (getStorageInfo(idx) != null) { + num++; + } + } + return num; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/249ca621/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 10afdaf..5ed06cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -596,8 +596,8 @@ public class BlockManager { * of replicas reported from data-nodes. */ private static boolean commitBlock( - final BlockInfoContiguousUnderConstruction block, final Block commitBlock) - throws IOException { + final BlockInfoContiguousUnderConstruction block, + final Block commitBlock) throws IOException { if (block.getBlockUCState() == BlockUCState.COMMITTED) return false; assert block.getNumBytes() <= commitBlock.getNumBytes() : @@ -628,7 +628,7 @@ public class BlockManager { return false; // already completed (e.g. by syncBlock) final boolean b = commitBlock( - (BlockInfoContiguousUnderConstruction) lastBlock, commitBlock); + (BlockInfoContiguousUnderConstruction)lastBlock, commitBlock); if(countNodes(lastBlock).liveReplicas() >= minReplication) completeBlock(bc, bc.numBlocks()-1, false); return b; @@ -641,15 +641,16 @@ public class BlockManager { * @throws IOException if the block does not have at least a minimal number * of replicas reported from data-nodes. 
*/ - private BlockInfoContiguous completeBlock(final BlockCollection bc, + private BlockInfo completeBlock(final BlockCollection bc, final int blkIndex, boolean force) throws IOException { if(blkIndex < 0) return null; BlockInfoContiguous curBlock = bc.getBlocks()[blkIndex]; - if(curBlock.isComplete()) + if (curBlock.isComplete()) return curBlock; + // TODO: support BlockInfoStripedUC BlockInfoContiguousUnderConstruction ucBlock = - (BlockInfoContiguousUnderConstruction) curBlock; + (BlockInfoContiguousUnderConstruction)curBlock; int numNodes = ucBlock.numNodes(); if (!force && numNodes < minReplication) throw new IOException("Cannot complete block: " + @@ -675,13 +676,15 @@ public class BlockManager { return blocksMap.replaceBlock(completeBlock); } - private BlockInfoContiguous completeBlock(final BlockCollection bc, - final BlockInfoContiguous block, boolean force) throws IOException { + // TODO: support BlockInfoStrippedUC + private BlockInfo completeBlock(final BlockCollection bc, + final BlockInfo block, boolean force) throws IOException { BlockInfoContiguous[] fileBlocks = bc.getBlocks(); - for(int idx = 0; idx < fileBlocks.length; idx++) - if(fileBlocks[idx] == block) { + for (int idx = 0; idx < fileBlocks.length; idx++) { + if (fileBlocks[idx] == block) { return completeBlock(bc, idx, force); } + } return block; } @@ -690,7 +693,7 @@ public class BlockManager { * regardless of whether enough replicas are present. This is necessary * when tailing edit logs as a Standby. 
*/ - public BlockInfoContiguous forceCompleteBlock(final BlockCollection bc, + public BlockInfo forceCompleteBlock(final BlockCollection bc, final BlockInfoContiguousUnderConstruction block) throws IOException { block.commitBlock(block); return completeBlock(bc, block, true); @@ -722,8 +725,8 @@ public class BlockManager { DatanodeStorageInfo[] targets = getStorages(oldBlock); - BlockInfoContiguousUnderConstruction ucBlock = - bc.setLastBlock(oldBlock, targets); + BlockInfoContiguousUnderConstruction ucBlock = bc.setLastBlock(oldBlock, + targets); blocksMap.replaceBlock(ucBlock); // Remove block from replication queue. @@ -1023,7 +1026,7 @@ public class BlockManager { if(numBlocks == 0) { return new BlocksWithLocations(new BlockWithLocations[0]); } - Iterator iter = node.getBlockIterator(); + Iterator iter = node.getBlockIterator(); int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block // skip blocks for(int i=0; i results = new ArrayList(); long totalSize = 0; - BlockInfoContiguous curBlock; + BlockInfo curBlock; while(totalSizedatanode) map, according to the difference // between the old and new block report. 
// - Collection toAdd = new LinkedList(); + Collection toAdd = new LinkedList<>(); Collection toRemove = new TreeSet(); Collection toInvalidate = new LinkedList(); Collection toCorrupt = new LinkedList(); @@ -1982,8 +2000,9 @@ public class BlockManager { removeStoredBlock(b, node); } int numBlocksLogged = 0; - for (BlockInfoContiguous b : toAdd) { - addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog); + for (BlockInfoToAdd b : toAdd) { + addStoredBlock(b.stored, b.reported, storageInfo, null, + numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -2068,7 +2087,7 @@ public class BlockManager { continue; } - BlockInfoContiguous storedBlock = getStoredBlock(iblk); + BlockInfo storedBlock = getStoredBlock(iblk); // If block does not belong to any file, we are done. if (storedBlock == null) continue; @@ -2091,7 +2110,7 @@ public class BlockManager { // If block is under construction, add this replica to its list if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { - ((BlockInfoContiguousUnderConstruction)storedBlock) + ((BlockInfoContiguousUnderConstruction) storedBlock) .addReplicaIfNotPresent(storageInfo, iblk, reportedState); // OpenFileBlocks only inside snapshots also will be added to safemode // threshold. 
So we need to update such blocks to safemode @@ -2106,14 +2125,14 @@ public class BlockManager { } //add replica if appropriate if (reportedState == ReplicaState.FINALIZED) { - addStoredBlockImmediate(storedBlock, storageInfo); + addStoredBlockImmediate(storedBlock, iblk, storageInfo); } } } private void reportDiff(DatanodeStorageInfo storageInfo, BlockListAsLongs newReport, - Collection toAdd, // add to DatanodeDescriptor + Collection toAdd, // add to DatanodeDescriptor Collection toRemove, // remove from DatanodeDescriptor Collection toInvalidate, // should be removed from DN Collection toCorrupt, // add to corrupt replicas list @@ -2121,8 +2140,10 @@ public class BlockManager { // place a delimiter in the list which separates blocks // that have been reported from those that have not - BlockInfoContiguous delimiter = new BlockInfoContiguous(new Block(), (short) 1); - AddBlockResult result = storageInfo.addBlock(delimiter); + Block delimiterBlock = new Block(); + BlockInfoContiguous delimiter = new BlockInfoContiguous(delimiterBlock, + (short) 1); + AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock); assert result == AddBlockResult.ADDED : "Delimiting block cannot be present in the node"; int headIndex = 0; //currently the delimiter is in the head of the list @@ -2134,7 +2155,7 @@ public class BlockManager { // scan the report and process newly reported blocks for (BlockReportReplica iblk : newReport) { ReplicaState iState = iblk.getState(); - BlockInfoContiguous storedBlock = processReportedBlock(storageInfo, + BlockInfo storedBlock = processReportedBlock(storageInfo, iblk, iState, toAdd, toInvalidate, toCorrupt, toUC); // move block to the head of the list @@ -2146,8 +2167,7 @@ public class BlockManager { // collect blocks that have not been reported // all of them are next to the delimiter - Iterator it = - storageInfo.new BlockIterator(delimiter.getNext(0)); + Iterator it = storageInfo.new BlockIterator(delimiter.getNext(0)); 
while(it.hasNext()) toRemove.add(it.next()); storageInfo.removeBlock(delimiter); @@ -2184,10 +2204,10 @@ public class BlockManager { * @return the up-to-date stored block, if it should be kept. * Otherwise, null. */ - private BlockInfoContiguous processReportedBlock( + private BlockInfo processReportedBlock( final DatanodeStorageInfo storageInfo, final Block block, final ReplicaState reportedState, - final Collection toAdd, + final Collection toAdd, final Collection toInvalidate, final Collection toCorrupt, final Collection toUC) { @@ -2208,7 +2228,7 @@ public class BlockManager { } // find block by blockId - BlockInfoContiguous storedBlock = getStoredBlock(block); + BlockInfo storedBlock = getStoredBlock(block); if(storedBlock == null) { // If blocksMap does not contain reported block id, // the replica should be removed from the data-node. @@ -2262,7 +2282,7 @@ public class BlockManager { if (reportedState == ReplicaState.FINALIZED && (storedBlock.findStorageInfo(storageInfo) == -1 || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) { - toAdd.add(storedBlock); + toAdd.add(new BlockInfoToAdd(storedBlock, block)); } return storedBlock; } @@ -2340,7 +2360,7 @@ public class BlockManager { */ private BlockToMarkCorrupt checkReplicaCorrupt( Block reported, ReplicaState reportedState, - BlockInfoContiguous storedBlock, BlockUCState ucState, + BlockInfo storedBlock, BlockUCState ucState, DatanodeDescriptor dn) { switch(reportedState) { case FINALIZED: @@ -2349,12 +2369,12 @@ public class BlockManager { case COMMITTED: if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) { final long reportedGS = reported.getGenerationStamp(); - return new BlockToMarkCorrupt(storedBlock, reportedGS, + return new BlockToMarkCorrupt(reported, storedBlock, reportedGS, "block is " + ucState + " and reported genstamp " + reportedGS + " does not match genstamp in block map " + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); } else if (storedBlock.getNumBytes() 
!= reported.getNumBytes()) { - return new BlockToMarkCorrupt(storedBlock, + return new BlockToMarkCorrupt(reported, storedBlock, "block is " + ucState + " and reported length " + reported.getNumBytes() + " does not match " + "length in block map " + storedBlock.getNumBytes(), @@ -2365,8 +2385,8 @@ public class BlockManager { case UNDER_CONSTRUCTION: if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) { final long reportedGS = reported.getGenerationStamp(); - return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is " - + ucState + " and reported state " + reportedState + return new BlockToMarkCorrupt(reported, storedBlock, reportedGS, + "block is " + ucState + " and reported state " + reportedState + ", But reported genstamp " + reportedGS + " does not match genstamp in block map " + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); @@ -2381,7 +2401,7 @@ public class BlockManager { return null; // not corrupt } else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) { final long reportedGS = reported.getGenerationStamp(); - return new BlockToMarkCorrupt(storedBlock, reportedGS, + return new BlockToMarkCorrupt(reported, storedBlock, reportedGS, "reported " + reportedState + " replica with genstamp " + reportedGS + " does not match COMPLETE block's genstamp in block map " + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); @@ -2396,7 +2416,7 @@ public class BlockManager { "complete with the same genstamp"); return null; } else { - return new BlockToMarkCorrupt(storedBlock, + return new BlockToMarkCorrupt(reported, storedBlock, "reported replica has invalid state " + reportedState, Reason.INVALID_STATE); } @@ -2409,11 +2429,12 @@ public class BlockManager { " on " + dn + " size " + storedBlock.getNumBytes(); // log here at WARN level since this is really a broken HDFS invariant LOG.warn(msg); - return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE); + return new 
BlockToMarkCorrupt(reported, storedBlock, msg, + Reason.INVALID_STATE); } } - private boolean isBlockUnderConstruction(BlockInfoContiguous storedBlock, + private boolean isBlockUnderConstruction(BlockInfo storedBlock, BlockUCState ucState, ReplicaState reportedState) { switch(reportedState) { case FINALIZED: @@ -2442,7 +2463,7 @@ public class BlockManager { if (ucBlock.reportedState == ReplicaState.FINALIZED && !block.findDatanode(storageInfo.getDatanodeDescriptor())) { - addStoredBlock(block, storageInfo, null, true); + addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true); } } @@ -2457,18 +2478,18 @@ public class BlockManager { * * @throws IOException */ - private void addStoredBlockImmediate(BlockInfoContiguous storedBlock, + private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported, DatanodeStorageInfo storageInfo) throws IOException { assert (storedBlock != null && namesystem.hasWriteLock()); if (!namesystem.isInStartupSafeMode() || namesystem.isPopulatingReplQueues()) { - addStoredBlock(storedBlock, storageInfo, null, false); + addStoredBlock(storedBlock, reported, storageInfo, null, false); return; } // just add it - storageInfo.addBlock(storedBlock); + storageInfo.addBlock(storedBlock, reported); // Now check for completion of blocks and safe block count int numCurrentReplica = countLiveNodes(storedBlock); @@ -2489,13 +2510,14 @@ public class BlockManager { * needed replications if this takes care of the problem. * @return the block that is stored in blockMap. 
*/ - private Block addStoredBlock(final BlockInfoContiguous block, + private Block addStoredBlock(final BlockInfo block, + final Block reportedBlock, DatanodeStorageInfo storageInfo, DatanodeDescriptor delNodeHint, boolean logEveryBlock) throws IOException { assert block != null && namesystem.hasWriteLock(); - BlockInfoContiguous storedBlock; + BlockInfo storedBlock; DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); if (block instanceof BlockInfoContiguousUnderConstruction) { //refresh our copy in case the block got completed in another thread @@ -2516,7 +2538,7 @@ public class BlockManager { assert bc != null : "Block must belong to a file"; // add block to the datanode - AddBlockResult result = storageInfo.addBlock(storedBlock); + AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock); int curReplicaDelta; if (result == AddBlockResult.ADDED) { @@ -2588,13 +2610,13 @@ public class BlockManager { storedBlock + "blockMap has " + numCorruptNodes + " but corrupt replicas map has " + corruptReplicasCount); } - if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) - invalidateCorruptReplicas(storedBlock); + if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) { + invalidateCorruptReplicas(storedBlock, reportedBlock); + } return storedBlock; } - private void logAddStoredBlock(BlockInfoContiguous storedBlock, - DatanodeDescriptor node) { + private void logAddStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) { if (!blockLog.isInfoEnabled()) { return; } @@ -2621,7 +2643,7 @@ public class BlockManager { * * @param blk Block whose corrupt replicas need to be invalidated */ - private void invalidateCorruptReplicas(BlockInfoContiguous blk) { + private void invalidateCorruptReplicas(BlockInfo blk, Block reported) { Collection nodes = corruptReplicas.getNodes(blk); boolean removedFromBlocksMap = true; if (nodes == null) @@ -2631,7 +2653,7 @@ public class BlockManager { DatanodeDescriptor[] nodesCopy = 
nodes.toArray(new DatanodeDescriptor[0]); for (DatanodeDescriptor node : nodesCopy) { try { - if (!invalidateBlock(new BlockToMarkCorrupt(blk, null, + if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null, Reason.ANY), node)) { removedFromBlocksMap = false; } @@ -2700,7 +2722,7 @@ public class BlockManager { long nrInvalid = 0, nrOverReplicated = 0; long nrUnderReplicated = 0, nrPostponed = 0, nrUnderConstruction = 0; long startTimeMisReplicatedScan = Time.monotonicNow(); - Iterator blocksItr = blocksMap.getBlocks().iterator(); + Iterator blocksItr = blocksMap.getBlocks().iterator(); long totalBlocks = blocksMap.size(); replicationQueuesInitProgress = 0; long totalProcessed = 0; @@ -2712,7 +2734,7 @@ public class BlockManager { namesystem.writeLockInterruptibly(); try { while (processed < numBlocksPerIteration && blocksItr.hasNext()) { - BlockInfoContiguous block = blocksItr.next(); + BlockInfo block = blocksItr.next(); MisReplicationResult res = processMisReplicatedBlock(block); if (LOG.isTraceEnabled()) { LOG.trace("block " + block + ": " + res); @@ -2787,7 +2809,7 @@ public class BlockManager { * appropriate queues if necessary, and returns a result code indicating * what happened with it. 
*/ - private MisReplicationResult processMisReplicatedBlock(BlockInfoContiguous block) { + private MisReplicationResult processMisReplicatedBlock(BlockInfo block) { BlockCollection bc = block.getBlockCollection(); if (bc == null) { // block does not belong to any file @@ -3116,14 +3138,14 @@ public class BlockManager { ReplicaState reportedState, DatanodeDescriptor delHintNode) throws IOException { // blockReceived reports a finalized block - Collection toAdd = new LinkedList(); + Collection toAdd = new LinkedList<>(); Collection toInvalidate = new LinkedList(); Collection toCorrupt = new LinkedList(); Collection toUC = new LinkedList(); final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); - processReportedBlock(storageInfo, block, reportedState, - toAdd, toInvalidate, toCorrupt, toUC); + processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate, + toCorrupt, toUC); // the block is only in one of the to-do lists // if it is in none then data-node already has it assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1 @@ -3133,8 +3155,9 @@ public class BlockManager { addStoredBlockUnderConstruction(b, storageInfo); } long numBlocksLogged = 0; - for (BlockInfoContiguous b : toAdd) { - addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog); + for (BlockInfoToAdd b : toAdd) { + addStoredBlock(b.stored, b.reported, storageInfo, delHintNode, + numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -3257,7 +3280,7 @@ public class BlockManager { * @param b - the block being tested * @return count of live nodes for this block */ - int countLiveNodes(BlockInfoContiguous b) { + int countLiveNodes(BlockInfo b) { if (!namesystem.isInStartupSafeMode()) { return countNodes(b).liveReplicas(); } @@ -3331,7 +3354,7 @@ public class BlockManager { return blocksMap.size(); } - public DatanodeStorageInfo[] getStorages(BlockInfoContiguous block) { + public 
DatanodeStorageInfo[] getStorages(BlockInfo block) { final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[block.numNodes()]; int i = 0; for(DatanodeStorageInfo s : blocksMap.getStorages(block)) { @@ -3361,8 +3384,8 @@ public class BlockManager { } } - public BlockInfoContiguous getStoredBlock(Block block) { - BlockInfoContiguous info = null; + public BlockInfo getStoredBlock(Block block) { + BlockInfo info = null; if (BlockIdManager.isStripedBlockID(block.getBlockId())) { info = blocksMap.getStoredBlock( new Block(BlockIdManager.convertToGroupID(block.getBlockId()))); @@ -3519,7 +3542,8 @@ public class BlockManager { public BlockInfoContiguous addBlockCollection(BlockInfoContiguous block, BlockCollection bc) { - return blocksMap.addBlockCollection(block, bc); + // TODO + return (BlockInfoContiguous) blocksMap.addBlockCollection(block, bc); } public BlockCollection getBlockCollection(Block b) { @@ -3727,7 +3751,7 @@ public class BlockManager { /** * A simple result enum for the result of - * {@link BlockManager#processMisReplicatedBlock(BlockInfoContiguous)}. + * {@link BlockManager#processMisReplicatedBlock}. */ enum MisReplicationResult { /** The block should be invalidated since it belongs to a deleted file. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/249ca621/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java index 806a4cb..d383de8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java @@ -20,12 +20,10 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.Iterator; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.LightWeightGSet; -import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; @@ -36,10 +34,10 @@ import com.google.common.collect.Iterables; */ class BlocksMap { private static class StorageIterator implements Iterator { - private final BlockInfoContiguous blockInfo; + private final BlockInfo blockInfo; private int nextIdx = 0; - StorageIterator(BlockInfoContiguous blkInfo) { + StorageIterator(BlockInfo blkInfo) { this.blockInfo = blkInfo; } @@ -63,14 +61,14 @@ class BlocksMap { /** Constant {@link LightWeightGSet} capacity. 
*/ private final int capacity; - private GSet blocks; + private GSet blocks; BlocksMap(int capacity) { // Use 2% of total memory to size the GSet capacity this.capacity = capacity; - this.blocks = new LightWeightGSet(capacity) { + this.blocks = new LightWeightGSet(capacity) { @Override - public Iterator iterator() { + public Iterator iterator() { SetIterator iterator = new SetIterator(); /* * Not tracking any modifications to set. As this set will be used @@ -97,15 +95,15 @@ class BlocksMap { } BlockCollection getBlockCollection(Block b) { - BlockInfoContiguous info = blocks.get(b); + BlockInfo info = blocks.get(b); return (info != null) ? info.getBlockCollection() : null; } /** * Add block b belonging to the specified block collection to the map. */ - BlockInfoContiguous addBlockCollection(BlockInfoContiguous b, BlockCollection bc) { - BlockInfoContiguous info = blocks.get(b); + BlockInfo addBlockCollection(BlockInfo b, BlockCollection bc) { + BlockInfo info = blocks.get(b); if (info != b) { info = b; blocks.put(info); @@ -120,11 +118,12 @@ class BlocksMap { * and remove all data-node locations associated with the block. */ void removeBlock(Block block) { - BlockInfoContiguous blockInfo = blocks.remove(block); + BlockInfo blockInfo = blocks.remove(block); if (blockInfo == null) return; blockInfo.setBlockCollection(null); + // TODO: fix this logic for block group for(int idx = blockInfo.numNodes()-1; idx >= 0; idx--) { DatanodeDescriptor dn = blockInfo.getDatanode(idx); dn.removeBlock(blockInfo); // remove from the list and wipe the location @@ -132,7 +131,7 @@ class BlocksMap { } /** Returns the block object it it exists in the map. */ - BlockInfoContiguous getStoredBlock(Block b) { + BlockInfo getStoredBlock(Block b) { return blocks.get(b); } @@ -164,7 +163,7 @@ class BlocksMap { * For a block that has already been retrieved from the BlocksMap * returns {@link Iterable} of the storages the block belongs to. 
*/ - Iterable getStorages(final BlockInfoContiguous storedBlock) { + Iterable getStorages(final BlockInfo storedBlock) { return new Iterable() { @Override public Iterator iterator() { @@ -175,7 +174,7 @@ class BlocksMap { /** counts number of containing nodes. Better than using iterator. */ int numNodes(Block b) { - BlockInfoContiguous info = blocks.get(b); + BlockInfo info = blocks.get(b); return info == null ? 0 : info.numNodes(); } @@ -185,7 +184,7 @@ class BlocksMap { * only if it does not belong to any file and data-nodes. */ boolean removeNode(Block b, DatanodeDescriptor node) { - BlockInfoContiguous info = blocks.get(b); + BlockInfo info = blocks.get(b); if (info == null) return false; @@ -203,7 +202,7 @@ class BlocksMap { return blocks.size(); } - Iterable getBlocks() { + Iterable getBlocks() { return blocks; } @@ -218,20 +217,11 @@ class BlocksMap { * @param newBlock - block for replacement * @return new block */ - BlockInfoContiguous replaceBlock(BlockInfoContiguous newBlock) { - BlockInfoContiguous currentBlock = blocks.get(newBlock); + BlockInfo replaceBlock(BlockInfo newBlock) { + BlockInfo currentBlock = blocks.get(newBlock); assert currentBlock != null : "the block if not in blocksMap"; // replace block in data-node lists - for (int i = currentBlock.numNodes() - 1; i >= 0; i--) { - final DatanodeDescriptor dn = currentBlock.getDatanode(i); - final DatanodeStorageInfo storage = currentBlock.findStorageInfo(dn); - final boolean removed = storage.removeBlock(currentBlock); - Preconditions.checkState(removed, "currentBlock not found."); - - final AddBlockResult result = storage.addBlock(newBlock); - Preconditions.checkState(result == AddBlockResult.ADDED, - "newBlock already exists."); - } + currentBlock.replaceBlock(newBlock); // replace block in the map itself blocks.put(newBlock); return newBlock; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/249ca621/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java index bf5ece9..79d7713 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java @@ -513,8 +513,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable { iter.remove(); } } - BlockInfoContiguous blockInfo = blockManager. - getStoredBlock(new Block(cblock.getBlockId())); + BlockInfoContiguous blockInfo = namesystem.getStoredBlock(new Block(cblock.getBlockId())); String reason = findReasonForNotCaching(cblock, blockInfo); int neededCached = 0; if (reason != null) { @@ -628,8 +627,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable { List pendingCached) { // To figure out which replicas can be cached, we consult the // blocksMap. We don't want to try to cache a corrupt replica, though. - BlockInfoContiguous blockInfo = blockManager. 
- getStoredBlock(new Block(cachedBlock.getBlockId())); + BlockInfoContiguous blockInfo = namesystem.getStoredBlock(new Block(cachedBlock.getBlockId())); if (blockInfo == null) { LOG.debug("Block {}: can't add new cached replicas," + " because there is no record of this block " + @@ -668,7 +666,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable { while (it.hasNext()) { CachedBlock cBlock = it.next(); BlockInfoContiguous info = - blockManager.getStoredBlock(new Block(cBlock.getBlockId())); + namesystem.getStoredBlock(new Block(cBlock.getBlockId())); if (info != null) { pendingBytes -= info.getNumBytes(); } @@ -678,7 +676,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable { while (it.hasNext()) { CachedBlock cBlock = it.next(); BlockInfoContiguous info = - blockManager.getStoredBlock(new Block(cBlock.getBlockId())); + namesystem.getStoredBlock(new Block(cBlock.getBlockId())); if (info != null) { pendingBytes += info.getNumBytes(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/249ca621/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index d0d7a72..a022c81 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -335,7 +335,7 @@ public class DatanodeDescriptor extends DatanodeInfo { * Remove block from the list of blocks belonging to the data-node. Remove * data-node from the block. 
*/ - boolean removeBlock(BlockInfoContiguous b) { + boolean removeBlock(BlockInfo b) { final DatanodeStorageInfo s = b.findStorageInfo(this); // if block exists on this datanode if (s != null) { @@ -348,12 +348,9 @@ public class DatanodeDescriptor extends DatanodeInfo { * Remove block from the list of blocks belonging to the data-node. Remove * data-node from the block. */ - boolean removeBlock(String storageID, BlockInfoContiguous b) { + boolean removeBlock(String storageID, BlockInfo b) { DatanodeStorageInfo s = getStorageInfo(storageID); - if (s != null) { - return s.removeBlock(b); - } - return false; + return s != null && s.removeBlock(b); } public void resetBlocks() { @@ -532,12 +529,12 @@ public class DatanodeDescriptor extends DatanodeInfo { } } - private static class BlockIterator implements Iterator { + private static class BlockIterator implements Iterator { private int index = 0; - private final List> iterators; + private final List> iterators; private BlockIterator(final DatanodeStorageInfo... storages) { - List> iterators = new ArrayList>(); + List> iterators = new ArrayList<>(); for (DatanodeStorageInfo e : storages) { iterators.add(e.getBlockIterator()); } @@ -551,7 +548,7 @@ public class DatanodeDescriptor extends DatanodeInfo { } @Override - public BlockInfoContiguous next() { + public BlockInfo next() { update(); return iterators.get(index).next(); } @@ -568,10 +565,11 @@ public class DatanodeDescriptor extends DatanodeInfo { } } - Iterator getBlockIterator() { + Iterator getBlockIterator() { return new BlockIterator(getStorageInfos()); } - Iterator getBlockIterator(final String storageID) { + + Iterator getBlockIterator(final String storageID) { return new BlockIterator(getStorageInfo(storageID)); }