hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [1/2] hadoop git commit: HDFS-9260. Improve the performance and GC friendliness of NameNode startup and full block reports (Staffan Friberg via cmccabe)
Date Tue, 02 Feb 2016 19:28:54 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 2da03b48e -> dd9ebf6ee


http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java
new file mode 100644
index 0000000..1c6be1d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/FoldedTreeSet.java
@@ -0,0 +1,1285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import org.apache.hadoop.util.Time;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * A memory efficient implementation of RBTree. Instead of having a Node for
+ * each entry each node contains an array holding 64 entries.
+ *
+ * Based on the Apache Harmony folded TreeMap.
+ *
+ * @param <E> Entry type
+ */
+public class FoldedTreeSet<E> implements SortedSet<E> {
+
+  // Node colors of the red-black tree; a Node stores its color as a boolean.
+  private static final boolean RED = true;
+  private static final boolean BLACK = false;
+
+  // Ordering used for the entries; when null, compare() falls back to the
+  // Comparable implementation of the lookup object.
+  private final Comparator<E> comparator;
+  // Root node of the tree; null while the set is empty (see isEmpty()).
+  private Node<E> root;
+  // Number of entries stored in the set.
+  private int size;
+  // Number of nodes in use; incremented in cachedOrNewNode() and decremented
+  // in deleteNode().
+  private int nodeCount;
+  // Incremented when entries are added or removed so that iterators can
+  // detect modifications made outside of them.
+  private int modCount;
+  // A single cleared node kept for reuse by cachedOrNewNode(); may be null.
+  private Node<E> cachedNode;
+
+  /**
+   * Internal tree node that holds a sorted array of entries.
+   * <p>
+   * Live entries occupy the contiguous slots
+   * {@code entries[leftIndex..rightIndex]} and {@code size} tracks how many
+   * there are; a freshly created or cleared node has {@code leftIndex == 0}
+   * and {@code rightIndex == -1}. Besides the tree links (parent/left/right)
+   * every node is linked to its neighbours through prev/next.
+   *
+   * @param <E> type of the elements
+   */
+  private static class Node<E> {
+
+    private static final int NODE_SIZE = 64;
+
+    // Tree structure
+    private Node<E> parent, left, right;
+    private boolean color;
+    private final E[] entries;
+    private int leftIndex = 0, rightIndex = -1;
+    private int size = 0;
+    // List for fast ordered iteration
+    private Node<E> prev, next;
+
+    @SuppressWarnings("unchecked")
+    public Node() {
+      entries = (E[]) new Object[NODE_SIZE];
+    }
+
+    public boolean isRed() {
+      return color == RED;
+    }
+
+    public boolean isBlack() {
+      return color == BLACK;
+    }
+
+    /**
+     * @return the node reached by following left children from this node
+     */
+    public Node<E> getLeftMostNode() {
+      Node<E> node = this;
+      while (node.left != null) {
+        node = node.left;
+      }
+      return node;
+    }
+
+    /**
+     * @return the node reached by following right children from this node
+     */
+    public Node<E> getRightMostNode() {
+      Node<E> node = this;
+      while (node.right != null) {
+        node = node.right;
+      }
+      return node;
+    }
+
+    /**
+     * Store the entry in front of the current leftmost entry. The node must
+     * not be full.
+     *
+     * @param entry entry to add
+     */
+    public void addEntryLeft(E entry) {
+      assert rightIndex < entries.length;
+      assert !isFull();
+
+      if (leftIndex == 0) {
+        // No free slot on the left side, make room at index 0
+        rightIndex++;
+        // Shift entries right/up
+        System.arraycopy(entries, 0, entries, 1, size);
+      } else {
+        leftIndex--;
+      }
+      size++;
+      entries[leftIndex] = entry;
+    }
+
+    /**
+     * Store the entry after the current rightmost entry. The node must not be
+     * full.
+     *
+     * @param entry entry to add
+     */
+    public void addEntryRight(E entry) {
+      assert !isFull();
+
+      if (rightIndex == NODE_SIZE - 1) {
+        assert leftIndex > 0;
+        // Shift entries left/down
+        // (the source offset is read before leftIndex is decremented)
+        System.arraycopy(entries, leftIndex, entries, --leftIndex, size);
+      } else {
+        rightIndex++;
+      }
+      size++;
+      entries[rightIndex] = entry;
+    }
+
+    /**
+     * Insert the entry so that it ends up directly before the entry currently
+     * stored at index. The node must not be full.
+     *
+     * @param entry entry to add
+     * @param index array index of the entry that should follow the new entry
+     */
+    public void addEntryAt(E entry, int index) {
+      assert !isFull();
+
+      if (leftIndex == 0 || ((rightIndex != Node.NODE_SIZE - 1)
+                             && (rightIndex - index <= index - leftIndex))) {
+        // Move the entries from index and upwards one step to the right
+        rightIndex++;
+        System.arraycopy(entries, index,
+                         entries, index + 1, rightIndex - index);
+        entries[index] = entry;
+      } else {
+        // Move the entries below index one step to the left
+        int newLeftIndex = leftIndex - 1;
+        System.arraycopy(entries, leftIndex,
+                         entries, newLeftIndex, index - leftIndex);
+        leftIndex = newLeftIndex;
+        entries[index - 1] = entry;
+      }
+      size++;
+    }
+
+    /**
+     * Copy all entries of another node into the free slots directly to the
+     * left of this node's entries. No bounds check is made here, the caller
+     * must ensure there are at least from.size free slots on the left.
+     *
+     * @param from node to copy the entries from
+     */
+    public void addEntriesLeft(Node<E> from) {
+      leftIndex -= from.size;
+      size += from.size;
+      System.arraycopy(from.entries, from.leftIndex,
+                       entries, leftIndex, from.size);
+    }
+
+    /**
+     * Copy all entries of another node into the free slots directly to the
+     * right of this node's entries. No bounds check is made here, the caller
+     * must ensure there are at least from.size free slots on the right.
+     *
+     * @param from node to copy the entries from
+     */
+    public void addEntriesRight(Node<E> from) {
+      System.arraycopy(from.entries, from.leftIndex,
+                       entries, rightIndex + 1, from.size);
+      size += from.size;
+      rightIndex += from.size;
+    }
+
+    /**
+     * Insert the entry at index - 1 by sliding the entries below index one
+     * step to the left, pushing out the entry stored in slot 0. The size and
+     * the indexes of the node are left untouched; within this class it is
+     * only used on full nodes, where slot 0 holds the leftmost entry.
+     *
+     * @param entry entry to insert
+     * @param index array index of the entry that should follow the new entry
+     *
+     * @return the entry that was pushed out of slot 0
+     */
+    public E insertEntrySlideLeft(E entry, int index) {
+      E pushedEntry = entries[0];
+      System.arraycopy(entries, 1, entries, 0, index - 1);
+      entries[index - 1] = entry;
+      return pushedEntry;
+    }
+
+    /**
+     * Insert the entry at index by sliding the entries from index and upwards
+     * one step to the right, pushing out the rightmost entry. The size and the
+     * indexes of the node are left untouched.
+     *
+     * @param entry entry to insert
+     * @param index array index where the new entry is stored
+     *
+     * @return the entry that was pushed out of the rightmost slot
+     */
+    public E insertEntrySlideRight(E entry, int index) {
+      E movedEntry = entries[rightIndex];
+      System.arraycopy(entries, index, entries, index + 1, rightIndex - index);
+      entries[index] = entry;
+      return movedEntry;
+    }
+
+    /**
+     * Remove and return the leftmost entry. The node must not be empty.
+     *
+     * @return the removed entry
+     */
+    public E removeEntryLeft() {
+      assert !isEmpty();
+      E entry = entries[leftIndex];
+      entries[leftIndex] = null;
+      leftIndex++;
+      size--;
+      return entry;
+    }
+
+    /**
+     * Remove and return the rightmost entry. The node must not be empty.
+     *
+     * @return the removed entry
+     */
+    public E removeEntryRight() {
+      assert !isEmpty();
+      E entry = entries[rightIndex];
+      entries[rightIndex] = null;
+      rightIndex--;
+      size--;
+      return entry;
+    }
+
+    /**
+     * Remove and return the entry stored at index, closing the gap by moving
+     * whichever side of the array holds fewer entries.
+     *
+     * @param index array index of the entry to remove
+     *
+     * @return the removed entry
+     */
+    public E removeEntryAt(int index) {
+      assert !isEmpty();
+
+      E entry = entries[index];
+      int rightSize = rightIndex - index;
+      int leftSize = index - leftIndex;
+      if (rightSize <= leftSize) {
+        System.arraycopy(entries, index + 1, entries, index, rightSize);
+        entries[rightIndex] = null;
+        rightIndex--;
+      } else {
+        System.arraycopy(entries, leftIndex, entries, leftIndex + 1, leftSize);
+        entries[leftIndex] = null;
+        leftIndex++;
+      }
+      size--;
+      return entry;
+    }
+
+    public boolean isFull() {
+      return size == NODE_SIZE;
+    }
+
+    public boolean isEmpty() {
+      return size == 0;
+    }
+
+    /**
+     * Reset the node to its initial state: drop all entries, reset the
+     * indexes, unlink it from the tree and the node list and color it BLACK.
+     */
+    public void clear() {
+      // "<=" so that a node holding exactly one entry
+      // (leftIndex == rightIndex) also gets its slot nulled; otherwise the
+      // cleared node would keep a stale reference to that entry.
+      if (leftIndex <= rightIndex) {
+        Arrays.fill(entries, leftIndex, rightIndex + 1, null);
+      }
+      size = 0;
+      leftIndex = 0;
+      rightIndex = -1;
+      prev = null;
+      next = null;
+      parent = null;
+      left = null;
+      right = null;
+      color = BLACK;
+    }
+  }
+
+  /**
+   * Iterator that walks the nodes through their next links, starting at the
+   * leftmost node of the tree, and returns the entries of each node from
+   * leftIndex to rightIndex.
+   *
+   * @param <E> type of the elements
+   */
+  private static final class TreeSetIterator<E> implements Iterator<E> {
+
+    private final FoldedTreeSet<E> tree;
+    private int iteratorModCount;
+    // Position of the entry that the next call to next() will return
+    private Node<E> node;
+    private int index;
+    // Entry returned by the latest call to next() and where it was stored
+    private E lastEntry;
+    private int lastIndex;
+    private Node<E> lastNode;
+
+    private TreeSetIterator(FoldedTreeSet<E> tree) {
+      this.tree = tree;
+      this.iteratorModCount = tree.modCount;
+      if (tree.isEmpty()) {
+        // Nothing to iterate over, node stays null
+        return;
+      }
+      this.node = tree.root.getLeftMostNode();
+      this.index = this.node.leftIndex;
+    }
+
+    @Override
+    public boolean hasNext() {
+      checkForModification();
+      return node != null;
+    }
+
+    @Override
+    public E next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException("Iterator exhausted");
+      }
+      // Remember what is handed out so that remove() can find it again
+      lastNode = node;
+      lastIndex = index;
+      lastEntry = node.entries[index];
+      index++;
+      if (index > node.rightIndex) {
+        // Current node is exhausted, continue with the following node
+        node = node.next;
+        if (node != null) {
+          index = node.leftIndex;
+        }
+      }
+      return lastEntry;
+    }
+
+    @Override
+    public void remove() {
+      if (lastEntry == null) {
+        throw new IllegalStateException("No current element");
+      }
+      checkForModification();
+      if (lastNode.size == 1) {
+        // The iterator has already moved on to the next node,
+        // so the whole node can be deleted
+        tree.deleteNode(lastNode);
+      } else if (lastIndex == lastNode.leftIndex) {
+        // The iterator is on the next index, dropping the leftmost entry
+        // does not move any of the remaining entries
+        lastNode.removeEntryLeft();
+      } else if (lastIndex == lastNode.rightIndex) {
+        // The iterator is on the next node, dropping the rightmost entry
+        // does not affect it
+        lastNode.removeEntryRight();
+      } else {
+        // Entry in the middle of the array
+        assert node == lastNode;
+        int rightIndexBefore = lastNode.rightIndex;
+        lastNode.removeEntryAt(lastIndex);
+        if (lastNode.rightIndex < rightIndexBefore) {
+          // The upper entries were moved one step to the left, so the next
+          // entry now lives where the removed one used to be
+          index = lastIndex;
+        }
+      }
+      lastEntry = null;
+      tree.size--;
+      tree.modCount++;
+      iteratorModCount++;
+    }
+
+    private void checkForModification() {
+      if (iteratorModCount == tree.modCount) {
+        return;
+      }
+      throw new ConcurrentModificationException("Tree has been modified "
+                                                + "outside of iterator");
+    }
+  }
+
+  /**
+   * Create a new TreeSet that uses the natural ordering of objects. The element
+   * type must implement Comparable.
+   * <p>
+   * Delegates to {@link #FoldedTreeSet(Comparator)} with a null comparator.
+   */
+  public FoldedTreeSet() {
+    this(null);
+  }
+
+  /**
+   * Create a new TreeSet that orders the elements using the supplied
+   * Comparator.
+   *
+   * @param comparator Comparator able to compare elements of type E, or null
+   *                   to compare the elements through their Comparable
+   *                   implementation
+   */
+  public FoldedTreeSet(Comparator<E> comparator) {
+    this.comparator = comparator;
+  }
+
+  /**
+   * Hand out a node containing only the supplied entry, reusing the cached
+   * node when there is one and allocating a new node otherwise.
+   *
+   * @param entry first entry of the node
+   *
+   * @return node holding the entry, not yet attached to the tree
+   */
+  private Node<E> cachedOrNewNode(E entry) {
+    Node<E> node = cachedNode;
+    if (node == null) {
+      node = new Node<E>();
+    }
+    cachedNode = null;
+    nodeCount++;
+    // BlockIDs of new blocks are always increasing, so the entry is placed on
+    // the left side of the array to keep later inserts on the right cheap
+    node.addEntryLeft(entry);
+    return node;
+  }
+
+  /**
+   * Clear the node and keep it for reuse, unless a node is already cached in
+   * which case the supplied node is left untouched.
+   *
+   * @param node node that is no longer part of the tree
+   */
+  private void cacheAndClear(Node<E> node) {
+    if (cachedNode != null) {
+      // Only a single node is cached
+      return;
+    }
+    node.clear();
+    cachedNode = node;
+  }
+
+  /**
+   * @return the comparator supplied at construction, null if none was given
+   */
+  @Override
+  public Comparator<? super E> comparator() {
+    return comparator;
+  }
+
+  /**
+   * Not supported by this implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public SortedSet<E> subSet(E fromElement, E toElement) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  /**
+   * Not supported by this implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public SortedSet<E> headSet(E toElement) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  /**
+   * Not supported by this implementation.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public SortedSet<E> tailSet(E fromElement) {
+    throw new UnsupportedOperationException("Not supported yet.");
+  }
+
+  /**
+   * Return the leftmost entry of the leftmost node of the tree.
+   * <p>
+   * Note that null is returned for an empty set, whereas the SortedSet
+   * interface specifies a NoSuchElementException for that case.
+   *
+   * @return the first entry, or null if the set is empty
+   */
+  @Override
+  public E first() {
+    if (isEmpty()) {
+      return null;
+    }
+    Node<E> leftMost = root.getLeftMostNode();
+    return leftMost.entries[leftMost.leftIndex];
+  }
+
+  /**
+   * Return the rightmost entry of the rightmost node of the tree.
+   * <p>
+   * Note that null is returned for an empty set, whereas the SortedSet
+   * interface specifies a NoSuchElementException for that case.
+   *
+   * @return the last entry, or null if the set is empty
+   */
+  @Override
+  public E last() {
+    if (isEmpty()) {
+      return null;
+    }
+    Node<E> rightMost = root.getRightMostNode();
+    return rightMost.entries[rightMost.rightIndex];
+  }
+
+  /**
+   * @return the number of entries in the set
+   */
+  @Override
+  public int size() {
+    return size;
+  }
+
+  /**
+   * @return true if the tree has no root node, i.e. the set holds no entries
+   */
+  @Override
+  public boolean isEmpty() {
+    return root == null;
+  }
+
+  /**
+   * Lookup and return a stored object using a user provided comparator.
+   * <p>
+   * In every visited node the key is compared with the leftmost and the
+   * rightmost entry to decide whether to continue in the left child, in the
+   * right child, or to binary search the entries between them.
+   *
+   * @param obj Lookup key, must not be null
+   * @param cmp User provided Comparator. The comparator should expect that the
+   *            provided obj will always be the first method parameter and any
+   *            stored object will be the second parameter.
+   *
+   * @return A matching stored object or null if none is found
+   */
+  public E get(Object obj, Comparator<?> cmp) {
+    Objects.requireNonNull(obj);
+
+    Node<E> node = root;
+    while (node != null) {
+      E[] entries = node.entries;
+
+      int leftIndex = node.leftIndex;
+      int result = compare(obj, entries[leftIndex], cmp);
+      if (result < 0) {
+        // Smaller than every entry in this node
+        node = node.left;
+      } else if (result == 0) {
+        return entries[leftIndex];
+      } else {
+        int rightIndex = node.rightIndex;
+        // With a single entry in the node the previous result (> 0) is kept
+        if (leftIndex != rightIndex) {
+          result = compare(obj, entries[rightIndex], cmp);
+        }
+        if (result == 0) {
+          return entries[rightIndex];
+        } else if (result > 0) {
+          // Larger than every entry in this node
+          node = node.right;
+        } else {
+          // Between the leftmost and rightmost entry: binary search the
+          // entries in between, the key can not be in any other node
+          int low = leftIndex + 1;
+          int high = rightIndex - 1;
+          while (low <= high) {
+            int mid = (low + high) >>> 1;
+            result = compare(obj, entries[mid], cmp);
+            if (result > 0) {
+              low = mid + 1;
+            } else if (result < 0) {
+              high = mid - 1;
+            } else {
+              return entries[mid];
+            }
+          }
+          return null;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Lookup and return a stored object, using the comparator of the set.
+   *
+   * @param entry Lookup entry
+   *
+   * @return A matching stored object or null if none is found
+   */
+  public E get(E entry) {
+    return get(entry, comparator);
+  }
+
+  /**
+   * Check if the set holds an entry matching the supplied object. The object
+   * is cast to the entry type without any check and looked up with
+   * {@link #get(Object)}.
+   *
+   * @param obj object to look for
+   *
+   * @return true if a matching entry is stored in the set
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public boolean contains(Object obj) {
+    E stored = get((E) obj);
+    return stored != null;
+  }
+
+  /**
+   * Compare a lookup object with a stored object. The lookup object is always
+   * passed as the first argument; without a comparator the lookup object is
+   * cast to Comparable and compared through its compareTo method.
+   *
+   * @param lookup object that is searched for or added
+   * @param stored object stored in the set
+   * @param cmp    comparator to use, may be null
+   *
+   * @return negative, zero or positive value as defined by the comparison used
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private static int compare(Object lookup, Object stored, Comparator cmp) {
+    if (cmp == null) {
+      return ((Comparable<Object>) lookup).compareTo(stored);
+    }
+    return cmp.compare(lookup, stored);
+  }
+
+  /**
+   * @return a new iterator over the entries, starting at the leftmost entry of
+   *         the leftmost node
+   */
+  @Override
+  public Iterator<E> iterator() {
+    return new TreeSetIterator<>(this);
+  }
+
+  /**
+   * Copy all entries into a new array, node by node following the next links
+   * from the leftmost node.
+   *
+   * @return a new array of length size() holding the entries
+   */
+  @Override
+  public Object[] toArray() {
+    Object[] result = new Object[size];
+    if (isEmpty()) {
+      return result;
+    }
+    int offset = 0;
+    Node<E> current = root.getLeftMostNode();
+    while (current != null) {
+      System.arraycopy(current.entries, current.leftIndex,
+                       result, offset, current.size);
+      offset += current.size;
+      current = current.next;
+    }
+    return result;
+  }
+
+  /**
+   * Copy all entries into the supplied array if it is large enough, otherwise
+   * into a new array of the same component type and of length size(). If
+   * there is room left after the last entry, the slot following it is set to
+   * null.
+   *
+   * @param a   array to fill if large enough
+   * @param <T> component type of the array
+   *
+   * @return the array holding the entries
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> T[] toArray(T[] a) {
+    T[] target;
+    if (a.length >= size) {
+      target = a;
+    } else {
+      target = (T[]) java.lang.reflect.Array
+          .newInstance(a.getClass().getComponentType(), size);
+    }
+    if (isEmpty()) {
+      // Nothing to copy, only mark the end of the (empty) content
+      if (a.length > 0) {
+        a[0] = null;
+      }
+      return target;
+    }
+    int offset = 0;
+    for (Node<E> current = root.getLeftMostNode(); current != null;
+        current = current.next) {
+      System.arraycopy(current.entries, current.leftIndex,
+                       target, offset, current.size);
+      offset += current.size;
+    }
+    if (target.length > offset) {
+      target[offset] = null;
+    }
+    return target;
+  }
+
+  /**
+   * Add or replace an entry in the TreeSet. An already stored entry that
+   * compares equal to the supplied entry is overwritten by it.
+   *
+   * @param entry Entry to add or replace/update.
+   *
+   * @return the previous entry, or null if this set did not already contain the
+   *         specified entry
+   */
+  public E addOrReplace(E entry) {
+    return add(entry, true);
+  }
+
+  /**
+   * Add an entry to the set unless an entry comparing equal to it is already
+   * stored; an existing entry is never replaced.
+   *
+   * @param entry entry to add
+   *
+   * @return true if the entry was added, false if it already existed
+   */
+  @Override
+  public boolean add(E entry) {
+    return add(entry, false) == null;
+  }
+
+  /**
+   * Internal add method to add an entry to the set.
+   * <p>
+   * The tree is searched for a node whose entry range covers the new entry.
+   * If one is found the entry is inserted into that node, otherwise the entry
+   * is added next to the last visited node: in the node itself, in one of its
+   * neighbours or in a newly attached node.
+   *
+   * @param entry   Entry to add, must not be null
+   * @param replace Should the entry replace an old entry which is equal to the
+   *                new entry
+   *
+   * @return null if entry added and didn't exist or the previous value (which
+   *         might not have been overwritten depending on the replace parameter)
+   */
+  private E add(E entry, boolean replace) {
+    Objects.requireNonNull(entry);
+
+    // Empty tree
+    if (isEmpty()) {
+      root = cachedOrNewNode(entry);
+      size = 1;
+      modCount++;
+      return null;
+    }
+
+    // Compare with the rightmost entry first since comparatively larger
+    // entries are more likely to be inserted. BlockID is always increasing
+    // in HDFS.
+    Node<E> node = root;
+    Node<E> prevNode = null;
+    int result = 0;
+    while (node != null) {
+      prevNode = node;
+      E[] entries = node.entries;
+      int rightIndex = node.rightIndex;
+      result = compare(entry, entries[rightIndex], comparator);
+      if (result > 0) {
+        node = node.right;
+      } else if (result == 0) {
+        // Equal entry found; size and modCount are left unchanged
+        E prevEntry = entries[rightIndex];
+        if (replace) {
+          entries[rightIndex] = entry;
+        }
+        return prevEntry;
+      } else {
+        int leftIndex = node.leftIndex;
+        // With a single entry in the node the previous result (< 0) is kept
+        if (leftIndex != rightIndex) {
+          result = compare(entry, entries[leftIndex], comparator);
+        }
+        if (result < 0) {
+          node = node.left;
+        } else if (result == 0) {
+          E prevEntry = entries[leftIndex];
+          if (replace) {
+            entries[leftIndex] = entry;
+          }
+          return prevEntry;
+        } else {
+          // Insert in this node
+          int low = leftIndex + 1, high = rightIndex - 1;
+          while (low <= high) {
+            int mid = (low + high) >>> 1;
+            result = compare(entry, entries[mid], comparator);
+            if (result > 0) {
+              low = mid + 1;
+            } else if (result == 0) {
+              E prevEntry = entries[mid];
+              if (replace) {
+                entries[mid] = entry;
+              }
+              return prevEntry;
+            } else {
+              high = mid - 1;
+            }
+          }
+          // low is now the index of the first entry larger than the new one
+          addElementInNode(node, entry, low);
+          return null;
+        }
+      }
+    }
+
+    // No node covers the entry; result holds the outcome of the comparison
+    // made in the last visited node
+    assert prevNode != null;
+    size++;
+    modCount++;
+    if (!prevNode.isFull()) {
+      // The previous node still has space
+      if (result < 0) {
+        prevNode.addEntryLeft(entry);
+      } else {
+        prevNode.addEntryRight(entry);
+      }
+    } else if (result < 0) {
+      // The previous node is full, add to adjacent node or a new node
+      if (prevNode.prev != null && !prevNode.prev.isFull()) {
+        prevNode.prev.addEntryRight(entry);
+      } else {
+        attachNodeLeft(prevNode, cachedOrNewNode(entry));
+      }
+    } else if (prevNode.next != null && !prevNode.next.isFull()) {
+      prevNode.next.addEntryLeft(entry);
+    } else {
+      attachNodeRight(prevNode, cachedOrNewNode(entry));
+    }
+    return null;
+  }
+
+  /**
+   * Insert an entry last in the sorted tree. The entry must be considered
+   * larger than the currently largest entry in the set; note that the stored
+   * entry is the first argument of that comparison (i.e.
+   * current.compareTo(entry) when no comparator is used). If the entry is not
+   * the largest entry the method falls back on the regular add method.
+   *
+   * @param entry entry to add, must not be null
+   *
+   * @return True if added, false if already existed in the set
+   *
+   * @throws NullPointerException if entry is null
+   */
+  public boolean addSortedLast(E entry) {
+    // Reject null up front, in the same way as add() does. Without this check
+    // a null passed to an empty set would be stored as an entry.
+    Objects.requireNonNull(entry);
+
+    if (isEmpty()) {
+      root = cachedOrNewNode(entry);
+      size = 1;
+      modCount++;
+      return true;
+    } else {
+      Node<E> node = root.getRightMostNode();
+      if (compare(node.entries[node.rightIndex], entry, comparator) < 0) {
+        size++;
+        modCount++;
+        if (!node.isFull()) {
+          node.addEntryRight(entry);
+        } else {
+          attachNodeRight(node, cachedOrNewNode(entry));
+        }
+        return true;
+      }
+    }
+
+    // Fallback on normal add if entry is unsorted
+    return add(entry);
+  }
+
+  /**
+   * Insert an entry inside the entry range of a node. If the node is full one
+   * of its outermost entries is pushed out and moved to a neighbouring node,
+   * or to a newly attached node when the neighbours have no space.
+   *
+   * @param node  node whose entry range covers the entry
+   * @param entry entry to insert
+   * @param index array index of the entry that should follow the new entry
+   */
+  private void addElementInNode(Node<E> node, E entry, int index) {
+    size++;
+    modCount++;
+
+    if (!node.isFull()) {
+      node.addEntryAt(entry, index);
+      return;
+    }
+
+    // The node is full: insert the entry and push out an old one
+    final Node<E> before = node.prev;
+    final Node<E> after = node.next;
+    if (before == null) {
+      if (after != null && !after.isFull()) {
+        // The next node has space for the pushed entry
+        E pushed = node.insertEntrySlideRight(entry, index);
+        after.addEntryLeft(pushed);
+      } else {
+        // Without a prev node there can be no left child
+        assert node.left == null;
+        E pushed = node.insertEntrySlideLeft(entry, index);
+        attachNodeLeft(node, cachedOrNewNode(pushed));
+      }
+    } else if (!before.isFull()) {
+      // The prev node has space for the pushed entry
+      E pushed = node.insertEntrySlideLeft(entry, index);
+      before.addEntryRight(pushed);
+    } else if (after == null) {
+      // Without a next node there can be no right child
+      assert node.right == null;
+      E pushed = node.insertEntrySlideRight(entry, index);
+      attachNodeRight(node, cachedOrNewNode(pushed));
+    } else if (!after.isFull()) {
+      // The next node has space for the pushed entry
+      E pushed = node.insertEntrySlideRight(entry, index);
+      after.addEntryLeft(pushed);
+    } else {
+      // Both neighbours exist and are full, a new node is needed between
+      // this node and the next node
+      E pushed = node.insertEntrySlideRight(entry, index);
+      Node<E> created = cachedOrNewNode(pushed);
+      if (node.right == null) {
+        attachNodeRight(node, created);
+      } else {
+        // This node has a right child, so the left child of the next node
+        // must be free
+        assert after.left == null;
+        attachNodeLeft(after, created);
+      }
+    }
+  }
+
+  /**
+   * Attach a new node as the left child of a node, link it in directly before
+   * that node in the ordered node list and rebalance the tree.
+   *
+   * @param node    node that gets the new left child
+   * @param newNode node to attach
+   */
+  private void attachNodeLeft(Node<E> node, Node<E> newNode) {
+    final Node<E> predecessor = node.prev;
+
+    // Tree links
+    node.left = newNode;
+    newNode.parent = node;
+
+    // Ordered list links: predecessor <-> newNode <-> node
+    newNode.prev = predecessor;
+    newNode.next = node;
+    if (predecessor != null) {
+      predecessor.next = newNode;
+    }
+    node.prev = newNode;
+
+    balanceInsert(newNode);
+  }
+
+  /**
+   * Attach a new node as the right child of a node, link it in directly after
+   * that node in the ordered node list and rebalance the tree.
+   *
+   * @param node    node that gets the new right child
+   * @param newNode node to attach
+   */
+  private void attachNodeRight(Node<E> node, Node<E> newNode) {
+    final Node<E> successor = node.next;
+
+    // Tree links
+    node.right = newNode;
+    newNode.parent = node;
+
+    // Ordered list links: node <-> newNode <-> successor
+    newNode.next = successor;
+    newNode.prev = node;
+    if (successor != null) {
+      successor.prev = newNode;
+    }
+    node.next = newNode;
+
+    balanceInsert(newNode);
+  }
+
+  /**
+   * Balance the RB Tree after insert. The added node is colored RED and the
+   * tree is then recolored and rotated upwards for as long as the current
+   * node has a RED parent. The root is always colored BLACK at the end.
+   *
+   * @param node Added node
+   */
+  private void balanceInsert(Node<E> node) {
+    node.color = RED;
+
+    while (node != root && node.parent.isRed()) {
+      if (node.parent == node.parent.parent.left) {
+        // The parent is a left child, the uncle is the right child
+        Node<E> uncle = node.parent.parent.right;
+        if (uncle != null && uncle.isRed()) {
+          // RED uncle: recolor and continue from the grandparent
+          node.parent.color = BLACK;
+          uncle.color = BLACK;
+          node.parent.parent.color = RED;
+          node = node.parent.parent;
+        } else {
+          // BLACK (or missing) uncle: rotate
+          if (node == node.parent.right) {
+            node = node.parent;
+            rotateLeft(node);
+          }
+          node.parent.color = BLACK;
+          node.parent.parent.color = RED;
+          rotateRight(node.parent.parent);
+        }
+      } else {
+        // Mirrored case: the parent is a right child
+        Node<E> uncle = node.parent.parent.left;
+        if (uncle != null && uncle.isRed()) {
+          node.parent.color = BLACK;
+          uncle.color = BLACK;
+          node.parent.parent.color = RED;
+          node = node.parent.parent;
+        } else {
+          if (node == node.parent.left) {
+            node = node.parent;
+            rotateRight(node);
+          }
+          node.parent.color = BLACK;
+          node.parent.parent.color = RED;
+          rotateLeft(node.parent.parent);
+        }
+      }
+    }
+    root.color = BLACK;
+  }
+
+  /**
+   * Rotate the subtree rooted at the node to the right: the left child takes
+   * the place of the node, the node becomes its right child and the former
+   * right subtree of that child becomes the left subtree of the node.
+   *
+   * @param node root of the subtree to rotate, must have a left child
+   */
+  private void rotateRight(Node<E> node) {
+    final Node<E> newTop = node.left;
+    final Node<E> moved = newTop.right;
+    final Node<E> parent = node.parent;
+
+    node.left = moved;
+    if (moved != null) {
+      moved.parent = node;
+    }
+
+    newTop.parent = parent;
+    if (parent == null) {
+      root = newTop;
+    } else if (parent.right == node) {
+      parent.right = newTop;
+    } else {
+      parent.left = newTop;
+    }
+
+    newTop.right = node;
+    node.parent = newTop;
+  }
+
+  /**
+   * Rotate the subtree rooted at the node to the left: the right child takes
+   * the place of the node, the node becomes its left child and the former
+   * left subtree of that child becomes the right subtree of the node.
+   *
+   * @param node root of the subtree to rotate, must have a right child
+   */
+  private void rotateLeft(Node<E> node) {
+    final Node<E> newTop = node.right;
+    final Node<E> moved = newTop.left;
+    final Node<E> parent = node.parent;
+
+    node.right = moved;
+    if (moved != null) {
+      moved.parent = node;
+    }
+
+    newTop.parent = parent;
+    if (parent == null) {
+      root = newTop;
+    } else if (parent.left == node) {
+      parent.left = newTop;
+    } else {
+      parent.right = newTop;
+    }
+
+    newTop.left = node;
+    node.parent = newTop;
+  }
+
+  /**
+   * Remove object using a provided comparator, and return the removed entry.
+   * <p>
+   * In every visited node the key is compared with the leftmost and the
+   * rightmost entry to decide whether to continue in the left child, in the
+   * right child, or to binary search the entries between them.
+   *
+   * @param obj Lookup entry, must not be null
+   * @param cmp User provided Comparator. The comparator should expect that the
+   *            provided obj will always be the first method parameter and any
+   *            stored object will be the second parameter.
+   *
+   * @return The removed entry or null if not found
+   */
+  public E removeAndGet(Object obj, Comparator<?> cmp) {
+    Objects.requireNonNull(obj);
+
+    if (!isEmpty()) {
+      Node<E> node = root;
+      while (node != null) {
+        E[] entries = node.entries;
+        int leftIndex = node.leftIndex;
+        int result = compare(obj, entries[leftIndex], cmp);
+        if (result < 0) {
+          // Smaller than every entry in this node
+          node = node.left;
+        } else if (result == 0) {
+          return removeElementLeft(node);
+        } else {
+          int rightIndex = node.rightIndex;
+          // With a single entry in the node the previous result (> 0) is kept
+          if (leftIndex != rightIndex) {
+            result = compare(obj, entries[rightIndex], cmp);
+          }
+          if (result == 0) {
+            return removeElementRight(node);
+          } else if (result > 0) {
+            // Larger than every entry in this node
+            node = node.right;
+          } else {
+            // Between the leftmost and rightmost entry: binary search the
+            // entries in between, the key can not be in any other node
+            int low = leftIndex + 1, high = rightIndex - 1;
+            while (low <= high) {
+              int mid = (low + high) >>> 1;
+              result = compare(obj, entries[mid], cmp);
+              if (result > 0) {
+                low = mid + 1;
+              } else if (result == 0) {
+                return removeElementAt(node, mid);
+              } else {
+                high = mid - 1;
+              }
+            }
+            return null;
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Remove object and return the removed entry, using the comparator of the
+   * set.
+   *
+   * @param obj Lookup entry
+   *
+   * @return The removed entry or null if not found
+   */
+  public E removeAndGet(Object obj) {
+    return removeAndGet(obj, comparator);
+  }
+
+  /**
+   * Remove object using a provided comparator.
+   *
+   * @param obj Lookup entry
+   * @param cmp User provided Comparator. The comparator should expect that the
+   *            provided obj will always be the first method parameter and any
+   *            stored object will be the second parameter.
+   *
+   * @return True if found and removed, else false
+   */
+  public boolean remove(Object obj, Comparator<?> cmp) {
+    return removeAndGet(obj, cmp) != null;
+  }
+
+  /**
+   * Remove object using the comparator of the set.
+   *
+   * @param obj Lookup entry
+   *
+   * @return True if found and removed, else false
+   */
+  @Override
+  public boolean remove(Object obj) {
+    return removeAndGet(obj, comparator) != null;
+  }
+
+  /**
+   * Remove the leftmost entry of a node. Afterwards the node is deleted if it
+   * became empty, or merged with a neighbouring node if the entries of one
+   * fit in the free slots of the other.
+   *
+   * @param node node to remove the leftmost entry from
+   *
+   * @return the removed entry
+   */
+  private E removeElementLeft(Node<E> node) {
+    modCount++;
+    size--;
+    final E removed = node.removeEntryLeft();
+    final Node<E> before = node.prev;
+    final Node<E> after = node.next;
+
+    if (node.isEmpty()) {
+      deleteNode(node);
+    } else if (before != null
+               && (Node.NODE_SIZE - 1 - before.rightIndex) >= node.size) {
+      // The remaining entries fit to the right in the prev node
+      before.addEntriesRight(node);
+      deleteNode(node);
+    } else if (after != null && after.leftIndex >= node.size) {
+      // The remaining entries fit to the left in the next node
+      after.addEntriesLeft(node);
+      deleteNode(node);
+    } else if (before != null && before.size < node.leftIndex) {
+      // The entries of the prev node fit to the left in this node
+      node.addEntriesLeft(before);
+      deleteNode(before);
+    }
+
+    return removed;
+  }
+
+  /**
+   * Remove the rightmost entry of a node. Afterwards the node is deleted if
+   * it became empty, or merged with a neighbouring node if the entries of one
+   * fit in the free slots of the other.
+   *
+   * @param node node to remove the rightmost entry from
+   *
+   * @return the removed entry
+   */
+  private E removeElementRight(Node<E> node) {
+    modCount++;
+    size--;
+    final E removed = node.removeEntryRight();
+    final Node<E> before = node.prev;
+    final Node<E> after = node.next;
+
+    if (node.isEmpty()) {
+      deleteNode(node);
+    } else if (before != null
+               && (Node.NODE_SIZE - 1 - before.rightIndex) >= node.size) {
+      // The remaining entries fit to the right in the prev node
+      before.addEntriesRight(node);
+      deleteNode(node);
+    } else if (after != null && after.leftIndex >= node.size) {
+      // The remaining entries fit to the left in the next node
+      after.addEntriesLeft(node);
+      deleteNode(node);
+    } else if (after != null
+               && after.size < (Node.NODE_SIZE - 1 - node.rightIndex)) {
+      // The entries of the next node fit to the right in this node
+      node.addEntriesRight(after);
+      deleteNode(after);
+    }
+
+    return removed;
+  }
+
+  /**
+   * Remove the entry stored at an index of a node. Afterwards the node is
+   * merged with a neighbouring node if the entries of one fit in the free
+   * slots of the other.
+   *
+   * @param node  node to remove the entry from
+   * @param index array index of the entry to remove
+   *
+   * @return the removed entry
+   */
+  private E removeElementAt(Node<E> node, int index) {
+    modCount++;
+    size--;
+    final E removed = node.removeEntryAt(index);
+    final Node<E> before = node.prev;
+    final Node<E> after = node.next;
+
+    if (before != null
+        && (Node.NODE_SIZE - 1 - before.rightIndex) >= node.size) {
+      // The remaining entries fit to the right in the prev node
+      before.addEntriesRight(node);
+      deleteNode(node);
+    } else if (after != null && after.leftIndex >= node.size) {
+      // The remaining entries fit to the left in the next node
+      after.addEntriesLeft(node);
+      deleteNode(node);
+    } else if (before != null && before.size < node.leftIndex) {
+      // The entries of the prev node fit to the left in this node
+      node.addEntriesLeft(before);
+      deleteNode(before);
+    } else if (after != null
+               && after.size < (Node.NODE_SIZE - 1 - node.rightIndex)) {
+      // The entries of the next node fit to the right in this node
+      node.addEntriesRight(after);
+      deleteNode(after);
+    }
+
+    return removed;
+  }
+
+  /**
+   * Delete the node and ensure the tree is balanced. The node is unlinked
+   * from the tree and from the ordered node list, nodeCount is decremented
+   * and the node is handed to cacheAndClear() for possible reuse. The entries
+   * of the node are not inspected here.
+   *
+   * @param node node to delete
+   */
+  private void deleteNode(final Node<E> node) {
+    if (node.right == null) {
+      if (node.left != null) {
+        // Only a left child, it takes the place of the node
+        attachToParent(node, node.left);
+      } else {
+        // No children
+        attachNullToParent(node);
+      }
+    } else if (node.left == null) {
+      // Only a right child, it takes the place of the node
+      attachToParent(node, node.right);
+    } else {
+      // node.left != null && node.right != null
+      // node.next should replace node in tree
+      // node.next != null guaranteed since node.left != null
+      // node.next.left == null since node.next.prev is node
+      // node.next.right may be null or non-null
+      Node<E> toMoveUp = node.next;
+      // First detach toMoveUp from its current position in the tree
+      if (toMoveUp.right == null) {
+        attachNullToParent(toMoveUp);
+      } else {
+        attachToParent(toMoveUp, toMoveUp.right);
+      }
+      // Then let it take over the children, parent and color of node
+      toMoveUp.left = node.left;
+      if (toMoveUp.left != null) {
+        toMoveUp.left.parent = toMoveUp;
+      }
+      toMoveUp.right = node.right;
+      if (toMoveUp.right != null) {
+        toMoveUp.right.parent = toMoveUp;
+      }
+      attachToParentNoBalance(node, toMoveUp);
+      toMoveUp.color = node.color;
+    }
+
+    // Remove node from ordered list of nodes
+    if (node.prev != null) {
+      node.prev.next = node.next;
+    }
+    if (node.next != null) {
+      node.next.prev = node.prev;
+    }
+
+    nodeCount--;
+    cacheAndClear(node);
+  }
+
+  /**
+   * Let a node take the place of another node below the parent of the latter,
+   * or as the root if the replaced node has no parent. No rebalancing is made.
+   *
+   * @param toDelete  node to replace
+   * @param toConnect node that takes the place of toDelete
+   */
+  private void attachToParentNoBalance(Node<E> toDelete, Node<E> toConnect) {
+    final Node<E> newParent = toDelete.parent;
+    toConnect.parent = newParent;
+    if (newParent == null) {
+      // toDelete was the root
+      root = toConnect;
+      return;
+    }
+    if (newParent.left == toDelete) {
+      newParent.left = toConnect;
+    } else {
+      newParent.right = toConnect;
+    }
+  }
+
+  private void attachToParent(Node<E> toDelete, Node<E> toConnect) {
+    attachToParentNoBalance(toDelete, toConnect);
+    if (toDelete.isBlack()) {
+      balanceDelete(toConnect);
+    }
+  }
+
+  private void attachNullToParent(Node<E> toDelete) {
+    Node<E> parent = toDelete.parent;
+    if (parent == null) {
+      root = null;
+    } else {
+      if (toDelete == parent.left) {
+        parent.left = null;
+      } else {
+        parent.right = null;
+      }
+      if (toDelete.isBlack()) {
+        balanceDelete(parent);
+      }
+    }
+  }
+
+  /**
+   * Balance tree after removing a node.
+   *
+   * @param node Node to balance after deleting another node
+   */
+  private void balanceDelete(Node<E> node) {
+    while (node != root && node.isBlack()) {
+      if (node == node.parent.left) {
+        Node<E> sibling = node.parent.right;
+        if (sibling == null) {
+          node = node.parent;
+          continue;
+        }
+        if (sibling.isRed()) {
+          sibling.color = BLACK;
+          node.parent.color = RED;
+          rotateLeft(node.parent);
+          sibling = node.parent.right;
+          if (sibling == null) {
+            node = node.parent;
+            continue;
+          }
+        }
+        if ((sibling.left == null || !sibling.left.isRed())
+            && (sibling.right == null || !sibling.right.isRed())) {
+          sibling.color = RED;
+          node = node.parent;
+        } else {
+          if (sibling.right == null || !sibling.right.isRed()) {
+            sibling.left.color = BLACK;
+            sibling.color = RED;
+            rotateRight(sibling);
+            sibling = node.parent.right;
+          }
+          sibling.color = node.parent.color;
+          node.parent.color = BLACK;
+          sibling.right.color = BLACK;
+          rotateLeft(node.parent);
+          node = root;
+        }
+      } else {
+        Node<E> sibling = node.parent.left;
+        if (sibling == null) {
+          node = node.parent;
+          continue;
+        }
+        if (sibling.isRed()) {
+          sibling.color = BLACK;
+          node.parent.color = RED;
+          rotateRight(node.parent);
+          sibling = node.parent.left;
+          if (sibling == null) {
+            node = node.parent;
+            continue;
+          }
+        }
+        if ((sibling.left == null || sibling.left.isBlack())
+            && (sibling.right == null || sibling.right.isBlack())) {
+          sibling.color = RED;
+          node = node.parent;
+        } else {
+          if (sibling.left == null || sibling.left.isBlack()) {
+            sibling.right.color = BLACK;
+            sibling.color = RED;
+            rotateLeft(sibling);
+            sibling = node.parent.left;
+          }
+          sibling.color = node.parent.color;
+          node.parent.color = BLACK;
+          sibling.left.color = BLACK;
+          rotateRight(node.parent);
+          node = root;
+        }
+      }
+    }
+    node.color = BLACK;
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> c) {
+    for (Object entry : c) {
+      if (!contains(entry)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends E> c) {
+    boolean modified = false;
+    for (E entry : c) {
+      if (add(entry)) {
+        modified = true;
+      }
+    }
+    return modified;
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> c) {
+    boolean modified = false;
+    Iterator<E> it = iterator();
+    while (it.hasNext()) {
+      if (!c.contains(it.next())) {
+        it.remove();
+        modified = true;
+      }
+    }
+    return modified;
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> c) {
+    boolean modified = false;
+    for (Object entry : c) {
+      if (remove(entry)) {
+        modified = true;
+      }
+    }
+    return modified;
+  }
+
+  @Override
+  public void clear() {
+    modCount++;
+    if (!isEmpty()) {
+      size = 0;
+      nodeCount = 0;
+      cacheAndClear(root);
+      root = null;
+    }
+  }
+
+  /**
+   * Returns the current size divided by the capacity of the tree. A value
+   * between 0.0 and 1.0, where 1.0 means that every allocated node in the tree
+   * is completely full.
+   *
+   * An empty set, or a set stored in a single node, will return 1.0.
+   *
+   * @return the fill ratio of the tree
+   */
+  public double fillRatio() {
+    if (nodeCount > 1) {
+      // Count the last node as completely full since it can't be compacted
+      return (size + (Node.NODE_SIZE - root.getRightMostNode().size))
+             / (double) (nodeCount * Node.NODE_SIZE);
+    }
+    return 1.0;
+  }
+
+  /**
+   * Compact all the entries to use the fewest number of nodes in the tree.
+   *
+   * Having a compact tree minimizes memory usage, but can cause inserts to get
+   * slower because new nodes need to be allocated when there is no space left
+   * in any existing node for entries added in the middle of the set.
+   *
+   * Useful for reducing memory consumption when the tree is known not to
+   * change after compaction, or is mainly added to at either extreme.
+   *
+   * @param timeout Maximum time to spend compacting the tree set in
+   *                milliseconds.
+   *
+   * @return true if compaction completed, false if aborted
+   */
+  public boolean compact(long timeout) {
+
+    if (!isEmpty()) {
+      long start = Time.monotonicNow();
+      Node<E> node = root.getLeftMostNode();
+      while (node != null) {
+        if (node.prev != null && !node.prev.isFull()) {
+          Node<E> prev = node.prev;
+          int count = Math.min(Node.NODE_SIZE - prev.size, node.size);
+          System.arraycopy(node.entries, node.leftIndex,
+                           prev.entries, prev.rightIndex + 1, count);
+          node.leftIndex += count;
+          node.size -= count;
+          prev.rightIndex += count;
+          prev.size += count;
+        }
+        if (node.isEmpty()) {
+          Node<E> temp = node.next;
+          deleteNode(node);
+          node = temp;
+          continue;
+        } else if (!node.isFull()) {
+          if (node.leftIndex != 0) {
+            System.arraycopy(node.entries, node.leftIndex,
+                             node.entries, 0, node.size);
+            Arrays.fill(node.entries, node.size, node.rightIndex + 1, null);
+            node.leftIndex = 0;
+            node.rightIndex = node.size - 1;
+          }
+        }
+        node = node.next;
+
+        if (Time.monotonicNow() - start > timeout) {
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 05a6830..02d5b81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -261,6 +261,9 @@ message BlockReportContextProto  {
   // The block report lease ID, or 0 if we are sending without a lease to
   // bypass rate-limiting.
   optional uint64 leaseId = 4 [ default = 0 ];
+
+  // True if the reported blocks are sorted by increasing block IDs
+  optional bool sorted = 5 [default = false];
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
index bf29373..2cc1f7d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
@@ -228,7 +228,7 @@ public class TestBlockListAsLongs {
     request.set(null);
     nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask());
     nn.blockReport(reg, "pool", sbr,
-        new BlockReportContext(1, 0, System.nanoTime(), 0L));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
     BlockReportRequestProto proto = request.get();
     assertNotNull(proto);
     assertTrue(proto.getReports(0).getBlocksList().isEmpty());
@@ -238,7 +238,7 @@ public class TestBlockListAsLongs {
     request.set(null);
     nsInfo.setCapabilities(Capability.UNKNOWN.getMask());
     nn.blockReport(reg, "pool", sbr,
-        new BlockReportContext(1, 0, System.nanoTime(), 0L));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
     proto = request.get();
     assertNotNull(proto);
     assertFalse(proto.getReports(0).getBlocksList().isEmpty());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index d6213ff..4e7cf3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@ -19,19 +19,11 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import static org.apache.hadoop.hdfs.server.namenode.INodeId.INVALID_INODE_ID;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.junit.Assert;
 import org.junit.Test;
@@ -91,84 +83,4 @@ public class TestBlockInfo {
     Assert.assertThat(added, is(false));
     Assert.assertThat(blockInfos[NUM_BLOCKS/2].getStorageInfo(0), is(storage2));
   }
-
-  @Test
-  public void testBlockListMoveToHead() throws Exception {
-    LOG.info("BlockInfo moveToHead tests...");
-
-    final int MAX_BLOCKS = 10;
-
-    DatanodeStorageInfo dd = DFSTestUtil.createDatanodeStorageInfo("s1", "1.1.1.1");
-    ArrayList<Block> blockList = new ArrayList<Block>(MAX_BLOCKS);
-    ArrayList<BlockInfo> blockInfoList = new ArrayList<BlockInfo>();
-    int headIndex;
-    int curIndex;
-
-    LOG.info("Building block list...");
-    for (int i = 0; i < MAX_BLOCKS; i++) {
-      blockList.add(new Block(i, 0, GenerationStamp.LAST_RESERVED_STAMP));
-      blockInfoList.add(new BlockInfoContiguous(blockList.get(i), (short) 3));
-      dd.addBlock(blockInfoList.get(i));
-
-      // index of the datanode should be 0
-      assertEquals("Find datanode should be 0", 0, blockInfoList.get(i)
-          .findStorageInfo(dd));
-    }
-
-    // list length should be equal to the number of blocks we inserted
-    LOG.info("Checking list length...");
-    assertEquals("Length should be MAX_BLOCK", MAX_BLOCKS, dd.numBlocks());
-    Iterator<BlockInfo> it = dd.getBlockIterator();
-    int len = 0;
-    while (it.hasNext()) {
-      it.next();
-      len++;
-    }
-    assertEquals("There should be MAX_BLOCK blockInfo's", MAX_BLOCKS, len);
-
-    headIndex = dd.getBlockListHeadForTesting().findStorageInfo(dd);
-
-    LOG.info("Moving each block to the head of the list...");
-    for (int i = 0; i < MAX_BLOCKS; i++) {
-      curIndex = blockInfoList.get(i).findStorageInfo(dd);
-      headIndex = dd.moveBlockToHead(blockInfoList.get(i), curIndex, headIndex);
-      // the moved element must be at the head of the list
-      assertEquals("Block should be at the head of the list now.",
-          blockInfoList.get(i), dd.getBlockListHeadForTesting());
-    }
-
-    // move head of the list to the head - this should not change the list
-    LOG.info("Moving head to the head...");
-
-    BlockInfo temp = dd.getBlockListHeadForTesting();
-    curIndex = 0;
-    headIndex = 0;
-    dd.moveBlockToHead(temp, curIndex, headIndex);
-    assertEquals(
-        "Moving head to the head of the list shopuld not change the list",
-        temp, dd.getBlockListHeadForTesting());
-
-    // check all elements of the list against the original blockInfoList
-    LOG.info("Checking elements of the list...");
-    temp = dd.getBlockListHeadForTesting();
-    assertNotNull("Head should not be null", temp);
-    int c = MAX_BLOCKS - 1;
-    while (temp != null) {
-      assertEquals("Expected element is not on the list",
-          blockInfoList.get(c--), temp);
-      temp = temp.getNext(0);
-    }
-
-    LOG.info("Moving random blocks to the head of the list...");
-    headIndex = dd.getBlockListHeadForTesting().findStorageInfo(dd);
-    Random rand = new Random();
-    for (int i = 0; i < MAX_BLOCKS; i++) {
-      int j = rand.nextInt(MAX_BLOCKS);
-      curIndex = blockInfoList.get(j).findStorageInfo(dd);
-      headIndex = dd.moveBlockToHead(blockInfoList.get(j), curIndex, headIndex);
-      // the moved element must be at the head of the list
-      assertEquals("Block should be at the head of the list now.",
-          blockInfoList.get(j), dd.getBlockListHeadForTesting());
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 0e4e167..a970d77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.verify;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -75,6 +76,7 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -806,7 +808,8 @@ public class TestBlockManager {
     // Make sure it's the first full report
     assertEquals(0, ds.getBlockReportCount());
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        builder.build(), null, false);
+        builder.build(),
+        new BlockReportContext(1, 0, System.nanoTime(), 0, true), false);
     assertEquals(1, ds.getBlockReportCount());
 
     // verify the storage info is correct
@@ -821,6 +824,70 @@ public class TestBlockManager {
         (ds) >= 0);
   }
 
+  @Test
+  public void testFullBR() throws Exception {
+    doReturn(true).when(fsn).isRunning();
+
+    DatanodeDescriptor node = nodes.get(0);
+    DatanodeStorageInfo ds = node.getStorageInfos()[0];
+    node.setAlive(true);
+    DatanodeRegistration nodeReg =  new DatanodeRegistration(node, null, null, "");
+
+    // register new node
+    bm.getDatanodeManager().registerDatanode(nodeReg);
+    bm.getDatanodeManager().addDatanode(node);
+    assertEquals(node, bm.getDatanodeManager().getDatanode(node));
+    assertEquals(0, ds.getBlockReportCount());
+
+    ArrayList<BlockInfo> blocks = new ArrayList<>();
+    for (int id = 24; id > 0; id--) {
+      blocks.add(addBlockToBM(id));
+    }
+
+    // Make sure it's the first full report
+    assertEquals(0, ds.getBlockReportCount());
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
+                     generateReport(blocks),
+                     new BlockReportContext(1, 0, System.nanoTime(), 0, false),
+                     false);
+    assertEquals(1, ds.getBlockReportCount());
+    // verify the storage info is correct
+    for (BlockInfo block : blocks) {
+      assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0);
+    }
+
+    // Send unsorted report
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
+                     generateReport(blocks),
+                     new BlockReportContext(1, 0, System.nanoTime(), 0, false),
+                     false);
+    assertEquals(2, ds.getBlockReportCount());
+    // verify the storage info is correct
+    for (BlockInfo block : blocks) {
+      assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0);
+    }
+
+    // Sort list and send a sorted report
+    Collections.sort(blocks);
+    bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
+                     generateReport(blocks),
+                     new BlockReportContext(1, 0, System.nanoTime(), 0, true),
+                     false);
+    assertEquals(3, ds.getBlockReportCount());
+    // verify the storage info is correct
+    for (BlockInfo block : blocks) {
+      assertTrue(bm.getStoredBlock(block).findStorageInfo(ds) >= 0);
+    }
+  }
+
+  private BlockListAsLongs generateReport(List<BlockInfo> blocks) {
+    BlockListAsLongs.Builder builder = BlockListAsLongs.builder();
+    for (BlockInfo block : blocks) {
+      builder.add(new FinalizedReplica(block, null, null));
+    }
+    return builder.build();
+  }
+
   private BlockInfo addBlockToBM(long blkId) {
     Block block = new Block(blkId);
     BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);
@@ -1061,4 +1128,4 @@ public class TestBlockManager {
       cluster.shutdown();
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index c843938..f4e88b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -29,6 +29,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.management.NotCompliantMBeanException;
@@ -566,7 +567,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
       }
       Map<Block, BInfo> map = blockMap.get(bpid);
       if (map == null) {
-        map = new HashMap<Block, BInfo>();
+        map = new TreeMap<>();
         blockMap.put(bpid, map);
       }
       
@@ -1206,7 +1207,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
 
   @Override // FsDatasetSpi
   public void addBlockPool(String bpid, Configuration conf) {
-    Map<Block, BInfo> map = new HashMap<Block, BInfo>();
+    Map<Block, BInfo> map = new TreeMap<>();
     blockMap.put(bpid, map);
     storage.addBlockPool(bpid);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
index 27d1cea..61321e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
@@ -19,14 +19,23 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
 import java.util.ArrayList;
-
+import java.util.Collections;
+import java.util.Comparator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.*;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -108,12 +117,13 @@ public class TestBlockHasMultipleReplicasOnSameDN {
     StorageBlockReport reports[] =
         new StorageBlockReport[cluster.getStoragesPerDatanode()];
 
-    ArrayList<Replica> blocks = new ArrayList<Replica>();
+    ArrayList<ReplicaInfo> blocks = new ArrayList<>();
 
     for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
       Block localBlock = locatedBlock.getBlock().getLocalBlock();
       blocks.add(new FinalizedReplica(localBlock, null, null));
     }
+    Collections.sort(blocks);
 
     try (FsDatasetSpi.FsVolumeReferences volumes =
       dn.getFSDataset().getFsVolumeReferences()) {
@@ -126,7 +136,7 @@ public class TestBlockHasMultipleReplicasOnSameDN {
 
     // Should not assert!
     cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports,
-        new BlockReportContext(1, 0, System.nanoTime(), 0L));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
 
     // Get the block locations once again.
     locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index 212d2e6..27029a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -82,6 +82,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.timeout;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 90e000b..66f804c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -190,7 +191,8 @@ public class TestDataNodeVolumeFailure {
             new StorageBlockReport(dnStorage, blockList);
     }
     
-    cluster.getNameNodeRpc().blockReport(dnR, bpid, reports, null);
+    cluster.getNameNodeRpc().blockReport(dnR, bpid, reports,
+        new BlockReportContext(1, 0, System.nanoTime(), 0, true));
 
     // verify number of blocks and files...
     verify(filename, filesize);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
index aadd9b2..badd593 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
@@ -134,7 +134,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
     Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
         any(DatanodeRegistration.class),
         anyString(),
-        captor.capture(),  Mockito.<BlockReportContext>anyObject());
+        captor.capture(), Mockito.<BlockReportContext>anyObject());
 
     verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
   }
@@ -166,7 +166,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
     Mockito.verify(nnSpy, times(1)).blockReport(
         any(DatanodeRegistration.class),
         anyString(),
-        captor.capture(),  Mockito.<BlockReportContext>anyObject());
+        captor.capture(), Mockito.<BlockReportContext>anyObject());
 
     verifyCapturedArguments(captor, cluster.getStoragesPerDatanode(), BLOCKS_IN_FILE);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
index 67bbefe..791ee20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
-import org.apache.hadoop.util.Time;
 
 
 /**
@@ -40,7 +39,7 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
       LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
       StorageBlockReport[] singletonReport = { report };
       cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
-          new BlockReportContext(reports.length, i, System.nanoTime(), 0L));
+          new BlockReportContext(reports.length, i, System.nanoTime(), 0L, true));
       i++;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
index fd19ba6..a35fa48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
@@ -36,6 +36,6 @@ public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase {
                                   StorageBlockReport[] reports) throws IOException {
     LOG.info("Sending combined block reports for " + dnR);
     cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
-        new BlockReportContext(1, 0, System.nanoTime(), 0L));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
index 00c0f22..f12bc18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.timeout;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index 42cb72f..7fa3803 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -973,7 +973,7 @@ public class NNThroughputBenchmark implements Tool {
           new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
       };
       dataNodeProto.blockReport(dnRegistration, bpid, reports,
-              new BlockReportContext(1, 0, System.nanoTime(), 0L));
+              new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
     }
 
     /**
@@ -1247,7 +1247,7 @@ public class NNThroughputBenchmark implements Tool {
       StorageBlockReport[] report = { new StorageBlockReport(
           dn.storage, dn.getBlockReportList()) };
       dataNodeProto.blockReport(dn.dnRegistration, bpid, report,
-          new BlockReportContext(1, 0, System.nanoTime(), 0L));
+          new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
       long end = Time.now();
       return end-start;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
index 542c616..dfe9cbe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
@@ -303,7 +304,8 @@ public class TestAddStripedBlocks {
       StorageBlockReport[] reports = {new StorageBlockReport(storage,
           bll)};
       cluster.getNameNodeRpc().blockReport(dn.getDNRegistrationForBP(bpId),
-          bpId, reports, null);
+          bpId, reports,
+          new BlockReportContext(1, 0, System.nanoTime(), 0, true));
     }
 
     DatanodeStorageInfo[] locs = lastBlock.getUnderConstructionFeature()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 7fd0c30..ff8f81b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -118,7 +118,7 @@ public class TestDeadDatanode {
         BlockListAsLongs.EMPTY) };
     try {
       dnp.blockReport(reg, poolId, report,
-          new BlockReportContext(1, 0, System.nanoTime(), 0L));
+          new BlockReportContext(1, 0, System.nanoTime(), 0L, true));
       fail("Expected IOException is not thrown");
     } catch (IOException ex) {
       // Expected

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd9ebf6e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java
new file mode 100644
index 0000000..d554b1b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/FoldedTreeSetTest.java
@@ -0,0 +1,644 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * Test of FoldedTreeSet
+ */
+public class FoldedTreeSetTest {
+
+  private static Random srand;
+
+  public FoldedTreeSetTest() {
+  }
+
+  @BeforeClass
+  public static void setUpClass() {
+    long seed = System.nanoTime();
+    System.out.println("This run uses the random seed " + seed);
+    srand = new Random(seed);
+  }
+
+  @AfterClass
+  public static void tearDownClass() {
+  }
+
+  @Before
+  public void setUp() {
+  }
+
+  @After
+  public void tearDown() {
+  }
+
+  /**
+   * Test of comparator method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testComparator() {
+    Comparator<String> comparator = new Comparator<String>() {
+
+      @Override
+      public int compare(String o1, String o2) {
+        return o1.compareTo(o2);
+      }
+    };
+    assertEquals(null, new FoldedTreeSet<>().comparator());
+    assertEquals(comparator, new FoldedTreeSet<>(comparator).comparator());
+
+    FoldedTreeSet<String> set = new FoldedTreeSet<>(comparator);
+    set.add("apa3");
+    set.add("apa2");
+    set.add("apa");
+    set.add("apa5");
+    set.add("apa4");
+    assertEquals(5, set.size());
+    assertEquals("apa", set.get("apa"));
+  }
+
+  /**
+   * Test of first method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testFirst() {
+    FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
+    for (int i = 0; i < 256; i++) {
+      tree.add(1024 + i);
+      assertEquals(1024, tree.first().intValue());
+    }
+    for (int i = 1; i < 256; i++) {
+      tree.remove(1024 + i);
+      assertEquals(1024, tree.first().intValue());
+    }
+  }
+
+  /**
+   * Test of last method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testLast() {
+    FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
+    for (int i = 0; i < 256; i++) {
+      tree.add(1024 + i);
+      assertEquals(1024 + i, tree.last().intValue());
+    }
+    for (int i = 0; i < 255; i++) {
+      tree.remove(1024 + i);
+      assertEquals(1279, tree.last().intValue());
+    }
+  }
+
+  /**
+   * Test of size method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testSize() {
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    String entry = "apa";
+    assertEquals(0, instance.size());
+    instance.add(entry);
+    assertEquals(1, instance.size());
+    instance.remove(entry);
+    assertEquals(0, instance.size());
+  }
+
+  /**
+   * Test of isEmpty method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testIsEmpty() {
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    assertTrue(instance.isEmpty());
+    instance.add("apa");
+    assertFalse(instance.isEmpty());
+    instance.remove("apa");
+    // Re-query the set; a value cached before add/remove verifies nothing.
+    assertTrue(instance.isEmpty());
+  }
+
+  /**
+   * Test of contains method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testContains() {
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    String entry = "apa";
+    assertEquals(false, instance.contains(entry));
+    instance.add(entry);
+    assertEquals(true, instance.contains(entry));
+    assertEquals(false, instance.contains(entry + entry));
+  }
+
+  /**
+   * Test of iterator method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testIterator() {
+
+    for (int iter = 0; iter < 10; iter++) {
+      FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+      long[] longs = new long[64723];
+      for (int i = 0; i < longs.length; i++) {
+        Holder val = new Holder(srand.nextLong());
+        while (set.contains(val)) {
+          val = new Holder(srand.nextLong());
+        }
+        longs[i] = val.getId();
+        set.add(val);
+      }
+      assertEquals(longs.length, set.size());
+      Arrays.sort(longs);
+
+      Iterator<Holder> it = set.iterator();
+      for (int i = 0; i < longs.length; i++) {
+        assertTrue(it.hasNext());
+        Holder val = it.next();
+        assertEquals(longs[i], val.getId());
+        // remove randomly to force non-linear removes
+        if (srand.nextBoolean()) {
+          it.remove();
+        }
+      }
+    }
+  }
+
+  /**
+   * Test of toArray method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testToArray() {
+    FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
+    ArrayList<Integer> list = new ArrayList<>(256);
+    for (int i = 0; i < 256; i++) {
+      list.add(1024 + i);
+    }
+    tree.addAll(list);
+    assertArrayEquals(list.toArray(), tree.toArray());
+  }
+
+  /**
+   * Test of toArray method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testToArray_GenericType() {
+    FoldedTreeSet<Integer> tree = new FoldedTreeSet<>();
+    ArrayList<Integer> list = new ArrayList<>(256);
+    for (int i = 0; i < 256; i++) {
+      list.add(1024 + i);
+    }
+    tree.addAll(list);
+    assertArrayEquals(list.toArray(new Integer[tree.size()]), tree.toArray(new Integer[tree.size()]));
+    assertArrayEquals(list.toArray(new Integer[tree.size() + 100]), tree.toArray(new Integer[tree.size() + 100]));
+  }
+
+  /**
+   * Test of add method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testAdd() {
+    FoldedTreeSet<String> simpleSet = new FoldedTreeSet<>();
+    String entry = "apa";
+    assertTrue(simpleSet.add(entry));
+    assertFalse(simpleSet.add(entry));
+
+    FoldedTreeSet<Integer> intSet = new FoldedTreeSet<>();
+    for (int i = 512; i < 1024; i++) {
+      assertTrue(intSet.add(i));
+    }
+    for (int i = -1024; i < -512; i++) {
+      assertTrue(intSet.add(i));
+    }
+    for (int i = 0; i < 512; i++) {
+      assertTrue(intSet.add(i));
+    }
+    for (int i = -512; i < 0; i++) {
+      assertTrue(intSet.add(i));
+    }
+    assertEquals(2048, intSet.size());
+
+    FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+    long[] longs = new long[23432];
+    for (int i = 0; i < longs.length; i++) {
+      Holder val = new Holder(srand.nextLong());
+      while (set.contains(val)) {
+        val = new Holder(srand.nextLong());
+      }
+      longs[i] = val.getId();
+      assertTrue(set.add(val));
+    }
+    assertEquals(longs.length, set.size());
+    Arrays.sort(longs);
+
+    Iterator<Holder> it = set.iterator();
+    for (int i = 0; i < longs.length; i++) {
+      assertTrue(it.hasNext());
+      Holder val = it.next();
+      assertEquals(longs[i], val.getId());
+    }
+
+    // Specially constructed adds to exercise all code paths
+    FoldedTreeSet<Integer> specialAdds = new FoldedTreeSet<>();
+    // Fill node with even numbers
+    for (int i = 0; i < 128; i += 2) {
+      assertTrue(specialAdds.add(i));
+    }
+    // Remove left and add left
+    assertTrue(specialAdds.remove(0));
+    assertTrue(specialAdds.add(-1));
+    assertTrue(specialAdds.remove(-1));
+    // Add right and shift everything left
+    assertTrue(specialAdds.add(127));
+    assertTrue(specialAdds.remove(127));
+
+    // Empty at both ends
+    assertTrue(specialAdds.add(0));
+    assertTrue(specialAdds.remove(0));
+    assertTrue(specialAdds.remove(126));
+    // Add in the middle left to slide entries left
+    assertTrue(specialAdds.add(11));
+    assertTrue(specialAdds.remove(11));
+    // Add in the middle right to slide entries right
+    assertTrue(specialAdds.add(99));
+    assertTrue(specialAdds.remove(99));
+    // Add existing entry in the middle of a node
+    assertFalse(specialAdds.add(64));
+  }
+
+  @Test
+  public void testAddOrReplace() {
+    FoldedTreeSet<String> simpleSet = new FoldedTreeSet<>();
+    String entry = "apa";
+    assertNull(simpleSet.addOrReplace(entry));
+    assertEquals(entry, simpleSet.addOrReplace(entry));
+
+    FoldedTreeSet<Integer> intSet = new FoldedTreeSet<>();
+    for (int i = 0; i < 1024; i++) {
+      assertNull(intSet.addOrReplace(i));
+    }
+    for (int i = 0; i < 1024; i++) {
+      assertEquals(i, intSet.addOrReplace(i).intValue());
+    }
+  }
+
+  private static class Holder implements Comparable<Holder> {
+
+    private final long id;
+
+    public Holder(long id) {
+      this.id = id;
+    }
+
+    public long getId() {
+      return id;
+    }
+
+    @Override
+    public int compareTo(Holder o) {
+      return id < o.getId() ? -1
+             : id > o.getId() ? 1 : 0;
+    }
+  }
+
+  @Test
+  public void testRemoveWithComparator() {
+    FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+    long[] longs = new long[98327];
+    for (int i = 0; i < longs.length; i++) {
+      Holder val = new Holder(srand.nextLong());
+      while (set.contains(val)) {
+        val = new Holder(srand.nextLong());
+      }
+      longs[i] = val.getId();
+      set.add(val);
+    }
+    assertEquals(longs.length, set.size());
+    Comparator<Object> cmp = new Comparator<Object>() {
+      @Override
+      public int compare(Object o1, Object o2) {
+        long lookup = (long) o1;
+        long stored = ((Holder) o2).getId();
+        return lookup < stored ? -1
+               : lookup > stored ? 1 : 0;
+      }
+    };
+
+    for (long val : longs) {
+      set.remove(val, cmp);
+    }
+    assertEquals(0, set.size());
+    assertTrue(set.isEmpty());
+  }
+
+  @Test
+  public void testGetWithComparator() {
+    FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+    long[] longs = new long[32147];
+    for (int i = 0; i < longs.length; i++) {
+      Holder val = new Holder(srand.nextLong());
+      while (set.contains(val)) {
+        val = new Holder(srand.nextLong());
+      }
+      longs[i] = val.getId();
+      set.add(val);
+    }
+    assertEquals(longs.length, set.size());
+    Comparator<Object> cmp = new Comparator<Object>() {
+      @Override
+      public int compare(Object o1, Object o2) {
+        long lookup = (long) o1;
+        long stored = ((Holder) o2).getId();
+        return lookup < stored ? -1
+               : lookup > stored ? 1 : 0;
+      }
+    };
+
+    for (long val : longs) {
+      assertEquals(val, set.get(val, cmp).getId());
+    }
+  }
+
+  @Test
+  public void testGet() {
+    FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+    long[] longs = new long[43277];
+    for (int i = 0; i < longs.length; i++) {
+      Holder val = new Holder(srand.nextLong());
+      while (set.contains(val)) {
+        val = new Holder(srand.nextLong());
+      }
+      longs[i] = val.getId();
+      set.add(val);
+    }
+    assertEquals(longs.length, set.size());
+
+    for (long val : longs) {
+      assertEquals(val, set.get(new Holder(val)).getId());
+    }
+  }
+
+  /**
+   * Test of remove method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testRemove() {
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    assertEquals(false, instance.remove("apa"));
+    instance.add("apa");
+    assertEquals(true, instance.remove("apa"));
+
+    removeLeft();
+    removeRight();
+    removeAt();
+    removeRandom();
+  }
+
+  public void removeLeft() {
+    FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
+    for (int i = 1; i <= 320; i++) {
+      set.add(i);
+    }
+    for (int i = 193; i < 225; i++) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 129; i < 161; i++) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 256; i > 224; i--) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 257; i < 289; i++) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    while (!set.isEmpty()) {
+      assertTrue(set.remove(set.first()));
+    }
+  }
+
+  public void removeRight() {
+    FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
+    for (int i = 1; i <= 320; i++) {
+      set.add(i);
+    }
+    for (int i = 193; i < 225; i++) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 192; i > 160; i--) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 256; i > 224; i--) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 320; i > 288; i--) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    while (!set.isEmpty()) {
+      assertTrue(set.remove(set.last()));
+    }
+  }
+
+  public void removeAt() {
+    FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
+    for (int i = 1; i <= 320; i++) {
+      set.add(i);
+    }
+    for (int i = 193; i < 225; i++) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 160; i < 192; i++) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 225; i < 257; i++) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+    for (int i = 288; i < 320; i++) {
+      assertEquals(true, set.remove(i));
+      assertEquals(false, set.remove(i));
+    }
+  }
+
+  public void removeRandom() {
+    FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
+    int[] integers = new int[2048];
+    for (int i = 0; i < 2048; i++) {
+      int val = srand.nextInt();
+      while (set.contains(val)) {
+        val = srand.nextInt();
+      }
+      integers[i] = val;
+      set.add(val);
+    }
+    assertEquals(2048, set.size());
+
+    for (int val : integers) {
+      assertEquals(true, set.remove(val));
+      assertEquals(false, set.remove(val));
+    }
+    assertEquals(true, set.isEmpty());
+  }
+
+  /**
+   * Test of containsAll method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testContainsAll() {
+    Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    assertEquals(false, instance.containsAll(list));
+    instance.addAll(list);
+    assertEquals(true, instance.containsAll(list));
+  }
+
+  /**
+   * Test of addAll method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testAddAll() {
+    Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    assertEquals(true, instance.addAll(list));
+    assertEquals(false, instance.addAll(list)); // add same entries again
+  }
+
+  /**
+   * Test of retainAll method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testRetainAll() {
+    Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    instance.addAll(list);
+    assertEquals(false, instance.retainAll(list));
+    assertEquals(2, instance.size());
+    Collection<String> list2 = Arrays.asList(new String[]{"apa"});
+    assertEquals(true, instance.retainAll(list2));
+    assertEquals(1, instance.size());
+  }
+
+  /**
+   * Test of removeAll method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testRemoveAll() {
+    Collection<String> list = Arrays.asList(new String[]{"apa", "apa2", "apa"});
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    assertEquals(false, instance.removeAll(list));
+    instance.addAll(list);
+    assertEquals(true, instance.removeAll(list));
+    assertEquals(true, instance.isEmpty());
+  }
+
+  /**
+   * Test of clear method, of class FoldedTreeSet.
+   */
+  @Test
+  public void testClear() {
+    FoldedTreeSet<String> instance = new FoldedTreeSet<>();
+    instance.clear();
+    assertEquals(true, instance.isEmpty());
+    instance.add("apa");
+    assertEquals(false, instance.isEmpty());
+    instance.clear();
+    assertEquals(true, instance.isEmpty());
+  }
+
+  @Test
+  public void testFillRatio() {
+    FoldedTreeSet<Integer> set = new FoldedTreeSet<>();
+    final int size = 1024;
+    for (int i = 1; i <= size; i++) {
+      set.add(i);
+      assertEquals("Iteration: " + i, 1.0, set.fillRatio(), 0.0);
+    }
+
+    for (int i = 1; i <= size / 2; i++) {
+      set.remove(i * 2);
+      // Need the max since all the removes from the last node don't
+      // affect the fill ratio
+      assertEquals("Iteration: " + i,
+                   Math.max((size - i) / (double) size, 0.53125),
+                   set.fillRatio(), 0.0);
+    }
+  }
+
+  @Test
+  public void testCompact() {
+    FoldedTreeSet<Holder> set = new FoldedTreeSet<>();
+    long[] longs = new long[24553];
+    for (int i = 0; i < longs.length; i++) {
+      Holder val = new Holder(srand.nextLong());
+      while (set.contains(val)) {
+        val = new Holder(srand.nextLong());
+      }
+      longs[i] = val.getId();
+      set.add(val);
+    }
+    assertEquals(longs.length, set.size());
+
+    long[] longs2 = new long[longs.length];
+    for (int i = 0; i < longs2.length; i++) {
+      Holder val = new Holder(srand.nextLong());
+      while (set.contains(val)) {
+        val = new Holder(srand.nextLong());
+      }
+      longs2[i] = val.getId();
+      set.add(val);
+    }
+    assertEquals(longs.length + longs2.length, set.size());
+
+    // Create fragmentation
+    for (long val : longs) {
+      assertTrue(set.remove(new Holder(val)));
+    }
+    assertEquals(longs2.length, set.size());
+
+    assertFalse(set.compact(0));
+    assertTrue(set.compact(Long.MAX_VALUE));
+    assertEquals(longs2.length, set.size());
+    for (long val : longs) {
+      assertFalse(set.remove(new Holder(val)));
+    }
+    for (long val : longs2) {
+      assertEquals(val, set.get(new Holder(val)).getId());
+    }
+  }
+}


Mime
View raw message