hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [09/19] hadoop git commit: Revert "HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery blocks. Contributed by Zhe Zhang."
Date Mon, 10 Aug 2015 16:47:55 GMT
Revert "HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery blocks. Contributed by Zhe Zhang."

This reverts commit de480d6c8945bd8b5b00e8657b7a72ce8dd9b6b5.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/663eba0a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/663eba0a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/663eba0a

Branch: refs/heads/HADOOP-12111
Commit: 663eba0ab1c73b45f98e46ffc87ad8fd91584046
Parents: fb1be0b
Author: Jing Zhao <jing9@apache.org>
Authored: Wed Aug 5 17:41:54 2015 -0700
Committer: Jing Zhao <jing9@apache.org>
Committed: Thu Aug 6 10:21:54 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 -
 .../hdfs/server/blockmanagement/BlockInfo.java  |  12 +-
 .../blockmanagement/BlockInfoContiguous.java    |   2 +-
 .../BlockInfoUnderConstructionContiguous.java   |   2 +-
 .../server/blockmanagement/BlockManager.java    | 576 +++++++++----------
 .../blockmanagement/DatanodeStorageInfo.java    |  15 +-
 .../hdfs/server/namenode/FSNamesystem.java      |  18 +-
 .../hdfs/server/namenode/NamenodeFsck.java      |   2 +-
 .../server/blockmanagement/TestBlockInfo.java   |   2 +-
 .../blockmanagement/TestBlockManager.java       |   4 +-
 .../server/blockmanagement/TestNodeCount.java   |   2 +-
 .../TestOverReplicatedBlocks.java               |   4 +-
 .../blockmanagement/TestReplicationPolicy.java  |   2 +-
 13 files changed, 284 insertions(+), 360 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/663eba0a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 40f91f9..5962385 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -689,9 +689,6 @@ Release 2.8.0 - UNRELEASED
     HDFS-8651. Make hadoop-hdfs-project Native code -Wall-clean (Alan Burlison
     via Colin P. McCabe)
 
-    HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery
-    blocks. (Zhe Zhang via jing9)
-
     HDFS-8653. Code cleanup for DatanodeManager, DatanodeDescriptor and
     DatanodeStorageInfo. (Zhe Zhang via wang)
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/663eba0a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
index 5ad992b..4cc2791 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
@@ -172,23 +172,19 @@ public abstract class  BlockInfo extends Block
   public abstract int numNodes();
 
   /**
-   * Add a {@link DatanodeStorageInfo} location for a block
-   * @param storage The storage to add
-   * @param reportedBlock The block reported from the datanode. This is only
-   *                      used by erasure coded blocks, this block's id contains
-   *                      information indicating the index of the block in the
-   *                      corresponding block group.
+   * Add a {@link DatanodeStorageInfo} location for a block.
    */
-  abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock);
+  abstract boolean addStorage(DatanodeStorageInfo storage);
 
   /**
    * Remove {@link DatanodeStorageInfo} location for a block
    */
   abstract boolean removeStorage(DatanodeStorageInfo storage);
 
+
   /**
    * Replace the current BlockInfo with the new one in corresponding
-   * DatanodeStorageInfo's linked list.
+   * DatanodeStorageInfo's linked list
    */
   abstract void replaceBlock(BlockInfo newBlock);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/663eba0a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
index de64ad8..b9abcd0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java
@@ -45,7 +45,7 @@ public class BlockInfoContiguous extends BlockInfo {
   }
 
   @Override
-  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
+  boolean addStorage(DatanodeStorageInfo storage) {
     return ContiguousBlockStorageOp.addStorage(this, storage);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/663eba0a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
index d3cb337..c66675a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java
@@ -69,7 +69,7 @@ public class BlockInfoUnderConstructionContiguous extends
   }
 
   @Override
