hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nspiegelb...@apache.org
Subject svn commit: r1181527 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/compactions/ test/java/org/apache/hadoop/hbase/regionserver/
Date Tue, 11 Oct 2011 02:16:50 GMT
Author: nspiegelberg
Date: Tue Oct 11 02:16:50 2011
New Revision: 1181527

URL: http://svn.apache.org/viewvc?rev=1181527&view=rev
Log:
First cut of store based compactions

Summary:
Changed all compaction requests to be per store instead of per region.

Note: this is not complete, wanted to get a diff out and continue working.
CompactSplitThread.java:run() is not completely done the way we want to - wanted
to discuss how we wanted to proceed here before doing it.

Test Plan:
Not tested yet.

Reviewed By: nspiegelberg
Reviewers: jgray, nspiegelberg, aaiyer, kannan
Commenters: kannan
CC: , kranganathan, nspiegelberg, kannan
Differential Revision: 233737

Added:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1181527&r1=1181526&r2=1181527&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Tue Oct 11 02:16:50 2011
@@ -27,6 +27,8 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.util.StringUtils;
 
@@ -69,37 +71,45 @@ class CompactSplitThread extends Thread 
   @Override
   public void run() {
     while (!this.server.isStopRequested()) {
+      CompactionRequest compactionRequest = null;
       HRegion r = null;
       try {
-        r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
-        if (r != null) {
+        compactionRequest = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
+        if (compactionRequest != null) {
           lock.lock();
           try {
             if(!this.server.isStopRequested()) {
               // Don't interrupt us while we are working
-              byte [] midKey = r.compactStores();
+              r = compactionRequest.getHRegion();
+              byte [] midKey = r.compactStore(compactionRequest.getStore());
               if (r.getLastCompactInfo() != null) {  // compaction aborted?
                 this.server.getMetrics().addCompaction(r.getLastCompactInfo());
               }
               if (LOG.isDebugEnabled()) {
-                HRegion next = this.compactionQueue.peek();
+                CompactionRequest next = this.compactionQueue.peek();
                 LOG.debug("Just finished a compaction. " +
                           " Current Compaction Queue: size=" +
                           getCompactionQueueSize() +
                           ((next != null) ?
-                              ", topPri=" + next.getCompactPriority() : ""));
+                              ", topPri=" + next.getPriority() : ""));
               }
               if (!this.server.isStopRequested()) {
                 // requests that were added during compaction will have a
                 // stale priority. remove and re-insert to update priority
-                boolean hadCompaction = compactionQueue.remove(r);
+                boolean hadCompaction = compactionQueue.remove(compactionRequest);
                 if (midKey != null) {
                   split(r, midKey);
                 } else if (hadCompaction) {
-                  compactionQueue.add(r);
-                } else if (r.getCompactPriority() < PRIORITY_USER) {
+                  // recompute the priority for a request already in the queue
+                  LOG.debug("Re-computing priority for compaction request " + compactionRequest);
+                  compactionRequest.setPriority(compactionRequest.getStore().getCompactPriority());
+                  compactionQueue.add(compactionRequest);
+                } else if (compactionRequest.getStore().getCompactPriority() < PRIORITY_USER) {
                   // degenerate case. recursively enqueue blocked regions
-                  compactionQueue.add(r);
+                  LOG.debug("Re-queueing with " + compactionRequest.getStore().getCompactPriority() +
+                      " priority for compaction request " + compactionRequest);
+                  compactionRequest.setPriority(compactionRequest.getStore().getCompactPriority());
+                  compactionQueue.add(compactionRequest);
                 }
               }
             }
@@ -135,7 +145,9 @@ class CompactSplitThread extends Thread 
    */
   public synchronized void requestCompaction(final HRegion r,
       final String why) {
-    requestCompaction(r, false, why, r.getCompactPriority());
+    for(Store s : r.getStores().values()) {
+      requestCompaction(r, s, false, why, s.getCompactPriority());
+    }
   }
 
   public synchronized void requestCompaction(final HRegion r,
@@ -143,12 +155,19 @@ class CompactSplitThread extends Thread 
     requestCompaction(r, false, why, p);
   }
 
+  public synchronized void requestCompaction(final HRegion r,
+      final boolean force, final String why, int p) {
+    for(Store s : r.getStores().values()) {
+      requestCompaction(r, s, force, why, p);
+    }
+  }
+
   /**
    * @param r HRegion store belongs to
    * @param force Whether next compaction should be major
    * @param why Why compaction requested -- used in debug messages
    */
-  public synchronized void requestCompaction(final HRegion r,
+  public synchronized void requestCompaction(final HRegion r, final Store s,
       final boolean force, final String why, int priority) {
 
     boolean addedToQueue = false;
@@ -159,16 +178,16 @@ class CompactSplitThread extends Thread 
 
     // tell the region to major-compact (and don't downgrade it)
     if (force) {
-      r.setForceMajorCompaction(force);
+      s.setForceMajorCompaction(force);
     }
-
-    addedToQueue = compactionQueue.add(r, priority);
-
+    CompactionRequest compactionRequest = new CompactionRequest(r, s, priority);
+    addedToQueue = compactionQueue.add(compactionRequest);
     // only log if actually added to compaction queue...
     if (addedToQueue && LOG.isDebugEnabled()) {
       LOG.debug("Compaction " + (force? "(major) ": "") +
         "requested for region " + r.getRegionNameAsString() +
         "/" + r.getRegionInfo().getEncodedName() +
+        ", store " + s +
         (why != null && !why.isEmpty()? " because: " + why: "") +
         "; Priority: " + priority + "; Compaction queue size: " + compactionQueue.size());
     }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1181527&r1=1181526&r2=1181527&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 11 02:16:50 2011
@@ -890,6 +890,20 @@ public class HRegion implements HeapSize
     return compactStores();
   }
 
+  /**
+   * Compact the stores one at a time, stopping at the first store that yields
+   * a split key, and return that key (null if no store needs to be split).
+   */
+  public byte[] compactStores() throws IOException {
+    byte[] splitRow = null;
+    for(Store s : getStores().values()) {
+      if(splitRow == null) {
+        splitRow = compactStore(s);
+      }
+    }
+    return splitRow;
+  }
+
   /*
    * Called by compaction thread and after region is opened to compact the
    * HStores if necessary.
@@ -905,7 +919,7 @@ public class HRegion implements HeapSize
    * @return split row if split is needed
    * @throws IOException e
    */
-  public byte [] compactStores()
+  public byte [] compactStore(Store store)
   throws IOException {
     if (this.closing.get() || this.closed.get()) {
       LOG.debug("Skipping compaction on " + this + " because closing/closed");
@@ -933,16 +947,12 @@ public class HRegion implements HeapSize
         long startTime = EnvironmentEdgeManager.currentTimeMillis();
         doRegionCompactionPrep();
         long lastCompactSize = 0;
-        long maxSize = -1;
         boolean completed = false;
         try {
-          for (Store store: stores.values()) {
-            final Store.StoreSize ss = store.compact();
-            lastCompactSize += store.getLastCompactSize();
-            if (ss != null && ss.getSize() > maxSize) {
-              maxSize = ss.getSize();
-              splitRow = ss.getSplitRow();
-            }
+          final Store.StoreSize ss = store.compact();
+          lastCompactSize += store.getLastCompactSize();
+          if (ss != null) {
+            splitRow = ss.getSplitRow();
           }
           completed = true;
         } catch (InterruptedIOException iioe) {
@@ -2230,6 +2240,10 @@ public class HRegion implements HeapSize
     return this.stores.get(column);
   }
 
+  public Map<byte[], Store> getStores() {
+    return this.stores;
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // Support code
   //////////////////////////////////////////////////////////////////////////////

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java?rev=1181527&r1=1181526&r2=1181527&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java Tue Oct 11 02:16:50 2011
@@ -20,132 +20,62 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.util.Collection;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 
 /**
- * This class delegates to the BlockingQueue but wraps all HRegions in
+ * This class delegates to the BlockingQueue but wraps all Stores in
  * compaction requests that hold the priority and the date requested.
  *
  * Implementation Note: With an elevation time of -1 there is the potential for
  * starvation of the lower priority compaction requests as long as there is a
  * constant stream of high priority requests.
  */
-public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
+public class PriorityCompactionQueue implements BlockingQueue<CompactionRequest> {
   static final Log LOG = LogFactory.getLog(PriorityCompactionQueue.class);
 
-  /**
-   * This class represents a compaction request and holds the region, priority,
-   * and time submitted.
-   */
-  private class CompactionRequest implements Comparable<CompactionRequest> {
-    private final HRegion r;
-    private final int p;
-    private final Date date;
-
-    public CompactionRequest(HRegion r, int p) {
-      this(r, p, null);
-    }
-
-    public CompactionRequest(HRegion r, int p, Date d) {
-      if (r == null) {
-        throw new NullPointerException("HRegion cannot be null");
-      }
-
-      if (d == null) {
-        d = new Date();
-      }
-
-      this.r = r;
-      this.p = p;
-      this.date = d;
-    }
-
-    /**
-     * This function will define where in the priority queue the request will
-     * end up.  Those with the highest priorities will be first.  When the
-     * priorities are the same it will It will first compare priority then date
-     * to maintain a FIFO functionality.
-     *
-     * <p>Note: The date is only accurate to the millisecond which means it is
-     * possible that two requests were inserted into the queue within a
-     * millisecond.  When that is the case this function will break the tie
-     * arbitrarily.
-     */
-    @Override
-    public int compareTo(CompactionRequest request) {
-      //NOTE: The head of the priority queue is the least element
-      if (this.equals(request)) {
-        return 0; //they are the same request
-      }
-      int compareVal;
-
-      compareVal = p - request.p; //compare priority
-      if (compareVal != 0) {
-        return compareVal;
-      }
-
-      compareVal = date.compareTo(request.date);
-      if (compareVal != 0) {
-        return compareVal;
-      }
-
-      //break the tie arbitrarily
-      return -1;
-    }
-
-    /** Gets the HRegion for the request */
-    HRegion getHRegion() {
-      return r;
-    }
-
-    /** Gets the priority for the request */
-    int getPriority() {
-      return p;
-    }
-
-    public String toString() {
-      return "regionName=" + r.getRegionNameAsString() +
-        ", priority=" + p + ", date=" + date;
-    }
-  }
-
   /** The actual blocking queue we delegate to */
   protected final BlockingQueue<CompactionRequest> queue =
     new PriorityBlockingQueue<CompactionRequest>();
 
-  /** Hash map of the HRegions contained within the Compaction Queue */
-  private final HashMap<HRegion, CompactionRequest> regionsInQueue =
-    new HashMap<HRegion, CompactionRequest>();
+  /** Hash map of the Stores contained within the Compaction Queue */
+  private final HashMap<Store, CompactionRequest> storesInQueue =
+    new HashMap<Store, CompactionRequest>();
 
   /** Creates a new PriorityCompactionQueue with no priority elevation time */
   public PriorityCompactionQueue() {
     LOG.debug("Create PriorityCompactionQueue");
   }
 
-  /** If the region is not already in the queue it will add it and return a
+  /** If the store is not already in the queue it will add it and return a
    * new compaction request object.  If it is already present in the queue
    * then it will return null.
    * @param p If null it will use the default priority
    * @return returns a compaction request if it isn't already in the queue
    */
-  protected CompactionRequest addToRegionsInQueue(HRegion r, int p) {
+  protected CompactionRequest addToCompactionQueue(CompactionRequest newRequest) {
     CompactionRequest queuedRequest = null;
-    CompactionRequest newRequest = new CompactionRequest(r, p);
-    synchronized (regionsInQueue) {
-      queuedRequest = regionsInQueue.get(r);
+    synchronized (storesInQueue) {
+      queuedRequest = storesInQueue.get(newRequest.getStore());
       if (queuedRequest == null ||
           newRequest.getPriority() < queuedRequest.getPriority()) {
-        LOG.trace("Inserting region in queue. " + newRequest);
-        regionsInQueue.put(r, newRequest);
+        String reason = "";
+        if (newRequest.getPriority() < queuedRequest.getPriority()) {
+          reason = "Reason : priority changed from " +
+            queuedRequest.getPriority() + " to " +
+            newRequest.getPriority() + ". ";
+        }
+        LOG.debug("Inserting store in queue. " + reason + newRequest);
+        storesInQueue.put(newRequest.getStore(), newRequest);
       } else {
-        LOG.trace("Region already in queue, skipping. Queued: " + queuedRequest +
+        LOG.debug("Store already in queue, skipping. Queued: " + queuedRequest +
           ", requested: " + newRequest);
         newRequest = null; // It is already present so don't add it
       }
@@ -159,23 +89,25 @@ public class PriorityCompactionQueue imp
     return newRequest;
   }
 
-  /** Removes the request from the regions in queue
+  /** Removes the request from the stores in queue
    * @param p If null it will use the default priority
    */
-  protected CompactionRequest removeFromRegionsInQueue(HRegion r) {
-    if (r == null) return null;
+  protected CompactionRequest removeFromQueue(CompactionRequest c) {
+    if (c == null) return null;
 
-    synchronized (regionsInQueue) {
-      CompactionRequest cr = regionsInQueue.remove(r);
+    synchronized (storesInQueue) {
+      CompactionRequest cr = storesInQueue.remove(c.getStore());
       if (cr == null) {
-        LOG.warn("Removed a region it couldn't find in regionsInQueue: " + r);
+        LOG.warn("Removed a compaction request it couldn't find in storesInQueue: " +
+            "region = " + c.getHRegion() + ", store = " + c.getStore());
       }
       return cr;
     }
   }
 
-  public boolean add(HRegion e, int p) {
-    CompactionRequest request = this.addToRegionsInQueue(e, p);
+  @Override
+  public boolean add(CompactionRequest e) {
+    CompactionRequest request = this.addToCompactionQueue(e);
     if (request != null) {
       boolean result = queue.add(request);
       queue.peek();
@@ -186,68 +118,50 @@ public class PriorityCompactionQueue imp
   }
 
   @Override
-  public boolean add(HRegion e) {
-    return add(e, e.getCompactPriority());
-  }
-
-  public boolean offer(HRegion e, int p) {
-    CompactionRequest request = this.addToRegionsInQueue(e, p);
+  public boolean offer(CompactionRequest e) {
+    CompactionRequest request = this.addToCompactionQueue(e);
     return (request != null)? queue.offer(request): false;
   }
 
   @Override
-  public boolean offer(HRegion e) {
-    return offer(e, e.getCompactPriority());
-  }
-
-  public void put(HRegion e, int p) throws InterruptedException {
-    CompactionRequest request = this.addToRegionsInQueue(e, p);
+  public void put(CompactionRequest e) throws InterruptedException {
+    CompactionRequest request = this.addToCompactionQueue(e);
     if (request != null) {
       queue.put(request);
     }
   }
 
   @Override
-  public void put(HRegion e) throws InterruptedException {
-    put(e, e.getCompactPriority());
-  }
-
-  public boolean offer(HRegion e, int p, long timeout, TimeUnit unit)
+  public boolean offer(CompactionRequest e, long timeout, TimeUnit unit)
   throws InterruptedException {
-    CompactionRequest request = this.addToRegionsInQueue(e, p);
+    CompactionRequest request = this.addToCompactionQueue(e);
     return (request != null)? queue.offer(request, timeout, unit): false;
   }
 
   @Override
-  public boolean offer(HRegion e, long timeout, TimeUnit unit)
-  throws InterruptedException {
-    return offer(e, e.getCompactPriority(), timeout, unit);
-  }
-
-  @Override
-  public HRegion take() throws InterruptedException {
+  public CompactionRequest take() throws InterruptedException {
     CompactionRequest cr = queue.take();
     if (cr != null) {
-      removeFromRegionsInQueue(cr.getHRegion());
-      return cr.getHRegion();
+      removeFromQueue(cr);
+      return cr;
     }
     return null;
   }
 
   @Override
-  public HRegion poll(long timeout, TimeUnit unit) throws InterruptedException {
+  public CompactionRequest poll(long timeout, TimeUnit unit) throws InterruptedException {
     CompactionRequest cr = queue.poll(timeout, unit);
     if (cr != null) {
-      removeFromRegionsInQueue(cr.getHRegion());
-      return cr.getHRegion();
+      removeFromQueue(cr);
+      return cr;
     }
     return null;
   }
 
   @Override
-  public boolean remove(Object r) {
-    if (r instanceof HRegion) {
-      CompactionRequest cr = removeFromRegionsInQueue((HRegion) r);
+  public boolean remove(Object o) {
+    if (o instanceof CompactionRequest) {
+      CompactionRequest cr = removeFromQueue((CompactionRequest) o);
       if (cr != null) {
         return queue.remove(cr);
       }
@@ -257,21 +171,21 @@ public class PriorityCompactionQueue imp
   }
 
   @Override
-  public HRegion remove() {
+  public CompactionRequest remove() {
     CompactionRequest cr = queue.remove();
     if (cr != null) {
-      removeFromRegionsInQueue(cr.getHRegion());
-      return cr.getHRegion();
+      removeFromQueue(cr);
+      return cr;
     }
     return null;
   }
 
   @Override
-  public HRegion poll() {
+  public CompactionRequest poll() {
     CompactionRequest cr = queue.poll();
     if (cr != null) {
-      removeFromRegionsInQueue(cr.getHRegion());
-      return cr.getHRegion();
+      removeFromQueue(cr);
+      return cr;
     }
     return null;
   }
@@ -283,9 +197,9 @@ public class PriorityCompactionQueue imp
 
   @Override
   public boolean contains(Object r) {
-    if (r instanceof HRegion) {
-      synchronized (regionsInQueue) {
-        return regionsInQueue.containsKey((HRegion) r);
+    if (r instanceof CompactionRequest) {
+      synchronized (storesInQueue) {
+        return storesInQueue.containsKey((CompactionRequest) r);
       }
     } else if (r instanceof CompactionRequest) {
       return queue.contains(r);
@@ -294,15 +208,15 @@ public class PriorityCompactionQueue imp
   }
 
   @Override
-  public HRegion element() {
+  public CompactionRequest element() {
     CompactionRequest cr = queue.element();
-    return (cr != null)? cr.getHRegion(): null;
+    return (cr != null)? cr: null;
   }
 
   @Override
-  public HRegion peek() {
+  public CompactionRequest peek() {
     CompactionRequest cr = queue.peek();
-    return (cr != null)? cr.getHRegion(): null;
+    return (cr != null)? cr: null;
   }
 
   @Override
@@ -317,14 +231,14 @@ public class PriorityCompactionQueue imp
 
   @Override
   public void clear() {
-    regionsInQueue.clear();
+    storesInQueue.clear();
     queue.clear();
   }
 
   // Unimplemented methods, collection methods
 
   @Override
-  public Iterator<HRegion> iterator() {
+  public Iterator<CompactionRequest> iterator() {
     throw new UnsupportedOperationException("Not supported.");
   }
 
@@ -344,7 +258,7 @@ public class PriorityCompactionQueue imp
   }
 
   @Override
-  public boolean addAll(Collection<? extends HRegion> c) {
+  public boolean addAll(Collection<? extends CompactionRequest> c) {
     throw new UnsupportedOperationException("Not supported.");
   }
 
@@ -359,12 +273,12 @@ public class PriorityCompactionQueue imp
   }
 
   @Override
-  public int drainTo(Collection<? super HRegion> c) {
+  public int drainTo(Collection<? super CompactionRequest> c) {
     throw new UnsupportedOperationException("Not supported.");
   }
 
   @Override
-  public int drainTo(Collection<? super HRegion> c, int maxElements) {
+  public int drainTo(Collection<? super CompactionRequest> c, int maxElements) {
     throw new UnsupportedOperationException("Not supported.");
   }
 }
\ No newline at end of file

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181527&r1=1181526&r2=1181527&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 02:16:50 2011
@@ -1456,7 +1456,7 @@ public class Store implements HeapSize {
   /**
    * @return The priority that this store should have in the compaction queue
    */
-  int getCompactPriority() {
+  public int getCompactPriority() {
     return this.blockingStoreFileCount - this.storefiles.size();
   }
 

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1181527&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Tue Oct 11 02:16:50 2011
@@ -0,0 +1,102 @@
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+  /**
+   * This class represents a compaction request and holds the region, store,
+   * priority, and time submitted.
+   */
+  public class CompactionRequest implements Comparable<CompactionRequest> {
+    static final Log LOG = LogFactory.getLog(CompactionRequest.class);
+    private final HRegion r;
+    private final Store s;
+    private int p;
+    private final Date date;
+
+    public CompactionRequest(HRegion r, Store s) {
+      this(r, s, s.getCompactPriority());
+    }
+
+    public CompactionRequest(HRegion r, Store s, int p) {
+      this(r, s, p, null);
+    }
+
+    public CompactionRequest(HRegion r, Store s, int p, Date d) {
+      if (r == null) {
+        throw new NullPointerException("HRegion cannot be null");
+      }
+
+      if (d == null) {
+        d = new Date();
+      }
+
+      this.r = r;
+      this.s = s;
+      this.p = p;
+      this.date = d;
+    }
+
+    /**
+     * This function will define where in the priority queue the request will
+     * end up.  Those with the highest priorities will be first.  It will first
+     * compare priority, then date when the priorities are the same, to maintain
+     * FIFO functionality.
+     *
+     * <p>Note: The date is only accurate to the millisecond which means it is
+     * possible that two requests were inserted into the queue within a
+     * millisecond.  When that is the case this function will break the tie
+     * arbitrarily.
+     */
+    @Override
+    public int compareTo(CompactionRequest request) {
+      //NOTE: The head of the priority queue is the least element
+      if (this.equals(request)) {
+        return 0; //they are the same request
+      }
+      int compareVal;
+
+      compareVal = p - request.p; //compare priority
+      if (compareVal != 0) {
+        return compareVal;
+      }
+
+      compareVal = date.compareTo(request.date);
+      if (compareVal != 0) {
+        return compareVal;
+      }
+
+      //break the tie arbitrarily
+      return -1;
+    }
+
+    /** Gets the HRegion for the request */
+    public HRegion getHRegion() {
+      return r;
+    }
+
+    /** Gets the Store for the request */
+    public Store getStore() {
+      return s;
+    }
+
+    /** Gets the priority for the request */
+    public int getPriority() {
+      return p;
+    }
+
+    /** Sets the priority for the request */
+    public void setPriority(int p) {
+      this.p = p;
+    }
+
+    public String toString() {
+      return "regionName=" + r.getRegionNameAsString() +
+        "storeName = " + new String(s.getFamily().getName()) +
+        ", priority=" + p + ", date=" + date;
+    }
+  }

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java?rev=1181527&r1=1181526&r2=1181527&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java Tue Oct 11 02:16:50 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionse
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -68,7 +69,7 @@ public class TestPriorityCompactionQueue
 
   protected void getAndCheckRegion(PriorityCompactionQueue pq,
       HRegion checkRegion) {
-    HRegion r = pq.remove();
+    HRegion r = pq.remove().getHRegion();
     if (r != checkRegion) {
       Assert.assertTrue("Didn't get expected " + checkRegion + " got " + r, r
           .equals(checkRegion));
@@ -76,7 +77,7 @@ public class TestPriorityCompactionQueue
   }
 
   protected void addRegion(PriorityCompactionQueue pq, HRegion r, int p) {
-    pq.add(r, p);
+    pq.add(new CompactionRequest(r, null, p));
     try {
       // Sleep 10 millisecond so 2 things are not put in the queue within the
       // same millisecond. The queue breaks ties arbitrarily between two



Mime
View raw message