hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject hadoop git commit: HDFS-4366. Block Replication Policy Implementation May Skip Higher-Priority Blocks for Lower-Priority Blocks. (Derek Dagit and Zhe Zhang via wang)
Date Mon, 22 Jun 2015 21:18:41 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 c4747158b -> cd1e0930d


HDFS-4366. Block Replication Policy Implementation May Skip Higher-Priority Blocks for Lower-Priority
Blocks. (Derek Dagit and Zhe Zhang via wang)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cd1e0930
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cd1e0930
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cd1e0930

Branch: refs/heads/branch-2
Commit: cd1e0930dccf248b98dea21fbe12be23f03619fe
Parents: c474715
Author: Andrew Wang <wang@apache.org>
Authored: Mon Jun 22 14:18:24 2015 -0700
Committer: Andrew Wang <wang@apache.org>
Committed: Mon Jun 22 14:18:33 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../server/blockmanagement/BlockManager.java    |   5 -
 .../blockmanagement/UnderReplicatedBlocks.java  |  66 +++----
 .../hadoop/hdfs/util/LightWeightLinkedSet.java  |  34 ++++
 .../blockmanagement/TestReplicationPolicy.java  | 195 +++++++++++++++++++
 .../hdfs/util/TestLightWeightLinkedSet.java     |  69 +++++++
 6 files changed, 331 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd1e0930/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6e7030e..5f2fcf2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -597,6 +597,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8337. Accessing httpfs via webhdfs doesn't work from a jar with
     kerberos. (Yongjun Zhang)
 
+    HDFS-4366. Block Replication Policy Implementation May Skip Higher-Priority
+    Blocks for Lower-Priority Blocks (Derek Dagit via kihwal)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd1e0930/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 7c2a0bc..53a6868 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1347,7 +1347,6 @@ public class BlockManager {
             // abandoned block or block reopened for append
             if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
               neededReplications.remove(block, priority); // remove from neededReplications
-              neededReplications.decrementReplicationIndex(priority);
               continue;
             }
 
