hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1002019 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/regionserver/wal/ src/test/java/org/apache/hadoop/hbase/regionserver/
Date Tue, 28 Sep 2010 05:26:23 GMT
Author: stack
Date: Tue Sep 28 05:26:22 2010
New Revision: 1002019

URL: http://svn.apache.org/viewvc?rev=1002019&view=rev
Log:
HBASE-2646 Compaction requests should be prioritized to prevent blocking

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1002019&r1=1002018&r2=1002019&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Sep 28 05:26:22 2010
@@ -947,6 +947,8 @@ Release 0.21.0 - Unreleased
                (Andy Chen via Stack)
    HBASE-3030  The return code of many filesystem operations are not checked
                (dhruba borthakur via Stack)
+   HBASE-2646  Compaction requests should be prioritized to prevent blocking
+               (Jeff Whiting via Stack)
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1002019&r1=1002018&r2=1002019&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
Tue Sep 28 05:26:22 2010
@@ -20,9 +20,6 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -43,10 +40,36 @@ public class CompactSplitThread extends 
   private final HRegionServer server;
   private final Configuration conf;
 
-  private final BlockingQueue<HRegion> compactionQueue =
-    new LinkedBlockingQueue<HRegion>();
+  private final PriorityCompactionQueue compactionQueue =
+    new PriorityCompactionQueue();
 
-  private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>();
+  /** The priorities for a compaction request. */
+  public enum Priority implements Comparable<Priority> {
+    //NOTE: All priorities should be numbered consecutively starting with 1.
+    //The highest priority should be 1 followed by all lower priorities.
+    //Priorities can be changed at anytime without requiring any changes to the
+    //queue.
+
+    /** HIGH_BLOCKING should only be used when an operation is blocked until a
+     * compact / split is done (e.g. a MemStore can't flush because it has
+     * "too many store files" and is blocking until a compact / split is done)
+     */
+    HIGH_BLOCKING(1),
+    /** A normal compaction / split request */
+    NORMAL(2),
+    /** A low compaction / split request -- not currently used */
+    LOW(3);
+
+    int value;
+
+    Priority(int value) {
+      this.value = value;
+    }
+
+    int getInt() {
+      return value;
+    }
+  }
 
   /**
    * Splitting should not take place if the total number of regions exceed this.
@@ -74,9 +97,6 @@ public class CompactSplitThread extends 
       try {
         r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
         if (r != null && !this.server.isStopped()) {
-          synchronized (regionsInQueue) {
-            regionsInQueue.remove(r);
-          }
           lock.lock();
           try {
             // Don't interrupt us while we are working
@@ -107,14 +127,23 @@ public class CompactSplitThread extends 
         }
       }
     }
-    regionsInQueue.clear();
     compactionQueue.clear();
     LOG.info(getName() + " exiting");
   }
 
   public synchronized void requestCompaction(final HRegion r,
       final String why) {
-    requestCompaction(r, false, why);
+    requestCompaction(r, false, why, Priority.NORMAL);
+  }
+
+  public synchronized void requestCompaction(final HRegion r,
+      final String why, Priority p) {
+    requestCompaction(r, false, why, p);
+  }
+
+  public synchronized void requestCompaction(final HRegion r,
+      final boolean force, final String why) {
+    requestCompaction(r, force, why, Priority.NORMAL);
   }
 
   /**
@@ -123,7 +152,7 @@ public class CompactSplitThread extends 
    * @param why Why compaction requested -- used in debug messages
    */
   public synchronized void requestCompaction(final HRegion r,
-      final boolean force, final String why) {
+      final boolean force, final String why, Priority priority) {
     if (this.server.isStopped()) {
       return;
     }
@@ -131,14 +160,10 @@ public class CompactSplitThread extends 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Compaction " + (force? "(major) ": "") +
         "requested for region " + r.getRegionNameAsString() +
-        (why != null && !why.isEmpty()? " because: " + why: ""));
-    }
-    synchronized (regionsInQueue) {
-      if (!regionsInQueue.contains(r)) {
-        compactionQueue.add(r);
-        regionsInQueue.add(r);
-      }
+        (why != null && !why.isEmpty()? " because: " + why: "") +
+        "; Priority: " + priority + "; Compaction queue size: " + compactionQueue.size());
     }