-  boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) {
+  boolean addStorage(DatanodeStorageInfo storage) {
     return ContiguousBlockStorageOp.addStorage(this, storage);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/663eba0a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 3ffd1bf..1597f41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -203,8 +203,8 @@ public class BlockManager implements BlockStatsMXBean {
    * Maps a StorageID to the set of blocks that are "extra" for this
    * DataNode. We'll eventually remove these extras.
    */
-  public final Map<String, LightWeightLinkedSet<BlockInfo>> excessReplicateMap =
-    new TreeMap<>();
+  public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap =
+    new TreeMap<String, LightWeightLinkedSet<Block>>();
 
   /**
    * Store set of Blocks that need to be replicated 1 or more times.
@@ -508,8 +508,8 @@ public class BlockManager implements BlockStatsMXBean {
   /** Dump meta data to out. */
   public void metaSave(PrintWriter out) {
     assert namesystem.hasWriteLock();
-    final List<DatanodeDescriptor> live = new ArrayList<>();
-    final List<DatanodeDescriptor> dead = new ArrayList<>();
+    final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
+    final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
     datanodeManager.fetchDatanodes(live, dead, false);
     out.println("Live Datanodes: " + live.size());
     out.println("Dead Datanodes: " + dead.size());
@@ -548,8 +548,8 @@ public class BlockManager implements BlockStatsMXBean {
     List<DatanodeDescriptor> containingNodes =
                                       new ArrayList<DatanodeDescriptor>();
     List<DatanodeStorageInfo> containingLiveReplicasNodes =
-      new ArrayList<>();
-
+      new ArrayList<DatanodeStorageInfo>();
+    
     NumberReplicas numReplicas = new NumberReplicas();
     // source node returned is not used
     chooseSourceDatanode(block, containingNodes,
@@ -578,7 +578,7 @@ public class BlockManager implements BlockStatsMXBean {
     Collection<DatanodeDescriptor> corruptNodes = 
                                   corruptReplicas.getNodes(block);
     
-    for (DatanodeStorageInfo storage : getStorages(block)) {
+    for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       String state = "";
       if (corruptNodes != null && corruptNodes.contains(node)) {
@@ -601,23 +601,11 @@ public class BlockManager implements BlockStatsMXBean {
     return maxReplicationStreams;
   }
 
-  public int getDefaultStorageNum(BlockInfo block) {
-    return defaultReplication;
-  }
-
-  public short getMinStorageNum(BlockInfo block) {
-    return minReplication;
-  }
-
   /**
-   * @return true if the block has minimum stored copies
+   * @return true if the block has minimum replicas
    */
-  public boolean hasMinStorage(BlockInfo block) {
-    return hasMinStorage(block, countNodes(block).liveReplicas());
-  }
-
-  public boolean hasMinStorage(BlockInfo block, int liveNum) {
-    return liveNum >= getMinStorageNum(block);
+  public boolean checkMinReplication(BlockInfo block) {
+    return (countNodes(block).liveReplicas() >= minReplication);
   }
 
   /**
@@ -632,9 +620,8 @@ public class BlockManager implements BlockStatsMXBean {
   private static boolean commitBlock(
       final BlockInfoUnderConstruction block, final Block commitBlock)
       throws IOException {
-    if (block.getBlockUCState() == BlockUCState.COMMITTED) {
+    if (block.getBlockUCState() == BlockUCState.COMMITTED)
       return false;
-    }
     assert block.getNumBytes() <= commitBlock.getNumBytes() :
       "commitBlock length is less than the stored one "
       + commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
@@ -654,22 +641,18 @@ public class BlockManager implements BlockStatsMXBean {
    */
   public boolean commitOrCompleteLastBlock(BlockCollection bc,
       Block commitBlock) throws IOException {
-    if (commitBlock == null) {
+    if(commitBlock == null)
       return false; // not committing, this is a block allocation retry
-    }
     BlockInfo lastBlock = bc.getLastBlock();
-    if (lastBlock == null) {
+    if(lastBlock == null)
       return false; // no blocks in file yet
-    }
-    if (lastBlock.isComplete()) {
+    if(lastBlock.isComplete())
       return false; // already completed (e.g. by syncBlock)
-    }
-
+    
     final boolean b = commitBlock(
         (BlockInfoUnderConstruction) lastBlock, commitBlock);
-    if(hasMinStorage(lastBlock)) {
+    if(countNodes(lastBlock).liveReplicas() >= minReplication)
       completeBlock(bc, bc.numBlocks()-1, false);
-    }
     return b;
   }
 
@@ -682,24 +665,20 @@ public class BlockManager implements BlockStatsMXBean {
    */
   private BlockInfo completeBlock(final BlockCollection bc,
       final int blkIndex, boolean force) throws IOException {
-    if (blkIndex < 0) {
+    if(blkIndex < 0)
       return null;
-    }
     BlockInfo curBlock = bc.getBlocks()[blkIndex];
-    if(curBlock.isComplete()) {
+    if(curBlock.isComplete())
       return curBlock;
-    }
     BlockInfoUnderConstruction ucBlock =
         (BlockInfoUnderConstruction) curBlock;
     int numNodes = ucBlock.numNodes();
-    if (!force && !hasMinStorage(curBlock, numNodes)) {
+    if (!force && numNodes < minReplication)
       throw new IOException("Cannot complete block: " +
           "block does not satisfy minimal replication requirement.");
-    }
-    if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED) {
+    if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED)
       throw new IOException(
           "Cannot complete block: block has not been COMMITTED by the client");
-    }
     BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
     // replace penultimate block in file
     bc.setBlock(blkIndex, completeBlock);
@@ -784,7 +763,7 @@ public class BlockManager implements BlockStatsMXBean {
     // count in safe-mode.
     namesystem.adjustSafeModeBlockTotals(
         // decrement safe if we had enough
-        hasMinStorage(oldBlock, targets.length) ? -1 : 0,
+        targets.length >= minReplication ? -1 : 0,
         // always decrement total blocks
         -1);
 
@@ -798,8 +777,8 @@ public class BlockManager implements BlockStatsMXBean {
    */
   private List<DatanodeStorageInfo> getValidLocations(Block block) {
     final List<DatanodeStorageInfo> locations
-        = new ArrayList<>(blocksMap.numNodes(block));
-    for(DatanodeStorageInfo storage : getStorages(block)) {
+        = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
       // filter invalidate replicas
       if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
         locations.add(storage);
@@ -812,7 +791,7 @@ public class BlockManager implements BlockStatsMXBean {
       final BlockInfo[] blocks,
       final long offset, final long length, final int nrBlocksToReturn,
       final AccessMode mode) throws IOException {
-    int curBlk;
+    int curBlk = 0;
     long curPos = 0, blkSize = 0;
     int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
     for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
@@ -825,10 +804,10 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
-      return Collections.emptyList();
+      return Collections.<LocatedBlock>emptyList();
 
     long endOff = offset + length;
-    List<LocatedBlock> results = new ArrayList<>(blocks.length);
+    List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
     do {
       results.add(createLocatedBlock(blocks[curBlk], curPos, mode));
       curPos += blocks[curBlk].getNumBytes();
@@ -841,7 +820,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   private LocatedBlock createLocatedBlock(final BlockInfo[] blocks,
       final long endPos, final AccessMode mode) throws IOException {
-    int curBlk;
+    int curBlk = 0;
     long curPos = 0;
     int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
     for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
@@ -865,8 +844,8 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /** @return a LocatedBlock for the given block */
-  private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos)
-      throws IOException {
+  private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
+      ) throws IOException {
     if (blk instanceof BlockInfoUnderConstruction) {
       if (blk.isComplete()) {
         throw new IOException(
@@ -876,8 +855,7 @@ public class BlockManager implements BlockStatsMXBean {
       final BlockInfoUnderConstruction uc =
           (BlockInfoUnderConstruction) blk;
       final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
-      final ExtendedBlock eb =
-          new ExtendedBlock(namesystem.getBlockPoolId(), blk);
+      final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
       return newLocatedBlock(eb, storages, pos, false);
     }
 
@@ -897,12 +875,11 @@ public class BlockManager implements BlockStatsMXBean {
     final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
     int j = 0;
     if (numMachines > 0) {
-      for(DatanodeStorageInfo storage : getStorages(blk)) {
+      for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
         final DatanodeDescriptor d = storage.getDatanodeDescriptor();
         final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
-        if (isCorrupt || (!replicaCorrupt)) {
+        if (isCorrupt || (!replicaCorrupt))
           machines[j++] = storage;
-        }
       }
     }
     assert j == machines.length :
@@ -1076,7 +1053,7 @@ public class BlockManager implements BlockStatsMXBean {
     for(int i=0; i<startBlock; i++) {
       iter.next();
     }
-    List<BlockWithLocations> results = new ArrayList<>();
+    List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
     long totalSize = 0;
     BlockInfo curBlock;
     while(totalSize<size && iter.hasNext()) {
@@ -1100,7 +1077,7 @@ public class BlockManager implements BlockStatsMXBean {
    
   /** Remove the blocks associated to the given datanode. */
   void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
-    final Iterator<BlockInfo> it = node.getBlockIterator();
+    final Iterator<? extends Block> it = node.getBlockIterator();
     while(it.hasNext()) {
       removeStoredBlock(it.next(), node);
     }
@@ -1114,10 +1091,10 @@ public class BlockManager implements BlockStatsMXBean {
   /** Remove the blocks associated to the given DatanodeStorageInfo. */
   void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) {
     assert namesystem.hasWriteLock();
-    final Iterator<BlockInfo> it = storageInfo.getBlockIterator();
+    final Iterator<? extends Block> it = storageInfo.getBlockIterator();
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     while(it.hasNext()) {
-      BlockInfo block = it.next();
+      Block block = it.next();
       removeStoredBlock(block, node);
       invalidateBlocks.remove(node, block);
     }
@@ -1139,19 +1116,18 @@ public class BlockManager implements BlockStatsMXBean {
    * Adds block to list of blocks which will be invalidated on all its
    * datanodes.
    */
-  private void addToInvalidates(BlockInfo storedBlock) {
+  private void addToInvalidates(Block b) {
     if (!namesystem.isPopulatingReplQueues()) {
       return;
     }
     StringBuilder datanodes = new StringBuilder();
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock,
-        State.NORMAL)) {
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
-      invalidateBlocks.add(storedBlock, node, false);
+      invalidateBlocks.add(b, node, false);
       datanodes.append(node).append(" ");
     }
     if (datanodes.length() != 0) {
-      blockLog.debug("BLOCK* addToInvalidates: {} {}", storedBlock,
+      blockLog.debug("BLOCK* addToInvalidates: {} {}", b,
           datanodes.toString());
     }
   }
@@ -1179,8 +1155,7 @@ public class BlockManager implements BlockStatsMXBean {
   public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
       final DatanodeInfo dn, String storageID, String reason) throws IOException {
     assert namesystem.hasWriteLock();
-    final Block reportedBlock = blk.getLocalBlock();
-    final BlockInfo storedBlock = getStoredBlock(reportedBlock);
+    final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
     if (storedBlock == null) {
       // Check if the replica is in the blockMap, if not
       // ignore the request for now. This could happen when BlockScanner
@@ -1196,8 +1171,8 @@ public class BlockManager implements BlockStatsMXBean {
           + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
           + ") does not exist");
     }
-
-    markBlockAsCorrupt(new BlockToMarkCorrupt(reportedBlock, storedBlock,
+    
+    markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
             blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
         storageID == null ? null : node.getStorageInfo(storageID),
         node);
@@ -1213,18 +1188,18 @@ public class BlockManager implements BlockStatsMXBean {
       DatanodeStorageInfo storageInfo,
       DatanodeDescriptor node) throws IOException {
 
-    if (b.stored.isDeleted()) {
+    if (b.corrupted.isDeleted()) {
       blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" +
           " corrupt as it does not belong to any file", b);
       addToInvalidates(b.corrupted, node);
       return;
     } 
     short expectedReplicas =
-        getExpectedReplicaNum(b.stored.getBlockCollection(), b.stored);
+        b.corrupted.getBlockCollection().getPreferredBlockReplication();
 
     // Add replica to the data-node if it is not already there
     if (storageInfo != null) {
-      storageInfo.addBlock(b.stored, b.corrupted);
+      storageInfo.addBlock(b.stored);
     }
 
     // Add this replica to corruptReplicas Map
@@ -1234,8 +1209,8 @@ public class BlockManager implements BlockStatsMXBean {
     NumberReplicas numberOfReplicas = countNodes(b.stored);
     boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >=
         expectedReplicas;
-    boolean minReplicationSatisfied = hasMinStorage(b.stored,
-        numberOfReplicas.liveReplicas());
+    boolean minReplicationSatisfied =
+        numberOfReplicas.liveReplicas() >= minReplication;
     boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
         (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
         expectedReplicas;
@@ -1378,7 +1353,7 @@ public class BlockManager implements BlockStatsMXBean {
     int additionalReplRequired;
 
     int scheduledWork = 0;
-    List<ReplicationWork> work = new LinkedList<>();
+    List<ReplicationWork> work = new LinkedList<ReplicationWork>();
 
     namesystem.writeLock();
     try {
@@ -1395,11 +1370,11 @@ public class BlockManager implements BlockStatsMXBean {
               continue;
             }
 
-            requiredReplication = getExpectedReplicaNum(bc, block);
+            requiredReplication = bc.getPreferredBlockReplication();
 
             // get a source data-node
-            containingNodes = new ArrayList<>();
-            List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
+            containingNodes = new ArrayList<DatanodeDescriptor>();
+            List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>();
             NumberReplicas numReplicas = new NumberReplicas();
             srcNode = chooseSourceDatanode(
                 block, containingNodes, liveReplicaNodes, numReplicas,
@@ -1419,7 +1394,7 @@ public class BlockManager implements BlockStatsMXBean {
       
             if (numEffectiveReplicas >= requiredReplication) {
               if ( (pendingReplications.getNumReplicas(block) > 0) ||
-                   (blockHasEnoughRacks(block, requiredReplication)) ) {
+                   (blockHasEnoughRacks(block)) ) {
                 neededReplications.remove(block, priority); // remove from neededReplications
                 blockLog.debug("BLOCK* Removing {} from neededReplications as" +
                         " it has enough replicas", block);
@@ -1443,7 +1418,7 @@ public class BlockManager implements BlockStatsMXBean {
       namesystem.writeUnlock();
     }
 
-    final Set<Node> excludedNodes = new HashSet<>();
+    final Set<Node> excludedNodes = new HashSet<Node>();
     for(ReplicationWork rw : work){
       // Exclude all of the containing nodes from being targets.
       // This list includes decommissioning or corrupt nodes.
@@ -1479,7 +1454,7 @@ public class BlockManager implements BlockStatsMXBean {
             rw.targets = null;
             continue;
           }
-          requiredReplication = getExpectedReplicaNum(bc, block);
+          requiredReplication = bc.getPreferredBlockReplication();
 
           // do not schedule more if enough replicas is already pending
           NumberReplicas numReplicas = countNodes(block);
@@ -1488,7 +1463,7 @@ public class BlockManager implements BlockStatsMXBean {
 
           if (numEffectiveReplicas >= requiredReplication) {
             if ( (pendingReplications.getNumReplicas(block) > 0) ||
-                 (blockHasEnoughRacks(block, requiredReplication)) ) {
+                 (blockHasEnoughRacks(block)) ) {
               neededReplications.remove(block, priority); // remove from neededReplications
               rw.targets = null;
               blockLog.debug("BLOCK* Removing {} from neededReplications as" +
@@ -1498,7 +1473,7 @@ public class BlockManager implements BlockStatsMXBean {
           }
 
           if ( (numReplicas.liveReplicas() >= requiredReplication) &&
-               (!blockHasEnoughRacks(block, requiredReplication)) ) {
+               (!blockHasEnoughRacks(block)) ) {
             if (rw.srcNode.getNetworkLocation().equals(
                 targets[0].getDatanodeDescriptor().getNetworkLocation())) {
               //No use continuing, unless a new rack in this case
@@ -1613,7 +1588,7 @@ public class BlockManager implements BlockStatsMXBean {
   List<DatanodeDescriptor> getDatanodeDescriptors(List<String> nodes) {
     List<DatanodeDescriptor> datanodeDescriptors = null;
     if (nodes != null) {
-      datanodeDescriptors = new ArrayList<>(nodes.size());
+      datanodeDescriptors = new ArrayList<DatanodeDescriptor>(nodes.size());
       for (int i = 0; i < nodes.size(); i++) {
         DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodes.get(i));
         if (node != null) {
@@ -1669,9 +1644,9 @@ public class BlockManager implements BlockStatsMXBean {
     int excess = 0;
     
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
-    for(DatanodeStorageInfo storage : getStorages(block)) {
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
-      LightWeightLinkedSet<BlockInfo> excessBlocks =
+      LightWeightLinkedSet<Block> excessBlocks =
         excessReplicateMap.get(node.getDatanodeUuid());
       int countableReplica = storage.getState() == State.NORMAL ? 1 : 0; 
       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
@@ -1739,7 +1714,7 @@ public class BlockManager implements BlockStatsMXBean {
            * Use the blockinfo from the blocksmap to be certain we're working
            * with the most up-to-date block information (e.g. genstamp).
            */
-          BlockInfo bi = getStoredBlock(timedOutItems[i]);
+          BlockInfo bi = blocksMap.getStoredBlock(timedOutItems[i]);
           if (bi == null) {
             continue;
           }
@@ -1789,7 +1764,7 @@ public class BlockManager implements BlockStatsMXBean {
     final BlockInfoUnderConstruction storedBlock;
     final Block reportedBlock;
     final ReplicaState reportedState;
-
+    
     StatefulBlockInfo(BlockInfoUnderConstruction storedBlock,
         Block reportedBlock, ReplicaState reportedState) {
       this.storedBlock = storedBlock;
@@ -1797,34 +1772,14 @@ public class BlockManager implements BlockStatsMXBean {
       this.reportedState = reportedState;
     }
   }
-
-  private static class BlockInfoToAdd {
-    private final BlockInfo stored;
-    private final Block reported;
-
-    BlockInfoToAdd(BlockInfo stored, Block reported) {
-      this.stored = stored;
-      this.reported = reported;
-    }
-
-    public BlockInfo getStored() {
-      return stored;
-    }
-
-    public Block getReported() {
-      return reported;
-    }
-  }
-
+  
   /**
    * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
    * list of blocks that should be considered corrupt due to a block report.
    */
   private static class BlockToMarkCorrupt {
-    /** The corrupted block in a datanode. This is the one reported by the
-     * datanode.
-     */
-    final Block corrupted;
+    /** The corrupted block in a datanode. */
+    final BlockInfo corrupted;
     /** The corresponding block stored in the BlockManager. */
     final BlockInfo stored;
     /** The reason to mark corrupt. */
@@ -1832,7 +1787,7 @@ public class BlockManager implements BlockStatsMXBean {
     /** The reason code to be stored */
     final Reason reasonCode;
 
-    BlockToMarkCorrupt(Block corrupted,
+    BlockToMarkCorrupt(BlockInfo corrupted,
         BlockInfo stored, String reason,
         Reason reasonCode) {
       Preconditions.checkNotNull(corrupted, "corrupted is null");
@@ -1844,9 +1799,15 @@ public class BlockManager implements BlockStatsMXBean {
       this.reasonCode = reasonCode;
     }
 
-    BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs,
-        String reason, Reason reasonCode) {
-      this(corrupted, stored, reason, reasonCode);
+    BlockToMarkCorrupt(BlockInfo stored, String reason,
+        Reason reasonCode) {
+      this(stored, stored, reason, reasonCode);
+    }
+
+    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
+        Reason reasonCode) {
+      this(new BlockInfoContiguous(stored), stored,
+          reason, reasonCode);
       //the corrupted block in datanode has a different generation stamp
       corrupted.setGenerationStamp(gs);
     }
@@ -2033,7 +1994,7 @@ public class BlockManager implements BlockStatsMXBean {
           break;
         }
 
-        BlockInfo bi = getStoredBlock(b);
+        BlockInfo bi = blocksMap.getStoredBlock(b);
         if (bi == null) {
           if (LOG.isDebugEnabled()) {
             LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
@@ -2065,7 +2026,7 @@ public class BlockManager implements BlockStatsMXBean {
           endPostponedMisReplicatedBlocksCount) + " blocks are removed.");
     }
   }
-
+  
   private Collection<Block> processReport(
       final DatanodeStorageInfo storageInfo,
       final BlockListAsLongs report) throws IOException {
@@ -2073,26 +2034,25 @@ public class BlockManager implements BlockStatsMXBean {
     // Modify the (block-->datanode) map, according to the difference
     // between the old and new block report.
     //
-    Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
-    Collection<BlockInfo> toRemove = new TreeSet<>();
-    Collection<Block> toInvalidate = new LinkedList<>();
-    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>();
-    Collection<StatefulBlockInfo> toUC = new LinkedList<>();
+    Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
+    Collection<Block> toRemove = new TreeSet<Block>();
+    Collection<Block> toInvalidate = new LinkedList<Block>();
+    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
+    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
     reportDiff(storageInfo, report,
         toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-
+   
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     // Process the blocks on each queue
-    for (StatefulBlockInfo b : toUC) {
+    for (StatefulBlockInfo b : toUC) { 
       addStoredBlockUnderConstruction(b, storageInfo);
     }
-    for (BlockInfo b : toRemove) {
+    for (Block b : toRemove) {
       removeStoredBlock(b, node);
     }
     int numBlocksLogged = 0;
-    for (BlockInfoToAdd b : toAdd) {
-      addStoredBlock(b.getStored(), b.getReported(), storageInfo, null,
-          numBlocksLogged < maxNumBlocksToLog);
+    for (BlockInfo b : toAdd) {
+      addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -2113,17 +2073,17 @@ public class BlockManager implements BlockStatsMXBean {
    * Mark block replicas as corrupt except those on the storages in 
    * newStorages list.
    */
-  public void markBlockReplicasAsCorrupt(Block oldBlock, BlockInfo block,
-      long oldGenerationStamp, long oldNumBytes,
+  public void markBlockReplicasAsCorrupt(BlockInfo block,
+      long oldGenerationStamp, long oldNumBytes, 
       DatanodeStorageInfo[] newStorages) throws IOException {
     assert namesystem.hasWriteLock();
     BlockToMarkCorrupt b = null;
     if (block.getGenerationStamp() != oldGenerationStamp) {
-      b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp,
+      b = new BlockToMarkCorrupt(block, oldGenerationStamp,
           "genstamp does not match " + oldGenerationStamp
           + " : " + block.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
     } else if (block.getNumBytes() != oldNumBytes) {
-      b = new BlockToMarkCorrupt(oldBlock, block,
+      b = new BlockToMarkCorrupt(block,
           "length does not match " + oldNumBytes
           + " : " + block.getNumBytes(), Reason.SIZE_MISMATCH);
     } else {
@@ -2182,7 +2142,7 @@ public class BlockManager implements BlockStatsMXBean {
         continue;
       }
       
-      BlockInfo storedBlock = getStoredBlock(iblk);
+      BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
       // If block does not belong to any file, we are done.
       if (storedBlock == null) continue;
       
@@ -2220,26 +2180,24 @@ public class BlockManager implements BlockStatsMXBean {
       }      
       //add replica if appropriate
       if (reportedState == ReplicaState.FINALIZED) {
-        addStoredBlockImmediate(storedBlock, iblk, storageInfo);
+        addStoredBlockImmediate(storedBlock, storageInfo);
       }
     }
   }
 
-  private void reportDiff(DatanodeStorageInfo storageInfo,
-      BlockListAsLongs newReport,
-      Collection<BlockInfoToAdd> toAdd,     // add to DatanodeDescriptor
-      Collection<BlockInfo> toRemove,       // remove from DatanodeDescriptor
+  private void reportDiff(DatanodeStorageInfo storageInfo, 
+      BlockListAsLongs newReport, 
+      Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
+      Collection<Block> toRemove,           // remove from DatanodeDescriptor
       Collection<Block> toInvalidate,       // should be removed from DN
       Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
       Collection<StatefulBlockInfo> toUC) { // add to under-construction list
 
-    // place a delimiter in the list which separates blocks
+    // place a delimiter in the list which separates blocks 
     // that have been reported from those that have not
-    Block delimiterBlock = new Block();
-    BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock,
-        (short) 1);
-    AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock);
-    assert result == AddBlockResult.ADDED
+    BlockInfo delimiter = new BlockInfoContiguous(new Block(), (short) 1);
+    AddBlockResult result = storageInfo.addBlock(delimiter);
+    assert result == AddBlockResult.ADDED 
         : "Delimiting block cannot be present in the node";
     int headIndex = 0; //currently the delimiter is in the head of the list
     int curIndex;
@@ -2256,8 +2214,7 @@ public class BlockManager implements BlockStatsMXBean {
       // move block to the head of the list
       if (storedBlock != null &&
           (curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) {
-        headIndex =
-            storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
+        headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex);
       }
     }
 
@@ -2265,9 +2222,8 @@ public class BlockManager implements BlockStatsMXBean {
     // all of them are next to the delimiter
     Iterator<BlockInfo> it =
         storageInfo.new BlockIterator(delimiter.getNext(0));
-    while (it.hasNext()) {
+    while(it.hasNext())
       toRemove.add(it.next());
-    }
     storageInfo.removeBlock(delimiter);
   }
 
@@ -2304,12 +2260,12 @@ public class BlockManager implements BlockStatsMXBean {
    */
   private BlockInfo processReportedBlock(
       final DatanodeStorageInfo storageInfo,
-      final Block block, final ReplicaState reportedState,
-      final Collection<BlockInfoToAdd> toAdd,
-      final Collection<Block> toInvalidate,
+      final Block block, final ReplicaState reportedState, 
+      final Collection<BlockInfo> toAdd,
+      final Collection<Block> toInvalidate, 
       final Collection<BlockToMarkCorrupt> toCorrupt,
       final Collection<StatefulBlockInfo> toUC) {
-
+    
     DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor();
 
     if(LOG.isDebugEnabled()) {
@@ -2317,16 +2273,16 @@ public class BlockManager implements BlockStatsMXBean {
           + " on " + dn + " size " + block.getNumBytes()
           + " replicaState = " + reportedState);
     }
-
+  
     if (shouldPostponeBlocksFromFuture &&
         namesystem.isGenStampInFuture(block)) {
       queueReportedBlock(storageInfo, block, reportedState,
           QUEUE_REASON_FUTURE_GENSTAMP);
       return null;
     }
-
+    
     // find block by blockId
-    BlockInfo storedBlock = getStoredBlock(block);
+    BlockInfo storedBlock = blocksMap.getStoredBlock(block);
     if(storedBlock == null) {
       // If blocksMap does not contain reported block id,
       // the replica should be removed from the data-node.
@@ -2334,7 +2290,7 @@ public class BlockManager implements BlockStatsMXBean {
       return null;
     }
     BlockUCState ucState = storedBlock.getBlockUCState();
-
+    
     // Block is on the NN
     if(LOG.isDebugEnabled()) {
       LOG.debug("In memory blockUCState = " + ucState);
@@ -2379,8 +2335,8 @@ public class BlockManager implements BlockStatsMXBean {
     // but now okay, it might need to be updated.
     if (reportedState == ReplicaState.FINALIZED
         && (storedBlock.findStorageInfo(storageInfo) == -1 ||
-        corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
-      toAdd.add(new BlockInfoToAdd(storedBlock, block));
+            corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
+      toAdd.add(storedBlock);
     }
     return storedBlock;
   }
@@ -2426,7 +2382,7 @@ public class BlockManager implements BlockStatsMXBean {
       if (rbi.getReportedState() == null) {
         // This is a DELETE_BLOCK request
         DatanodeStorageInfo storageInfo = rbi.getStorageInfo();
-        removeStoredBlock(getStoredBlock(rbi.getBlock()),
+        removeStoredBlock(rbi.getBlock(),
             storageInfo.getDatanodeDescriptor());
       } else {
         processAndHandleReportedBlock(rbi.getStorageInfo(),
@@ -2474,15 +2430,15 @@ public class BlockManager implements BlockStatsMXBean {
       case COMMITTED:
         if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
           final long reportedGS = reported.getGenerationStamp();
-          return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS,
+          return new BlockToMarkCorrupt(storedBlock, reportedGS,
               "block is " + ucState + " and reported genstamp " + reportedGS
-                  + " does not match genstamp in block map "
-                  + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
+              + " does not match genstamp in block map "
+              + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
         } else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
-          return new BlockToMarkCorrupt(new Block(reported), storedBlock,
+          return new BlockToMarkCorrupt(storedBlock,
               "block is " + ucState + " and reported length " +
-                  reported.getNumBytes() + " does not match " +
-                  "length in block map " + storedBlock.getNumBytes(),
+              reported.getNumBytes() + " does not match " +
+              "length in block map " + storedBlock.getNumBytes(),
               Reason.SIZE_MISMATCH);
         } else {
           return null; // not corrupt
@@ -2490,12 +2446,11 @@ public class BlockManager implements BlockStatsMXBean {
       case UNDER_CONSTRUCTION:
         if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) {
           final long reportedGS = reported.getGenerationStamp();
-          return new BlockToMarkCorrupt(new Block(reported), storedBlock,
-              reportedGS, "block is " + ucState + " and reported state "
-              + reportedState + ", But reported genstamp " + reportedGS
+          return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is "
+              + ucState + " and reported state " + reportedState
+              + ", But reported genstamp " + reportedGS
               + " does not match genstamp in block map "
-              + storedBlock.getGenerationStamp(),
-              Reason.GENSTAMP_MISMATCH);
+              + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
         }
         return null;
       default:
@@ -2505,15 +2460,12 @@ public class BlockManager implements BlockStatsMXBean {
     case RWR:
       if (!storedBlock.isComplete()) {
         return null; // not corrupt
-      } else if (storedBlock.getGenerationStamp() !=
-          reported.getGenerationStamp()) {
+      } else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) {
         final long reportedGS = reported.getGenerationStamp();
-        return new BlockToMarkCorrupt(
-            new Block(reported), storedBlock, reportedGS,
-            "reported " + reportedState +
-                " replica with genstamp " + reportedGS +
-                " does not match COMPLETE block's genstamp in block map " +
-                storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
+        return new BlockToMarkCorrupt(storedBlock, reportedGS,
+            "reported " + reportedState + " replica with genstamp " + reportedGS
+            + " does not match COMPLETE block's genstamp in block map "
+            + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
       } else { // COMPLETE block, same genstamp
         if (reportedState == ReplicaState.RBW) {
           // If it's a RBW report for a COMPLETE block, it may just be that
@@ -2525,7 +2477,7 @@ public class BlockManager implements BlockStatsMXBean {
               "complete with the same genstamp");
           return null;
         } else {
-          return new BlockToMarkCorrupt(new Block(reported), storedBlock,
+          return new BlockToMarkCorrupt(storedBlock,
               "reported replica has invalid state " + reportedState,
               Reason.INVALID_STATE);
         }
@@ -2538,8 +2490,7 @@ public class BlockManager implements BlockStatsMXBean {
       " on " + dn + " size " + storedBlock.getNumBytes();
       // log here at WARN level since this is really a broken HDFS invariant
       LOG.warn(msg);
-      return new BlockToMarkCorrupt(new Block(reported), storedBlock, msg,
-          Reason.INVALID_STATE);
+      return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE);
     }
   }
 
@@ -2572,7 +2523,7 @@ public class BlockManager implements BlockStatsMXBean {
 
     if (ucBlock.reportedState == ReplicaState.FINALIZED &&
         (block.findStorageInfo(storageInfo) < 0)) {
-      addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true);
+      addStoredBlock(block, storageInfo, null, true);
     }
   } 
 
@@ -2587,23 +2538,23 @@ public class BlockManager implements BlockStatsMXBean {
    * 
    * @throws IOException
    */
-  private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported,
+  private void addStoredBlockImmediate(BlockInfo storedBlock,
       DatanodeStorageInfo storageInfo)
-      throws IOException {
+  throws IOException {
     assert (storedBlock != null && namesystem.hasWriteLock());
-    if (!namesystem.isInStartupSafeMode()
+    if (!namesystem.isInStartupSafeMode() 
         || namesystem.isPopulatingReplQueues()) {
-      addStoredBlock(storedBlock, reported, storageInfo, null, false);
+      addStoredBlock(storedBlock, storageInfo, null, false);
       return;
     }
 
     // just add it
-    AddBlockResult result = storageInfo.addBlock(storedBlock, reported);
+    AddBlockResult result = storageInfo.addBlock(storedBlock);
 
     // Now check for completion of blocks and safe block count
     int numCurrentReplica = countLiveNodes(storedBlock);
     if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
-        && hasMinStorage(storedBlock, numCurrentReplica)) {
+        && numCurrentReplica >= minReplication) {
       completeBlock(storedBlock.getBlockCollection(), storedBlock, false);
     } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
       // check whether safe replication is reached for the block
@@ -2617,20 +2568,19 @@ public class BlockManager implements BlockStatsMXBean {
   /**
    * Modify (block-->datanode) map. Remove block from set of
    * needed replications if this takes care of the problem.
-   * @return the block that is stored in blocksMap.
+   * @return the block that is stored in blockMap.
    */
   private Block addStoredBlock(final BlockInfo block,
-      final Block reportedBlock,
-      DatanodeStorageInfo storageInfo,
-      DatanodeDescriptor delNodeHint,
-      boolean logEveryBlock)
-      throws IOException {
+                               DatanodeStorageInfo storageInfo,
+                               DatanodeDescriptor delNodeHint,
+                               boolean logEveryBlock)
+  throws IOException {
     assert block != null && namesystem.hasWriteLock();
     BlockInfo storedBlock;
     DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
     if (block instanceof BlockInfoUnderConstruction) {
       //refresh our copy in case the block got completed in another thread
-      storedBlock = getStoredBlock(block);
+      storedBlock = blocksMap.getStoredBlock(block);
     } else {
       storedBlock = block;
     }
@@ -2644,9 +2594,10 @@ public class BlockManager implements BlockStatsMXBean {
       return block;
     }
     BlockCollection bc = storedBlock.getBlockCollection();
+    assert bc != null : "Block must belong to a file";
 
     // add block to the datanode
-    AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock);
+    AddBlockResult result = storageInfo.addBlock(storedBlock);
 
     int curReplicaDelta;
     if (result == AddBlockResult.ADDED) {
@@ -2674,10 +2625,10 @@ public class BlockManager implements BlockStatsMXBean {
     NumberReplicas num = countNodes(storedBlock);
     int numLiveReplicas = num.liveReplicas();
     int numCurrentReplica = numLiveReplicas
-        + pendingReplications.getNumReplicas(storedBlock);
+      + pendingReplications.getNumReplicas(storedBlock);
 
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
-        hasMinStorage(storedBlock, numLiveReplicas)) {
+        numLiveReplicas >= minReplication) {
       storedBlock = completeBlock(bc, storedBlock, false);
     } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) {
       // check whether safe replication is reached for the block
@@ -2687,7 +2638,7 @@ public class BlockManager implements BlockStatsMXBean {
       // handles the safe block count maintenance.
       namesystem.incrementSafeBlockCount(numCurrentReplica);
     }
-
+    
     // if file is under construction, then done for now
     if (bc.isUnderConstruction()) {
       return storedBlock;
@@ -2699,7 +2650,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // handle underReplication/overReplication
-    short fileReplication = getExpectedReplicaNum(bc, storedBlock);
+    short fileReplication = bc.getPreferredBlockReplication();
     if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) {
       neededReplications.remove(storedBlock, numCurrentReplica,
           num.decommissionedAndDecommissioning(), fileReplication);
@@ -2715,12 +2666,11 @@ public class BlockManager implements BlockStatsMXBean {
     int numCorruptNodes = num.corruptReplicas();
     if (numCorruptNodes != corruptReplicasCount) {
       LOG.warn("Inconsistent number of corrupt replicas for " +
-          storedBlock + ". blockMap has " + numCorruptNodes +
+          storedBlock + "blockMap has " + numCorruptNodes + 
           " but corrupt replicas map has " + corruptReplicasCount);
     }
-    if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) {
-      invalidateCorruptReplicas(storedBlock, reportedBlock);
-    }
+    if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
+      invalidateCorruptReplicas(storedBlock);
     return storedBlock;
   }
 
@@ -2752,7 +2702,7 @@ public class BlockManager implements BlockStatsMXBean {
    *
    * @param blk Block whose corrupt replicas need to be invalidated
    */
-  private void invalidateCorruptReplicas(BlockInfo blk, Block reported) {
+  private void invalidateCorruptReplicas(BlockInfo blk) {
     Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
     boolean removedFromBlocksMap = true;
     if (nodes == null)
@@ -2762,8 +2712,8 @@ public class BlockManager implements BlockStatsMXBean {
     DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
     for (DatanodeDescriptor node : nodesCopy) {
       try {
-        if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null,
-            Reason.ANY), node)) {
+        if (!invalidateBlock(new BlockToMarkCorrupt(blk, null,
+              Reason.ANY), node)) {
           removedFromBlocksMap = false;
         }
       } catch (IOException e) {
@@ -2931,7 +2881,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
     // calculate current replication
     short expectedReplication =
-        getExpectedReplicaNum(block.getBlockCollection(), block);
+        block.getBlockCollection().getPreferredBlockReplication();
     NumberReplicas num = countNodes(block);
     int numCurrentReplica = num.liveReplicas();
     // add to under-replicated queue if need to be
@@ -2990,14 +2940,14 @@ public class BlockManager implements BlockStatsMXBean {
    * If there are any extras, call chooseExcessReplicates() to
    * mark them in the excessReplicateMap.
    */
-  private void processOverReplicatedBlock(final BlockInfo block,
+  private void processOverReplicatedBlock(final Block block,
       final short replication, final DatanodeDescriptor addedNode,
       DatanodeDescriptor delNodeHint) {
     assert namesystem.hasWriteLock();
     if (addedNode == delNodeHint) {
       delNodeHint = null;
     }
-    Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
+    Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
     Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
         .getNodes(block);
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
@@ -3011,8 +2961,8 @@ public class BlockManager implements BlockStatsMXBean {
         postponeBlock(block);
         return;
       }
-      LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
-          cur.getDatanodeUuid());
+      LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
+          .getDatanodeUuid());
       if (excessBlocks == null || !excessBlocks.contains(block)) {
         if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
           // exclude corrupt replicas
@@ -3022,7 +2972,7 @@ public class BlockManager implements BlockStatsMXBean {
         }
       }
     }
-    chooseExcessReplicates(nonExcess, block, replication,
+    chooseExcessReplicates(nonExcess, block, replication, 
         addedNode, delNodeHint, blockplacement);
   }
 
@@ -3041,29 +2991,29 @@ public class BlockManager implements BlockStatsMXBean {
    * If no such a node is available,
    * then pick a node with least free space
    */
-  private void chooseExcessReplicates(
-      final Collection<DatanodeStorageInfo> nonExcess,
-      BlockInfo storedBlock, short replication,
-      DatanodeDescriptor addedNode,
-      DatanodeDescriptor delNodeHint,
-      BlockPlacementPolicy replicator) {
+  private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess, 
+                              Block b, short replication,
+                              DatanodeDescriptor addedNode,
+                              DatanodeDescriptor delNodeHint,
+                              BlockPlacementPolicy replicator) {
     assert namesystem.hasWriteLock();
     // first form a rack to datanodes map and
-    BlockCollection bc = getBlockCollection(storedBlock);
-    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
-        bc.getStoragePolicyID());
+    BlockCollection bc = getBlockCollection(b);
+    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
     final List<StorageType> excessTypes = storagePolicy.chooseExcess(
         replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
 
-    final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>();
-    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>();
-    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>();
 
+    final Map<String, List<DatanodeStorageInfo>> rackMap
+        = new HashMap<String, List<DatanodeStorageInfo>>();
+    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
+    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
+    
     // split nodes into two sets
     // moreThanOne contains nodes on rack with more than one replica
     // exactlyOne contains the remaining nodes
     replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
-
+    
     // pick one node to delete that favors the delete hint
     // otherwise pick one with least space from priSet if it is not empty
     // otherwise one node with least space from remains
@@ -3078,7 +3028,7 @@ public class BlockManager implements BlockStatsMXBean {
           moreThanOne, excessTypes)) {
         cur = delNodeHintStorage;
       } else { // regular excessive replica removal
-        cur = replicator.chooseReplicaToDelete(bc, storedBlock, replication,
+        cur = replicator.chooseReplicaToDelete(bc, b, replication,
             moreThanOne, exactlyOne, excessTypes);
       }
       firstOne = false;
@@ -3087,27 +3037,22 @@ public class BlockManager implements BlockStatsMXBean {
       replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
           exactlyOne, cur);
 
-      processChosenExcessReplica(nonExcess, cur, storedBlock);
-    }
-  }
+      nonExcess.remove(cur);
+      addToExcessReplicate(cur.getDatanodeDescriptor(), b);
 
-  private void processChosenExcessReplica(
-      final Collection<DatanodeStorageInfo> nonExcess,
-      final DatanodeStorageInfo chosen, BlockInfo storedBlock) {
-    nonExcess.remove(chosen);
-    addToExcessReplicate(chosen.getDatanodeDescriptor(), storedBlock);
-    //
-    // The 'excessblocks' tracks blocks until we get confirmation
-    // that the datanode has deleted them; the only way we remove them
-    // is when we get a "removeBlock" message.
-    //
-    // The 'invalidate' list is used to inform the datanode the block
-    // should be deleted.  Items are removed from the invalidate list
-    // upon giving instructions to the datanodes.
-    //
-    addToInvalidates(storedBlock, chosen.getDatanodeDescriptor());
-    blockLog.debug("BLOCK* chooseExcessReplicates: "
-        +"({}, {}) is added to invalidated blocks set", chosen, storedBlock);
+      //
+      // The 'excessblocks' tracks blocks until we get confirmation
+      // that the datanode has deleted them; the only way we remove them
+      // is when we get a "removeBlock" message.  
+      //
+      // The 'invalidate' list is used to inform the datanode the block 
+      // should be deleted.  Items are removed from the invalidate list
+      // upon giving instructions to the namenode.
+      //
+      addToInvalidates(b, cur.getDatanodeDescriptor());
+      blockLog.debug("BLOCK* chooseExcessReplicates: "
+                +"({}, {}) is added to invalidated blocks set", cur, b);
+    }
   }
 
   /** Check if we can use delHint */
@@ -3131,18 +3076,17 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
-  private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) {
+  private void addToExcessReplicate(DatanodeInfo dn, Block block) {
     assert namesystem.hasWriteLock();
-    LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
-        dn.getDatanodeUuid());
+    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid());
     if (excessBlocks == null) {
-      excessBlocks = new LightWeightLinkedSet<>();
+      excessBlocks = new LightWeightLinkedSet<Block>();
       excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks);
     }
-    if (excessBlocks.add(storedBlock)) {
+    if (excessBlocks.add(block)) {
       excessBlocksCount.incrementAndGet();
       blockLog.debug("BLOCK* addToExcessReplicate: ({}, {}) is added to"
-          + " excessReplicateMap", dn, storedBlock);
+          + " excessReplicateMap", dn, block);
     }
   }
 
@@ -3154,26 +3098,26 @@ public class BlockManager implements BlockStatsMXBean {
           QUEUE_REASON_FUTURE_GENSTAMP);
       return;
     }
-    removeStoredBlock(getStoredBlock(block), node);
+    removeStoredBlock(block, node);
   }
 
   /**
    * Modify (block-->datanode) map. Possibly generate replication tasks, if the
    * removed block is still valid.
    */
-  public void removeStoredBlock(BlockInfo storedBlock,
-      DatanodeDescriptor node) {
-    blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node);
+  public void removeStoredBlock(Block block, DatanodeDescriptor node) {
+    blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node);
     assert (namesystem.hasWriteLock());
     {
+      BlockInfo storedBlock = getStoredBlock(block);
       if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
         blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
-            " removed from node {}", storedBlock, node);
+            " removed from node {}", block, node);
         return;
       }
 
       CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks()
-          .get(new CachedBlock(storedBlock.getBlockId(), (short) 0, false));
+          .get(new CachedBlock(block.getBlockId(), (short) 0, false));
       if (cblock != null) {
         boolean removed = false;
         removed |= node.getPendingCached().remove(cblock);
@@ -3181,7 +3125,7 @@ public class BlockManager implements BlockStatsMXBean {
         removed |= node.getPendingUncached().remove(cblock);
         if (removed) {
           blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching "
-              + "related lists on node {}", storedBlock, node);
+              + "related lists on node {}", block, node);
         }
       }
 
@@ -3191,7 +3135,7 @@ public class BlockManager implements BlockStatsMXBean {
       // necessary. In that case, put block on a possibly-will-
       // be-replicated list.
       //
-      BlockCollection bc = storedBlock.getBlockCollection();
+      BlockCollection bc = blocksMap.getBlockCollection(block);
       if (bc != null) {
         namesystem.decrementSafeBlockCount(storedBlock);
         updateNeededReplications(storedBlock, -1, 0);
@@ -3201,13 +3145,13 @@ public class BlockManager implements BlockStatsMXBean {
       // We've removed a block from a node, so it's definitely no longer
       // in "excess" there.
       //
-      LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(
-          node.getDatanodeUuid());
+      LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
+          .getDatanodeUuid());
       if (excessBlocks != null) {
-        if (excessBlocks.remove(storedBlock)) {
+        if (excessBlocks.remove(block)) {
           excessBlocksCount.decrementAndGet();
           blockLog.debug("BLOCK* removeStoredBlock: {} is removed from " +
-              "excessBlocks", storedBlock);
+              "excessBlocks", block);
           if (excessBlocks.size() == 0) {
             excessReplicateMap.remove(node.getDatanodeUuid());
           }
@@ -3215,7 +3159,7 @@ public class BlockManager implements BlockStatsMXBean {
       }
 
       // Remove the replica from corruptReplicas
-      corruptReplicas.removeFromCorruptReplicasMap(storedBlock, node);
+      corruptReplicas.removeFromCorruptReplicasMap(block, node);
     }
   }
 
@@ -3223,7 +3167,7 @@ public class BlockManager implements BlockStatsMXBean {
    * Get all valid locations of the block & add the block to results
    * return the length of the added block; 0 if the block is not added
    */
-  private long addBlock(BlockInfo block, List<BlockWithLocations> results) {
+  private long addBlock(Block block, List<BlockWithLocations> results) {
     final List<DatanodeStorageInfo> locations = getValidLocations(block);
     if(locations.size() == 0) {
       return 0;
@@ -3275,32 +3219,31 @@ public class BlockManager implements BlockStatsMXBean {
     processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
         delHintNode);
   }
-
+  
   private void processAndHandleReportedBlock(
       DatanodeStorageInfo storageInfo, Block block,
       ReplicaState reportedState, DatanodeDescriptor delHintNode)
       throws IOException {
     // blockReceived reports a finalized block
-    Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
-    Collection<Block> toInvalidate = new LinkedList<>();
-    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>();
-    Collection<StatefulBlockInfo> toUC = new LinkedList<>();
+    Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
+    Collection<Block> toInvalidate = new LinkedList<Block>();
+    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
+    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
     final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
 
-    processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate,
-        toCorrupt, toUC);
+    processReportedBlock(storageInfo, block, reportedState,
+                              toAdd, toInvalidate, toCorrupt, toUC);
     // the block is only in one of the to-do lists
     // if it is in none then data-node already has it
     assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1
-        : "The block should be only in one of the lists.";
+      : "The block should be only in one of the lists.";
 
-    for (StatefulBlockInfo b : toUC) {
+    for (StatefulBlockInfo b : toUC) { 
       addStoredBlockUnderConstruction(b, storageInfo);
     }
     long numBlocksLogged = 0;
-    for (BlockInfoToAdd b : toAdd) {
-      addStoredBlock(b.getStored(), b.getReported(), storageInfo, delHintNode,
-          numBlocksLogged < maxNumBlocksToLog);
+    for (BlockInfo b : toAdd) {
+      addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog);
       numBlocksLogged++;
     }
     if (numBlocksLogged > maxNumBlocksToLog) {
@@ -3365,7 +3308,7 @@ public class BlockManager implements BlockStatsMXBean {
                                       ReplicaState.RBW, null);
         break;
       default:
-        String msg =
+        String msg = 
           "Unknown block status code reported by " + nodeID +
           ": " + rdbi;
         blockLog.warn(msg);
@@ -3401,8 +3344,8 @@ public class BlockManager implements BlockStatsMXBean {
       } else if (node.isDecommissioned()) {
         decommissioned++;
       } else {
-        LightWeightLinkedSet<BlockInfo> blocksExcess =
-            excessReplicateMap.get(node.getDatanodeUuid());
+        LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
+            .getDatanodeUuid());
         if (blocksExcess != null && blocksExcess.contains(b)) {
           excess++;
         } else {
@@ -3455,13 +3398,13 @@ public class BlockManager implements BlockStatsMXBean {
     int numOverReplicated = 0;
     while(it.hasNext()) {
       final BlockInfo block = it.next();
-      int expectedReplication = this.getReplication(block);
+      BlockCollection bc = blocksMap.getBlockCollection(block);
+      short expectedReplication = bc.getPreferredBlockReplication();
       NumberReplicas num = countNodes(block);
       int numCurrentReplica = num.liveReplicas();
       if (numCurrentReplica > expectedReplication) {
         // over-replicated block 
-        processOverReplicatedBlock(block, (short) expectedReplication, null,
-            null);
+        processOverReplicatedBlock(block, expectedReplication, null, null);
         numOverReplicated++;
       }
     }
@@ -3487,7 +3430,7 @@ public class BlockManager implements BlockStatsMXBean {
     if (pendingReplicationBlocksCount == 0 &&
         underReplicatedBlocksCount == 0) {
       LOG.info("Node {} is dead and there are no under-replicated" +
-          " blocks or blocks pending replication. Safe to decommission.",
+          " blocks or blocks pending replication. Safe to decommission.", 
           node);
       return true;
     }
@@ -3505,12 +3448,6 @@ public class BlockManager implements BlockStatsMXBean {
     return blocksMap.size();
   }
 
-
-  /** @return an iterator of the datanodes. */
-  public Iterable<DatanodeStorageInfo> getStorages(final Block block) {
-    return blocksMap.getStorages(block);
-  }
-
   public DatanodeStorageInfo[] getStorages(BlockInfo block) {
     final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[block.numNodes()];
     int i = 0;
@@ -3596,13 +3533,13 @@ public class BlockManager implements BlockStatsMXBean {
       String src, BlockInfo[] blocks) {
     for (BlockInfo b: blocks) {
       if (!b.isComplete()) {
+        final BlockInfoUnderConstruction uc =
+            (BlockInfoUnderConstruction)b;
         final int numNodes = b.numNodes();
-        final int min = getMinStorageNum(b);
-        final BlockUCState state = b.getBlockUCState();
-        LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = " + state
-            + ", replication# = " + numNodes
-            + (numNodes < min ? " < " : " >= ")
-            + " minimum = " + min + ") in file " + src);
+        LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = "
+          + uc.getBlockUCState() + ", replication# = " + numNodes
+          + (numNodes < minReplication ? " < ": " >= ")
+          + " minimum = " + minReplication + ") in file " + src);
         return false;
       }
     }
@@ -3613,15 +3550,15 @@ public class BlockManager implements BlockStatsMXBean {
    * @return 0 if the block is not found;
    *         otherwise, return the replication factor of the block.
    */
-  private int getReplication(BlockInfo block) {
+  private int getReplication(Block block) {
     final BlockCollection bc = blocksMap.getBlockCollection(block);
-    return bc == null? 0: getExpectedReplicaNum(bc, block);
+    return bc == null? 0: bc.getPreferredBlockReplication();
   }
 
 
   /**
-   * Get blocks to invalidate for <i>nodeId</i>.
-   * in {@link #invalidateBlocks}.boolean blockHasEnoughRacks
+   * Get blocks to invalidate for <i>nodeId</i>
+   * in {@link #invalidateBlocks}.
    *
    * @return number of blocks scheduled for removal during this iteration.
    */
@@ -3659,20 +3596,22 @@ public class BlockManager implements BlockStatsMXBean {
     return toInvalidate.size();
   }
 
-  boolean blockHasEnoughRacks(BlockInfo storedBlock, int expectedStorageNum) {
+  boolean blockHasEnoughRacks(Block b) {
     if (!this.shouldCheckForEnoughRacks) {
       return true;
     }
-    boolean enoughRacks = false;
-    Collection<DatanodeDescriptor> corruptNodes =
-        corruptReplicas.getNodes(storedBlock);
+    boolean enoughRacks = false;;
+    Collection<DatanodeDescriptor> corruptNodes = 
+                                  corruptReplicas.getNodes(b);
+    int numExpectedReplicas = getReplication(b);
     String rackName = null;
-    for(DatanodeStorageInfo storage : getStorages(storedBlock)) {
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
       final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
         if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
-          if (expectedStorageNum == 1 || (expectedStorageNum > 1 &&
-              !datanodeManager.hasClusterEverBeenMultiRack())) {
+          if (numExpectedReplicas == 1 ||
+              (numExpectedReplicas > 1 &&
+                  !datanodeManager.hasClusterEverBeenMultiRack())) {
             enoughRacks = true;
             break;
           }
@@ -3693,13 +3632,8 @@ public class BlockManager implements BlockStatsMXBean {
    * A block needs replication if the number of replicas is less than expected
    * or if it does not have enough racks.
    */
-  boolean isNeededReplication(BlockInfo storedBlock, int expected,
-      int current) {
-    return current < expected || !blockHasEnoughRacks(storedBlock, expected);
-  }
-
-  public short getExpectedReplicaNum(BlockCollection bc, BlockInfo block) {
-    return bc.getPreferredBlockReplication();
+  boolean isNeededReplication(Block b, int expected, int current) {
+    return current < expected || !blockHasEnoughRacks(b);
   }
   
   public long getMissingBlocksCount() {
@@ -3721,6 +3655,11 @@ public class BlockManager implements BlockStatsMXBean {
     return blocksMap.getBlockCollection(b);
   }
 
+  /** @return an iterator of the datanodes. */
+  public Iterable<DatanodeStorageInfo> getStorages(final Block block) {
+    return blocksMap.getStorages(block);
+  }
+
   public int numCorruptReplicas(Block block) {
     return corruptReplicas.numCorruptReplicas(block);
   }
@@ -3736,10 +3675,9 @@ public class BlockManager implements BlockStatsMXBean {
    * If a block is removed from blocksMap, remove it from excessReplicateMap.
    */
   private void removeFromExcessReplicateMap(Block block) {
-    for (DatanodeStorageInfo info : getStorages(block)) {
+    for (DatanodeStorageInfo info : blocksMap.getStorages(block)) {
       String uuid = info.getDatanodeDescriptor().getDatanodeUuid();
-      LightWeightLinkedSet<BlockInfo> excessReplicas =
-          excessReplicateMap.get(uuid);
+      LightWeightLinkedSet<Block> excessReplicas = excessReplicateMap.get(uuid);
       if (excessReplicas != null) {
         if (excessReplicas.remove(block)) {
           excessBlocksCount.decrementAndGet();
@@ -3928,7 +3866,7 @@ public class BlockManager implements BlockStatsMXBean {
 
   /**
    * A simple result enum for the result of
-   * {@link BlockManager#processMisReplicatedBlock}.
+   * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}.
    */
   enum MisReplicationResult {
     /** The block should be invalidated since it belongs to a deleted file. */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/663eba0a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 92841a6..216d6d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -24,7 +24,6 @@ import java.util.List;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
@@ -234,7 +233,7 @@ public class DatanodeStorageInfo {
     return blockPoolUsed;
   }
 
-  public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
+  public AddBlockResult addBlock(BlockInfo b) {
     // First check whether the block belongs to a different storage
     // on the same DN.
     AddBlockResult result = AddBlockResult.ADDED;
@@ -253,18 +252,10 @@ public class DatanodeStorageInfo {
     }
 
     // add to the head of the data-node list
-    b.addStorage(this, reportedBlock);
-    insertToList(b);
-    return result;
-  }
-
-  AddBlockResult addBlock(BlockInfo b) {
-    return addBlock(b, b);
-  }
-
-  public void insertToList(BlockInfo b) {
+    b.addStorage(this);
     blockList = b.listInsert(blockList, this);
     numBlocks++;
+    return result;
   }
 
   public boolean removeBlock(BlockInfo b) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/663eba0a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index e3717ab..3d176b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -142,6 +142,7 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
@@ -2790,7 +2791,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       if (trackBlockCounts) {
         if (b.isComplete()) {
           numRemovedComplete++;
-          if (blockManager.hasMinStorage(b)) {
+          if (blockManager.checkMinReplication(b)) {
             numRemovedSafe++;
           }
         }
@@ -3022,7 +3023,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       curBlock = blocks[nrCompleteBlocks];
       if(!curBlock.isComplete())
         break;
-      assert blockManager.hasMinStorage(curBlock) :
+      assert blockManager.checkMinReplication(curBlock) :
               "A COMPLETE block is not minimally replicated in " + src;
     }
 
@@ -3058,7 +3059,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
     // If penultimate block doesn't exist then its minReplication is met
     boolean penultimateBlockMinReplication = penultimateBlock == null ? true :
-        blockManager.hasMinStorage(penultimateBlock);
+        blockManager.checkMinReplication(penultimateBlock);
 
     switch(lastBlockState) {
     case COMPLETE:
@@ -3067,7 +3068,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     case COMMITTED:
       // Close file if committed blocks are minimally replicated
       if(penultimateBlockMinReplication &&
-          blockManager.hasMinStorage(lastBlock)) {
+          blockManager.checkMinReplication(lastBlock)) {
         finalizeINodeFileUnderConstruction(src, pendingFile,
             iip.getLatestSnapshotId());
         NameNode.stateChangeLog.warn("BLOCK*"
@@ -3359,9 +3360,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
                 trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
             if (storageInfo != null) {
               if(copyTruncate) {
-                storageInfo.addBlock(truncatedBlock, truncatedBlock);
+                storageInfo.addBlock(truncatedBlock);
               } else {
-                storageInfo.addBlock(storedBlock, storedBlock);
+                storageInfo.addBlock(storedBlock);
               }
             }
           }
@@ -3377,9 +3378,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         } else {
           iFile.setLastBlock(storedBlock, trimmedStorageInfos);
           if (closeFile) {
-            blockManager.markBlockReplicasAsCorrupt(oldBlock.getLocalBlock(),
-                storedBlock, oldGenerationStamp, oldNumBytes,
-                trimmedStorageInfos);
+            blockManager.markBlockReplicasAsCorrupt(storedBlock,
+                oldGenerationStamp, oldNumBytes, trimmedStorageInfos);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/663eba0a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index ab179b4..7d4cd7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -647,7 +647,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
                   .getStorageType()));
             }
             if (showReplicaDetails) {
-              LightWeightLinkedSet<BlockInfo> blocksExcess =
+              LightWeightLinkedSet<Block> blocksExcess =
                   bm.excessReplicateMap.get(dnDesc.getDatanodeUuid());
               Collection<DatanodeDescriptor> corruptReplicas =
                   bm.getCorruptReplicas(block.getLocalBlock());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/663eba0a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index bae4f1d..5126aa7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@ -63,7 +63,7 @@ public class TestBlockInfo {
 
     final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1");
 
-    boolean added = blockInfo.addStorage(storage, blockInfo);
+    boolean added = blockInfo.addStorage(storage);
 
     Assert.assertTrue(added);
     Assert.assertEquals(storage, blockInfo.getStorageInfo(0));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/663eba0a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 9e31670..396dff3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -383,7 +383,7 @@ public class TestBlockManager {
     for (int i = 1; i < pipeline.length; i++) {
       DatanodeStorageInfo storage = pipeline[i];
       bm.addBlock(storage, blockInfo, null);
-      blockInfo.addStorage(storage, blockInfo);
+      blockInfo.addStorage(storage);
     }
   }
 
@@ -393,7 +393,7 @@ public class TestBlockManager {
 
     for (DatanodeDescriptor dn : nodes) {
       for (DatanodeStorageInfo storage : dn.getStorageInfos()) {
-        blockInfo.addStorage(storage, blockInfo);
+        blockInfo.addStorage(storage);
       }
     }
     return blockInfo;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/663eba0a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
index c33667d..1c3f075 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
@@ -100,7 +100,7 @@ public class TestNodeCount {
       DatanodeDescriptor nonExcessDN = null;
       for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) {
         final DatanodeDescriptor dn = storage.getDatanodeDescriptor();
-        Collection<BlockInfo> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
+        Collection<Block> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
         if (blocks == null || !blocks.contains(block.getLocalBlock()) ) {
           nonExcessDN = dn;
           break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/663eba0a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
index 83b3aa0..2d7bb44 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.util.Time;
 import org.junit.Test;
 
 public class TestOverReplicatedBlocks {
@@ -183,7 +185,7 @@ public class TestOverReplicatedBlocks {
       // All replicas for deletion should be scheduled on lastDN.
       // And should not actually be deleted, because lastDN does not heartbeat.
       namesystem.readLock();
-      Collection<BlockInfo> dnBlocks =
+      Collection<Block> dnBlocks = 
         namesystem.getBlockManager().excessReplicateMap.get(lastDNid);
       assertEquals("Replicas on node " + lastDNid + " should have been deleted",
           SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/663eba0a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 44f0e65..2812957 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -1250,7 +1250,7 @@ public class TestReplicationPolicy {
     when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true);
     when(storage.addBlock(any(BlockInfo.class))).thenReturn
         (DatanodeStorageInfo.AddBlockResult.ADDED);
-    ucBlock.addStorage(storage, ucBlock);
+    ucBlock.addStorage(storage);
 
     when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any()))
     .thenReturn(ucBlock);


Mime
View raw message