@@ -1377,7 +1376,6 @@ public class BlockManager {
               if ( (pendingReplications.getNumReplicas(block) > 0) ||
                    (blockHasEnoughRacks(block)) ) {
                 neededReplications.remove(block, priority); // remove from neededReplications
-                neededReplications.decrementReplicationIndex(priority);
                 blockLog.info("BLOCK* Removing {} from neededReplications as" +
                         " it has enough replicas", block);
                 continue;
@@ -1434,7 +1432,6 @@ public class BlockManager {
           if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
             neededReplications.remove(block, priority); // remove from neededReplications
             rw.targets = null;
-            neededReplications.decrementReplicationIndex(priority);
             continue;
           }
           requiredReplication = bc.getPreferredBlockReplication();
@@ -1448,7 +1445,6 @@ public class BlockManager {
             if ( (pendingReplications.getNumReplicas(block) > 0) ||
                  (blockHasEnoughRacks(block)) ) {
               neededReplications.remove(block, priority); // remove from neededReplications
-              neededReplications.decrementReplicationIndex(priority);
               rw.targets = null;
               blockLog.info("BLOCK* Removing {} from neededReplications as" +
                       " it has enough replicas", block);
@@ -1481,7 +1477,6 @@ public class BlockManager {
           // remove from neededReplications
           if(numEffectiveReplicas + targets.length >= requiredReplication) {
             neededReplications.remove(block, priority); // remove from neededReplications
-            neededReplications.decrementReplicationIndex(priority);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd1e0930/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index 0240805..49bc1fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -18,12 +18,9 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
-import java.util.Map;
-
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -82,11 +79,9 @@ class UnderReplicatedBlocks implements Iterable<Block> {
   /** The queue for corrupt blocks: {@value} */
   static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
   /** the queues themselves */
-  private final List<LightWeightLinkedSet<Block>> priorityQueues
-      = new ArrayList<LightWeightLinkedSet<Block>>();
+  private List<LightWeightLinkedSet<Block>> priorityQueues
+      = new ArrayList<LightWeightLinkedSet<Block>>(LEVEL);
 
-  /** Stores the replication index for each priority */
-  private Map<Integer, Integer> priorityToReplIdx = new HashMap<Integer, Integer>(LEVEL);
   /** The number of corrupt blocks with replication factor 1 */
   private int corruptReplOneBlocks = 0;
 
@@ -94,7 +89,6 @@ class UnderReplicatedBlocks implements Iterable<Block> {
   UnderReplicatedBlocks() {
     for (int i = 0; i < LEVEL; i++) {
       priorityQueues.add(new LightWeightLinkedSet<Block>());
-      priorityToReplIdx.put(i, 0);
     }
   }
 
@@ -328,16 +322,18 @@ class UnderReplicatedBlocks implements Iterable<Block> {
       }
     }
   }
-  
+
   /**
    * Get a list of block lists to be replicated. The index of block lists
-   * represents its replication priority. Replication index will be tracked for
-   * each priority list separately in priorityToReplIdx map. Iterates through
-   * all priority lists and find the elements after replication index. Once the
-   * last priority lists reaches to end, all replication indexes will be set to
-   * 0 and start from 1st priority list to fulfill the blockToProces count.
-   * 
-   * @param blocksToProcess - number of blocks to fetch from underReplicated blocks.
+   * represents its replication priority. Iterates each block list in priority
+   * order beginning with the highest priority list. Iterators use a bookmark to
+   * resume where the previous iteration stopped. Returns when the block count
+   * is met or iteration reaches the end of the lowest priority list, in which
+   * case bookmarks for each block list are reset to the heads of their
+   * respective lists.
+   *
+   * @param blocksToProcess - number of blocks to fetch from underReplicated
+   *                        blocks.
    * @return Return a list of block lists to be replicated. The block list index
    *         represents its replication priority.
    */
@@ -357,12 +353,8 @@ class UnderReplicatedBlocks implements Iterable<Block> {
     for (int priority = 0; priority < LEVEL; priority++) { 
       // Go through all blocks that need replications with current priority.
       BlockIterator neededReplicationsIterator = iterator(priority);
-      Integer replIndex = priorityToReplIdx.get(priority);
-      
-      // skip to the first unprocessed block, which is at replIndex
-      for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
-        neededReplicationsIterator.next();
-      }
+      // Set the iterator to the first unprocessed block at this priority level.
+       neededReplicationsIterator.setToBookmark();
 
       blocksToProcess = Math.min(blocksToProcess, size());
       
@@ -375,20 +367,18 @@ class UnderReplicatedBlocks implements Iterable<Block> {
           && neededReplicationsIterator.hasNext()) {
         Block block = neededReplicationsIterator.next();
         blocksToReplicate.get(priority).add(block);
-        replIndex++;
         blockCount++;
       }
       
       if (!neededReplicationsIterator.hasNext()
           && neededReplicationsIterator.getPriority() == LEVEL - 1) {
-        // reset all priorities replication index to 0 because there is no
-        // recently added blocks in any list.
+        // Reset all priorities' bookmarks to the beginning because there were
+        // no recently added blocks in any list.
         for (int i = 0; i < LEVEL; i++) {
-          priorityToReplIdx.put(i, 0);
+          this.priorityQueues.get(i).resetBookmark();
         }
         break;
       }
-      priorityToReplIdx.put(priority, replIndex); 
     }
     return blocksToReplicate;
   }
@@ -471,15 +461,19 @@ class UnderReplicatedBlocks implements Iterable<Block> {
     int getPriority() {
       return level;
     }
-  }
 
-  /**
-   * This method is to decrement the replication index for the given priority
-   * 
-   * @param priority  - int priority level
-   */
-  public void decrementReplicationIndex(int priority) {
-    Integer replIdx = priorityToReplIdx.get(priority);
-    priorityToReplIdx.put(priority, --replIdx); 
+    /**
+     * Sets iterator(s) to bookmarked elements.
+     */
+    private synchronized void setToBookmark() {
+      if (this.isIteratorForLevel) {
+        this.iterators.set(0, priorityQueues.get(this.level)
+            .getBookmark());
+      } else {
+        for (int i = 0; i < LEVEL; i++) {
+          this.iterators.set(i, priorityQueues.get(i).getBookmark());
+        }
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd1e0930/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
index f470cdd..dbd615c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
@@ -56,6 +56,8 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
   private DoubleLinkedElement<T> head;
   private DoubleLinkedElement<T> tail;
 
+  private LinkedSetIterator bookmark;
+
   /**
    * @param initCapacity
    *          Recommended size of the internal array.
@@ -69,6 +71,7 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
     super(initCapacity, maxLoadFactor, minLoadFactor);
     head = null;
     tail = null;
+    bookmark = new LinkedSetIterator();
   }
 
   public LightWeightLinkedSet() {
@@ -111,6 +114,12 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
     tail = le;
     if (head == null) {
       head = le;
+      bookmark.next = head;
+    }
+
+    // Update bookmark, if necessary.
+    if (bookmark.next == null) {
+      bookmark.next = le;
     }
     return true;
   }
@@ -141,6 +150,11 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
     if (tail == found) {
       tail = tail.before;
     }
+
+    // Update bookmark, if necessary.
+    if (found == this.bookmark.next) {
+      this.bookmark.next = found.after;
+    }
     return found;
   }
 
@@ -262,5 +276,25 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
     super.clear();
     this.head = null;
     this.tail = null;
+    this.resetBookmark();
+  }
+
+  /**
+   * Returns a new iterator starting at the bookmarked element.
+   *
+   * @return the iterator to the bookmarked element.
+   */
+  public Iterator<T> getBookmark() {
+    LinkedSetIterator toRet = new LinkedSetIterator();
+    toRet.next = this.bookmark.next;
+    this.bookmark = toRet;
+    return toRet;
+  }
+
+  /**
+   * Resets the bookmark to the beginning of the list.
+   */
+  public void resetBookmark() {
+    this.bookmark.next = this.head;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd1e0930/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index cc2b732..0557655 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -38,6 +39,7 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -1100,4 +1103,196 @@ public class TestReplicationPolicy {
     exception.expect(IllegalArgumentException.class);
     blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
   }
+
+  @Test(timeout = 60000)
+  public void testUpdateDoesNotCauseSkippedReplication() {
+    UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
+
+    Block block1 = new Block(ThreadLocalRandom.current().nextLong());
+    Block block2 = new Block(ThreadLocalRandom.current().nextLong());
+    Block block3 = new Block(ThreadLocalRandom.current().nextLong());
+
+    // Adding QUEUE_VERY_UNDER_REPLICATED block
+    final int block1CurReplicas = 2;
+    final int block1ExpectedReplicas = 7;
+    underReplicatedBlocks.add(block1, block1CurReplicas, 0,
+        block1ExpectedReplicas);
+
+    // Adding QUEUE_VERY_UNDER_REPLICATED block
+    underReplicatedBlocks.add(block2, 2, 0, 7);
+
+    // Adding QUEUE_UNDER_REPLICATED block
+    underReplicatedBlocks.add(block3, 2, 0, 6);
+
+    List<List<Block>> chosenBlocks;
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_VERY_UNDER_REPLICATED.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 0, 1, 0, 0, 0);
+
+    // Increasing the replications will move the block down a
+    // priority.  This simulates a replica being completed in between checks.
+    underReplicatedBlocks.update(block1, block1CurReplicas+1, 0,
+        block1ExpectedReplicas, 1, 0);
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_VERY_UNDER_REPLICATED.
+    // This block was moved up a priority and should not be skipped over.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 0, 1, 0, 0, 0);
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_UNDER_REPLICATED.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 0, 0, 1, 0, 0);
+  }
+
+  @Test(timeout = 60000)
+  public void testAddStoredBlockDoesNotCauseSkippedReplication()
+      throws IOException {
+    Namesystem mockNS = mock(Namesystem.class);
+    when(mockNS.isPopulatingReplQueues()).thenReturn(true);
+    when(mockNS.hasWriteLock()).thenReturn(true);
+    BlockManager bm =
+        new BlockManager(mockNS, new HdfsConfiguration());
+    UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
+
+    Block block1 = new Block(ThreadLocalRandom.current().nextLong());
+    Block block2 = new Block(ThreadLocalRandom.current().nextLong());
+
+    // Adding QUEUE_UNDER_REPLICATED block
+    underReplicatedBlocks.add(block1, 0, 1, 1);
+
+    // Adding QUEUE_UNDER_REPLICATED block
+    underReplicatedBlocks.add(block2, 0, 1, 1);
+
+    List<List<Block>> chosenBlocks;
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_VERY_UNDER_REPLICATED.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+
+    // Adding this block collection to the BlockManager, so that when we add the
+    // block under construction, the BlockManager will realize the expected
+    // replication has been achieved and remove it from the under-replicated
+    // queue.
+    BlockInfoUnderConstruction info = new BlockInfoUnderConstructionContiguous(block1, (short)1);
+    BlockCollection bc = mock(BlockCollection.class);
+    when(bc.getPreferredBlockReplication()).thenReturn((short)1);
+    bm.addBlockCollection(info, bc);
+
+    StatefulBlockInfo statefulBlockInfo = new StatefulBlockInfo(info,
+      block1, ReplicaState.RBW);
+
+    // Adding this block will increase its current replication, and that will
+    // remove it from the queue.
+    bm.addStoredBlockUnderConstruction(statefulBlockInfo,
+        TestReplicationPolicy.storages[0]);
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_VERY_UNDER_REPLICATED.
+    // This block remains and should not be skipped over.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+  }
+
+  @Test(timeout = 60000)
+  public void
+      testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
+          throws IOException {
+    Namesystem mockNS = mock(Namesystem.class);
+    when(mockNS.isPopulatingReplQueues()).thenReturn(true);
+    BlockManager bm =
+        new BlockManager(mockNS, new HdfsConfiguration());
+    UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
+
+    Block block1 = new Block(ThreadLocalRandom.current().nextLong());
+    Block block2 = new Block(ThreadLocalRandom.current().nextLong());
+
+    // Adding QUEUE_UNDER_REPLICATED block
+    underReplicatedBlocks.add(block1, 0, 1, 1);
+
+    // Adding QUEUE_UNDER_REPLICATED block
+    underReplicatedBlocks.add(block2, 0, 1, 1);
+
+    List<List<Block>> chosenBlocks;
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_VERY_UNDER_REPLICATED.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+
+    final BlockInfo info = new BlockInfoContiguous(block1, (short) 1);
+    final BlockCollection mbc = mock(BlockCollection.class);
+    when(mbc.getLastBlock()).thenReturn(info);
+    when(mbc.getPreferredBlockSize()).thenReturn(block1.getNumBytes() + 1);
+    when(mbc.getPreferredBlockReplication()).thenReturn((short)1);
+    ContentSummary cs = mock(ContentSummary.class);
+    when(cs.getLength()).thenReturn((long)1);
+    when(mbc.computeContentSummary(bm.getStoragePolicySuite())).thenReturn(cs);
+    info.setBlockCollection(mbc);
+    bm.addBlockCollection(info, mbc);
+
+    DatanodeStorageInfo[] dnAry = {storages[0]};
+    final BlockInfoUnderConstruction ucBlock =
+        info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
+            dnAry);
+
+    DatanodeStorageInfo storage = mock(DatanodeStorageInfo.class);
+    DatanodeDescriptor dn = mock(DatanodeDescriptor.class);
+    when(dn.isDecommissioned()).thenReturn(true);
+    when(storage.getState()).thenReturn(DatanodeStorage.State.NORMAL);
+    when(storage.getDatanodeDescriptor()).thenReturn(dn);
+    when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true);
+    when(storage.addBlock(any(BlockInfo.class))).thenReturn
+        (DatanodeStorageInfo.AddBlockResult.ADDED);
+    ucBlock.addStorage(storage);
+
+    when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any()))
+    .thenReturn(ucBlock);
+
+    bm.convertLastBlockToUnderConstruction(mbc, 0);
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_VERY_UNDER_REPLICATED.
+    // This block remains and should not be skipped over.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+  }
+
+  @Test(timeout = 60000)
+  public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
+      throws IOException {
+    Namesystem mockNS = mock(Namesystem.class);
+    when(mockNS.isPopulatingReplQueues()).thenReturn(true);
+    BlockManager bm =
+        new BlockManager(mockNS, new HdfsConfiguration());
+    UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
+
+    Block block1 = new Block(ThreadLocalRandom.current().nextLong());
+    Block block2 = new Block(ThreadLocalRandom.current().nextLong());
+
+    // Adding QUEUE_UNDER_REPLICATED block
+    underReplicatedBlocks.add(block1, 0, 1, 1);
+
+    // Adding QUEUE_UNDER_REPLICATED block
+    underReplicatedBlocks.add(block2, 0, 1, 1);
+
+    List<List<Block>> chosenBlocks;
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_VERY_UNDER_REPLICATED.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+
+    bm.setReplication((short)0, (short)1, "", block1);
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_VERY_UNDER_REPLICATED.
+    // This block remains and should not be skipped over.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd1e0930/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
index e8b365a..f923920 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
@@ -325,10 +325,19 @@ public class TestLightWeightLinkedSet {
     assertEquals(NUM, set.size());
     assertFalse(set.isEmpty());
 
+    // Advance the bookmark.
+    Iterator<Integer> bkmrkIt = set.getBookmark();
+    for (int i=0; i<set.size()/2+1; i++) {
+      bkmrkIt.next();
+    }
+    assertTrue(bkmrkIt.hasNext());
+
     // clear the set
     set.clear();
     assertEquals(0, set.size());
     assertTrue(set.isEmpty());
+    bkmrkIt = set.getBookmark();
+    assertFalse(bkmrkIt.hasNext());
 
     // poll should return an empty list
     assertEquals(0, set.pollAll().size());
@@ -363,4 +372,64 @@ public class TestLightWeightLinkedSet {
     LOG.info("Test capacity - DONE");
   }
 
+  @Test(timeout=60000)
+  public void testGetBookmarkReturnsBookmarkIterator() {
+    LOG.info("Test getBookmark returns proper iterator");
+    assertTrue(set.addAll(list));
+
+    Iterator<Integer> bookmark = set.getBookmark();
+    assertEquals(bookmark.next(), list.get(0));
+
+    final int numAdvance = list.size()/2;
+    for(int i=1; i<numAdvance; i++) {
+      bookmark.next();
+    }
+
+    Iterator<Integer> bookmark2 = set.getBookmark();
+    assertEquals(bookmark2.next(), list.get(numAdvance));
+  }
+
+  @Test(timeout=60000)
+  public void testBookmarkAdvancesOnRemoveOfSameElement() {
+    LOG.info("Test that the bookmark advances if we remove its element.");
+    assertTrue(set.add(list.get(0)));
+    assertTrue(set.add(list.get(1)));
+    assertTrue(set.add(list.get(2)));
+
+    Iterator<Integer> it = set.getBookmark();
+    assertEquals(it.next(), list.get(0));
+    set.remove(list.get(1));
+    it = set.getBookmark();
+    assertEquals(it.next(), list.get(2));
+  }
+
+  @Test(timeout=60000)
+  public void testBookmarkSetToHeadOnAddToEmpty() {
+    LOG.info("Test bookmark is set after adding to previously empty set.");
+    Iterator<Integer> it = set.getBookmark();
+    assertFalse(it.hasNext());
+    set.add(list.get(0));
+    set.add(list.get(1));
+
+    it = set.getBookmark();
+    assertTrue(it.hasNext());
+    assertEquals(it.next(), list.get(0));
+    assertEquals(it.next(), list.get(1));
+    assertFalse(it.hasNext());
+  }
+
+  @Test(timeout=60000)
+  public void testResetBookmarkPlacesBookmarkAtHead() {
+    set.addAll(list);
+    Iterator<Integer> it = set.getBookmark();
+    final int numAdvance = set.size()/2;
+    for (int i=0; i<numAdvance; i++) {
+      it.next();
+    }
+    assertEquals(it.next(), list.get(numAdvance));
+
+    set.resetBookmark();
+    it = set.getBookmark();
+    assertEquals(it.next(), list.get(0));
+  }
 }
\ No newline at end of file


Mime
View raw message