+    compactionQueue.add(r, priority);
   }
 
   private void split(final HRegion parent, final byte [] midKey)

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1002019&r1=1002018&r2=1002019&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Tue
Sep 28 05:26:22 2010
@@ -212,7 +212,8 @@ class MemStoreFlusher extends Thread imp
           LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
         }
-        this.server.compactSplitThread.requestCompaction(region, getName());
+        this.server.compactSplitThread.requestCompaction(region, getName(),
+          CompactSplitThread.Priority.HIGH_BLOCKING);
         // Put back on the queue.  Have it come back out of the queue
         // after a delay of this.blockingWaitTime / 100 ms.
         this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java?rev=1002019&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
(added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
Tue Sep 28 05:26:22 2010
@@ -0,0 +1,375 @@
+/**
+* Copyright 2010 The Apache Software Foundation
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority;
+
+/**
+ * This class delegates to the BlockingQueue but wraps all HRegions in
+ * compaction requests that hold the priority and the date requested.
+ *
+ * Implementation Note: With an elevation time of -1 there is the potential for
+ * starvation of the lower priority compaction requests as long as there is a
+ * constant stream of high priority requests.
+ */
+public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
+  static final Log LOG = LogFactory.getLog(PriorityCompactionQueue.class);
+
+  /**
+   * This class represents a compaction request and holds the region, priority,
+   * and time submitted.
+   */
+  private class CompactionRequest implements Comparable<CompactionRequest> {
+    private final HRegion r;
+    private final Priority p;
+    private final Date date;
+
+    public CompactionRequest(HRegion r, Priority p) {
+      this(r, p, null);
+    }
+
+    public CompactionRequest(HRegion r, Priority p, Date d) {
+      if (r == null) {
+        throw new NullPointerException("HRegion cannot be null");
+      }
+
+      if (p == null) {
+        p = Priority.NORMAL; //the default priority
+      }
+
+      if (d == null) {
+        d = new Date();
+      }
+
+      this.r = r;
+      this.p = p;
+      this.date = d;
+    }
+
+    /**
+     * This function will define where in the priority queue the request will
+     * end up.  Those with the highest priorities will be first.  When the
+     * priorities are the same it will It will first compare priority then date
+     * to maintain a FIFO functionality.
+     *
+     * <p>Note: The date is only accurate to the millisecond which means it is
+     * possible that two requests were inserted into the queue within a
+     * millisecond.  When that is the case this function will break the tie
+     * arbitrarily.
+     */
+    @Override
+    public int compareTo(CompactionRequest request) {
+      //NOTE: The head of the priority queue is the least element
+      if (this.equals(request)) {
+        return 0; //they are the same request
+      }
+      int compareVal;
+
+      compareVal = p.compareTo(request.p); //compare priority
+      if (compareVal != 0) {
+        return compareVal;
+      }
+
+      compareVal = date.compareTo(request.date);
+      if (compareVal != 0) {
+        return compareVal;
+      }
+
+      //break the tie arbitrarily
+      return -1;
+    }
+
+    /** Gets the HRegion for the request */
+    HRegion getHRegion() {
+      return r;
+    }
+
+    /** Gets the priority for the request */
+    Priority getPriority() {
+      return p;
+    }
+
+    public String toString() {
+      return "regionName=" + r.getRegionNameAsString() +
+        ", priority=" + p + ", date=" + date;
+    }
+  }
+
+  /** The actual blocking queue we delegate to */
+  protected final BlockingQueue<CompactionRequest> queue =
+    new PriorityBlockingQueue<CompactionRequest>();
+
+  /** Hash map of the HRegions contained within the Compaction Queue */
+  private final HashMap<HRegion, CompactionRequest> regionsInQueue =
+    new HashMap<HRegion, CompactionRequest>();
+
+  /** Creates a new PriorityCompactionQueue with no priority elevation time */
+  public PriorityCompactionQueue() {
+    LOG.debug("Create PriorityCompactionQueue");
+  }
+
+  /** If the region is not already in the queue it will add it and return a
+   * new compaction request object.  If it is already present in the queue
+   * then it will return null.
+   * @param p If null it will use the default priority
+   * @return returns a compaction request if it isn't already in the queue
+   */
+  protected CompactionRequest addToRegionsInQueue(HRegion r, Priority p) {
+    CompactionRequest queuedRequest = null;
+    CompactionRequest newRequest = new CompactionRequest(r, p);
+    synchronized (regionsInQueue) {
+      queuedRequest = regionsInQueue.get(r);
+      if (queuedRequest == null ||
+          newRequest.getPriority().compareTo(queuedRequest.getPriority()) < 0) {
+        LOG.trace("Inserting region in queue. " + newRequest);
+        regionsInQueue.put(r, newRequest);
+      } else {
+        LOG.trace("Region already in queue, skipping. Queued: " + queuedRequest +
+          ", requested: " + newRequest);
+        newRequest = null; // It is already present so don't add it
+      }
+    }
+
+    if (newRequest != null && queuedRequest != null) {
+      // Remove the lower priority request
+      queue.remove(queuedRequest);
+    }
+
+    return newRequest;
+  }
+
+  /** Removes the request from the regions in queue
+   * @param p If null it will use the default priority
+   */
+  protected CompactionRequest removeFromRegionsInQueue(HRegion r) {
+    if (r == null) return null;
+
+    synchronized (regionsInQueue) {
+      CompactionRequest cr = regionsInQueue.remove(r);
+      if (cr == null) {
+        LOG.warn("Removed a region it couldn't find in regionsInQueue: " + r);
+      }
+      return cr;
+    }
+  }
+
+  public boolean add(HRegion e, Priority p) {
+    CompactionRequest request = this.addToRegionsInQueue(e, p);
+    if (request != null) {
+      boolean result = queue.add(request);
+      queue.peek();
+      return result;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean add(HRegion e) {
+    return add(e, null);
+  }
+
+  public boolean offer(HRegion e, Priority p) {
+    CompactionRequest request = this.addToRegionsInQueue(e, p);
+    return (request != null)? queue.offer(request): false;
+  }
+
+  @Override
+  public boolean offer(HRegion e) {
+    return offer(e, null);
+  }
+
+  public void put(HRegion e, Priority p) throws InterruptedException {
+    CompactionRequest request = this.addToRegionsInQueue(e, p);
+    if (request != null) {
+      queue.put(request);
+    }
+  }
+
+  @Override
+  public void put(HRegion e) throws InterruptedException {
+    put(e, null);
+  }
+
+  public boolean offer(HRegion e, Priority p, long timeout, TimeUnit unit)
+  throws InterruptedException {
+    CompactionRequest request = this.addToRegionsInQueue(e, p);
+    return (request != null)? queue.offer(request, timeout, unit): false;
+  }
+
+  @Override
+  public boolean offer(HRegion e, long timeout, TimeUnit unit)
+  throws InterruptedException {
+    return offer(e, null, timeout, unit);
+  }
+
+  @Override
+  public HRegion take() throws InterruptedException {
+    CompactionRequest cr = queue.take();
+    if (cr != null) {
+      removeFromRegionsInQueue(cr.getHRegion());
+      return cr.getHRegion();
+    }
+    return null;
+  }
+
+  @Override
+  public HRegion poll(long timeout, TimeUnit unit) throws InterruptedException {
+    CompactionRequest cr = queue.poll(timeout, unit);
+    if (cr != null) {
+      removeFromRegionsInQueue(cr.getHRegion());
+      return cr.getHRegion();
+    }
+    return null;
+  }
+
+  @Override
+  public boolean remove(Object r) {
+    if (r instanceof HRegion) {
+      CompactionRequest cr = removeFromRegionsInQueue((HRegion) r);
+      if (cr != null) {
+        return queue.remove(cr);
+      }
+    }
+
+    return false;
+  }
+
+  @Override
+  public HRegion remove() {
+    CompactionRequest cr = queue.remove();
+    if (cr != null) {
+      removeFromRegionsInQueue(cr.getHRegion());
+      return cr.getHRegion();
+    }
+    return null;
+  }
+
+  @Override
+  public HRegion poll() {
+    CompactionRequest cr = queue.poll();
+    if (cr != null) {
+      removeFromRegionsInQueue(cr.getHRegion());
+      return cr.getHRegion();
+    }
+    return null;
+  }
+
+  @Override
+  public int remainingCapacity() {
+    return queue.remainingCapacity();
+  }
+
+  @Override
+  public boolean contains(Object r) {
+    if (r instanceof HRegion) {
+      synchronized (regionsInQueue) {
+        return regionsInQueue.containsKey((HRegion) r);
+      }
+    } else if (r instanceof CompactionRequest) {
+      return queue.contains(r);
+    }
+    return false;
+  }
+
+  @Override
+  public HRegion element() {
+    CompactionRequest cr = queue.element();
+    return (cr != null)? cr.getHRegion(): null;
+  }
+
+  @Override
+  public HRegion peek() {
+    CompactionRequest cr = queue.peek();
+    return (cr != null)? cr.getHRegion(): null;
+  }
+
+  @Override
+  public int size() {
+    return queue.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return queue.isEmpty();
+  }
+
+  @Override
+  public void clear() {
+    regionsInQueue.clear();
+    queue.clear();
+  }
+
+  // Unimplemented methods, collection methods
+
+  @Override
+  public Iterator<HRegion> iterator() {
+    throw new UnsupportedOperationException("Not supported.");
+  }
+
+  @Override
+  public Object[] toArray() {
+    throw new UnsupportedOperationException("Not supported.");
+  }
+
+  @Override
+  public <T> T[] toArray(T[] a) {
+    throw new UnsupportedOperationException("Not supported.");
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> c) {
+    throw new UnsupportedOperationException("Not supported.");
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends HRegion> c) {
+    throw new UnsupportedOperationException("Not supported.");
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> c) {
+    throw new UnsupportedOperationException("Not supported.");
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> c) {
+    throw new UnsupportedOperationException("Not supported.");
+  }
+
+  @Override
+  public int drainTo(Collection<? super HRegion> c) {
+    throw new UnsupportedOperationException("Not supported.");
+  }
+
+  @Override
+  public int drainTo(Collection<? super HRegion> c, int maxElements) {
+    throw new UnsupportedOperationException("Not supported.");
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=1002019&r1=1002018&r2=1002019&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
(original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
Tue Sep 28 05:26:22 2010
@@ -20,7 +20,7 @@
 
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import java.io.EOFException;
+import java.io.FilterInputStream;
 import java.io.IOException;
 import java.lang.Class;
 import java.lang.reflect.Constructor;
@@ -78,18 +78,43 @@ public class SequenceFileLogReader imple
         this.length = l;
       }
 
+      // This section can be confusing.  It is specific to how HDFS works.
+      // Let me try to break it down.  This is the problem:
+      //
+      //  1. HDFS DataNodes update the NameNode about a filename's length 
+      //     on block boundaries or when a file is closed. Therefore, 
+      //     if an RS dies, then the NN's fs.getLength() can be out of date
+      //  2. this.in.available() would work, but it returns int &
+      //     therefore breaks for files > 2GB (happens on big clusters)
+      //  3. DFSInputStream.getFileLength() gets the actual length from the DNs
+      //  4. DFSInputStream is wrapped 2 levels deep : this.in.in
+      //
+      // So, here we adjust getPos() using getFileLength() so the
+      // SequenceFile.Reader constructor (aka: first invocation) comes out 
+      // with the correct end of the file:
+      //         this.end = in.getPos() + length;
       @Override
       public long getPos() throws IOException {
         if (this.firstGetPosInvocation) {
           this.firstGetPosInvocation = false;
-          // Tell a lie.  We're doing this just so that this line up in
-          // SequenceFile.Reader constructor comes out with the correct length
-          // on the file:
-          //         this.end = in.getPos() + length;
-          long available = this.in.available();
-          // Length gets added up in the SF.Reader constructor so subtract the
-          // difference.  If available < this.length, then return this.length.
-          return available >= this.length? available - this.length: this.length;
+          long adjust = 0;
+
+          try {
+            Field fIn = FilterInputStream.class.getDeclaredField("in");
+            fIn.setAccessible(true);
+            Object realIn = fIn.get(this.in);
+            long realLength = ((Long)realIn.getClass().
+                getMethod("getFileLength", new Class<?> []{}).
+                invoke(realIn, new Object []{})).longValue();
+            assert(realLength >= this.length);
+            adjust = realLength - this.length;
+          } catch(Exception e) {
+            SequenceFileLogReader.LOG.warn(
+              "Error while trying to get accurate file length.  " +
+              "Truncation / data loss may occur if RegionServers die.", e);
+          }
+
+          return adjust + super.getPos();
         }
         return super.getPos();
       }

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java?rev=1002019&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
(added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
Tue Sep 28 05:26:22 2010
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class for the priority compaction queue
+ */
+public class TestPriorityCompactionQueue {
+  static final Log LOG = LogFactory.getLog(TestPriorityCompactionQueue.class);
+
+  @Before
+  public void setUp() {
+  }
+
+  @After
+  public void tearDown() {
+
+  }
+
+  class DummyHRegion extends HRegion {
+    String name;
+
+    DummyHRegion(String name) {
+      super();
+      this.name = name;
+    }
+
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    public boolean equals(DummyHRegion r) {
+      return name.equals(r.name);
+    }
+
+    public String toString() {
+      return "[DummyHRegion " + name + "]";
+    }
+
+    public String getRegionNameAsString() {
+      return name;
+    }
+  }
+
+  protected void getAndCheckRegion(PriorityCompactionQueue pq,
+      HRegion checkRegion) {
+    HRegion r = pq.remove();
+    if (r != checkRegion) {
+      Assert.assertTrue("Didn't get expected " + checkRegion + " got " + r, r
+          .equals(checkRegion));
+    }
+  }
+
+  protected void addRegion(PriorityCompactionQueue pq, HRegion r, Priority p) {
+    pq.add(r, p);
+    try {
+      // Sleep 10 millisecond so 2 things are not put in the queue within the
+      // same millisecond. The queue breaks ties arbitrarily between two
+      // requests inserted at the same time. We want the ordering to
+      // be consistent for our unit test.
+      Thread.sleep(1);
+    } catch (InterruptedException ex) {
+      // continue
+    }
+  }
+
+  // ////////////////////////////////////////////////////////////////////////////
+  // tests
+  // ////////////////////////////////////////////////////////////////////////////
+
+  /** tests general functionality of the compaction queue */
+  @Test public void testPriorityQueue() throws InterruptedException {
+    PriorityCompactionQueue pq = new PriorityCompactionQueue();
+
+    HRegion r1 = new DummyHRegion("r1");
+    HRegion r2 = new DummyHRegion("r2");
+    HRegion r3 = new DummyHRegion("r3");
+    HRegion r4 = new DummyHRegion("r4");
+    HRegion r5 = new DummyHRegion("r5");
+
+    // test 1
+    // check fifo w/priority
+    addRegion(pq, r1, Priority.HIGH_BLOCKING);
+    addRegion(pq, r2, Priority.HIGH_BLOCKING);
+    addRegion(pq, r3, Priority.HIGH_BLOCKING);
+    addRegion(pq, r4, Priority.HIGH_BLOCKING);
+    addRegion(pq, r5, Priority.HIGH_BLOCKING);
+
+    getAndCheckRegion(pq, r1);
+    getAndCheckRegion(pq, r2);
+    getAndCheckRegion(pq, r3);
+    getAndCheckRegion(pq, r4);
+    getAndCheckRegion(pq, r5);
+
+    // test 2
+    // check fifo
+    addRegion(pq, r1, null);
+    addRegion(pq, r2, null);
+    addRegion(pq, r3, null);
+    addRegion(pq, r4, null);
+    addRegion(pq, r5, null);
+
+    getAndCheckRegion(pq, r1);
+    getAndCheckRegion(pq, r2);
+    getAndCheckRegion(pq, r3);
+    getAndCheckRegion(pq, r4);
+    getAndCheckRegion(pq, r5);
+
+    // test 3
+    // check fifo w/mixed priority
+    addRegion(pq, r1, Priority.HIGH_BLOCKING);
+    addRegion(pq, r2, Priority.NORMAL);
+    addRegion(pq, r3, Priority.HIGH_BLOCKING);
+    addRegion(pq, r4, Priority.NORMAL);
+    addRegion(pq, r5, Priority.HIGH_BLOCKING);
+
+    getAndCheckRegion(pq, r1);
+    getAndCheckRegion(pq, r3);
+    getAndCheckRegion(pq, r5);
+    getAndCheckRegion(pq, r2);
+    getAndCheckRegion(pq, r4);
+
+    // test 4
+    // check fifo w/mixed priority
+    addRegion(pq, r1, Priority.NORMAL);
+    addRegion(pq, r2, Priority.NORMAL);
+    addRegion(pq, r3, Priority.NORMAL);
+    addRegion(pq, r4, Priority.NORMAL);
+    addRegion(pq, r5, Priority.HIGH_BLOCKING);
+
+    getAndCheckRegion(pq, r5);
+    getAndCheckRegion(pq, r1);
+    getAndCheckRegion(pq, r2);
+    getAndCheckRegion(pq, r3);
+    getAndCheckRegion(pq, r4);
+
+    // test 5
+    // check fifo w/mixed priority elevation time
+    addRegion(pq, r1, Priority.NORMAL);
+    addRegion(pq, r2, Priority.HIGH_BLOCKING);
+    addRegion(pq, r3, Priority.NORMAL);
+    Thread.sleep(1000);
+    addRegion(pq, r4, Priority.NORMAL);
+    addRegion(pq, r5, Priority.HIGH_BLOCKING);
+
+    getAndCheckRegion(pq, r2);
+    getAndCheckRegion(pq, r5);
+    getAndCheckRegion(pq, r1);
+    getAndCheckRegion(pq, r3);
+    getAndCheckRegion(pq, r4);
+
+    // reset the priority compaction queue back to a normal queue
+    pq = new PriorityCompactionQueue();
+
+    // test 7
+    // test that lower priority are removed from the queue when a high priority
+    // is added
+    addRegion(pq, r1, Priority.NORMAL);
+    addRegion(pq, r2, Priority.NORMAL);
+    addRegion(pq, r3, Priority.NORMAL);
+    addRegion(pq, r4, Priority.NORMAL);
+    addRegion(pq, r5, Priority.NORMAL);
+    addRegion(pq, r3, Priority.HIGH_BLOCKING);
+
+    getAndCheckRegion(pq, r3);
+    getAndCheckRegion(pq, r1);
+    getAndCheckRegion(pq, r2);
+    getAndCheckRegion(pq, r4);
+    getAndCheckRegion(pq, r5);
+
+    Assert.assertTrue("Queue should be empty.", pq.size() == 0);
+
+    // test 8
+    // don't add the same region more than once
+    addRegion(pq, r1, Priority.NORMAL);
+    addRegion(pq, r2, Priority.NORMAL);
+    addRegion(pq, r3, Priority.NORMAL);
+    addRegion(pq, r4, Priority.NORMAL);
+    addRegion(pq, r5, Priority.NORMAL);
+    addRegion(pq, r1, Priority.NORMAL);
+    addRegion(pq, r2, Priority.NORMAL);
+    addRegion(pq, r3, Priority.NORMAL);
+    addRegion(pq, r4, Priority.NORMAL);
+    addRegion(pq, r5, Priority.NORMAL);
+
+    getAndCheckRegion(pq, r1);
+    getAndCheckRegion(pq, r2);
+    getAndCheckRegion(pq, r3);
+    getAndCheckRegion(pq, r4);
+    getAndCheckRegion(pq, r5);
+
+    Assert.assertTrue("Queue should be empty.", pq.size() == 0);
+  }
+}
\ No newline at end of file



Mime
View raw message