hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject svn commit: r1494347 - in /hbase/branches/0.95/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/
Date Tue, 18 Jun 2013 22:30:28 GMT
Author: sershe
Date: Tue Jun 18 22:30:27 2013
New Revision: 1494347

URL: http://svn.apache.org/r1494347
Log:
HBASE-8665 bad compaction priority behavior in queue can cause store to be blocked

Added:
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java
Modified:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1494347&r1=1494346&r2=1494347&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
Tue Jun 18 22:30:27 2013
@@ -152,7 +152,7 @@ public class CompactSplitThread implemen
       queueLists.append("    "+it.next().toString());
       queueLists.append("\n");
     }
-    
+
     if( smallCompactions != null ){
       queueLists.append("\n");
       queueLists.append("  SmallCompation Queue:\n");
@@ -248,14 +248,21 @@ public class CompactSplitThread implemen
   @Override
   public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final
String why,
       int p, List<Pair<CompactionRequest, Store>> requests) throws IOException
{
+    return requestCompactionInternal(r, why, p, requests, true);
+  }
+
+  private List<CompactionRequest> requestCompactionInternal(final HRegion r, final
String why,
+      int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow)
throws IOException {
     // not a special compaction request, so make our own list
-    List<CompactionRequest> ret;
+    List<CompactionRequest> ret = null;
     if (requests == null) {
-      ret = new ArrayList<CompactionRequest>(r.getStores().size());
+      ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
       for (Store s : r.getStores().values()) {
-        ret.add(requestCompaction(r, s, why, p, null));
+        CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
+        if (selectNow) ret.add(cr);
       }
     } else {
+      Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
       ret = new ArrayList<CompactionRequest>(requests.size());
       for (Pair<CompactionRequest, Store> pair : requests) {
         ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
@@ -264,6 +271,21 @@ public class CompactSplitThread implemen
     return ret;
   }
 
+  public CompactionRequest requestCompaction(final HRegion r, final Store s,
+      final String why, int priority, CompactionRequest request) throws IOException {
+    return requestCompactionInternal(r, s, why, priority, request, true);
+  }
+
+  public synchronized void requestSystemCompaction(
+      final HRegion r, final String why) throws IOException {
+    requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
+  }
+
+  public void requestSystemCompaction(
+      final HRegion r, final Store s, final String why) throws IOException {
+    requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false);
+  }
+
   /**
    * @param r HRegion store belongs to
    * @param s Store to request compaction on
@@ -272,34 +294,48 @@ public class CompactSplitThread implemen
    * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
    *          compaction will be used.
    */
-  public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
-      final String why, int priority, CompactionRequest request) throws IOException {
+  private synchronized CompactionRequest requestCompactionInternal(final HRegion r, final
Store s,
+      final String why, int priority, CompactionRequest request, boolean selectNow)
+          throws IOException {
     if (this.server.isStopped()) {
       return null;
     }
+
+    CompactionContext compaction = null;
+    if (selectNow) {
+      compaction = selectCompaction(r, s, priority, request);
+      if (compaction == null) return null; // message logged inside
+    }
+
+    // We assume that most compactions are small. So, put system compactions into small
+    // pool; we will do selection there, and move to large pool if necessary.
+    long size = selectNow ? compaction.getRequest().getSize() : 0;
+    ThreadPoolExecutor pool = (!selectNow && s.throttleCompaction(size))
+      ? largeCompactions : smallCompactions;
+    pool.execute(new CompactionRunner(s, r, compaction, pool));
+    if (LOG.isDebugEnabled()) {
+      String type = (pool == smallCompactions) ? "Small " : "Large ";
+      LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
+          + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
+    }
+    return selectNow ? compaction.getRequest() : null;
+  }
+
+  private CompactionContext selectCompaction(final HRegion r, final Store s,
+      int priority, CompactionRequest request) throws IOException {
     CompactionContext compaction = s.requestCompaction(priority, request);
     if (compaction == null) {
       if(LOG.isDebugEnabled()) {
-        LOG.debug("Not compacting " + r.getRegionNameAsString() + 
+        LOG.debug("Not compacting " + r.getRegionNameAsString() +
             " because compaction request was cancelled");
       }
       return null;
     }
-
     assert compaction.hasSelection();
     if (priority != Store.NO_PRIORITY) {
       compaction.getRequest().setPriority(priority);
     }
-    ThreadPoolExecutor pool = s.throttleCompaction(compaction.getRequest().getSize())
-      ? largeCompactions : smallCompactions;
-    pool.execute(new CompactionRunner(s, r, compaction));
-    if (LOG.isDebugEnabled()) {
-      String type = (pool == smallCompactions) ? "Small " : "Large ";
-      LOG.debug(type + "Compaction requested: " + compaction
-          + (why != null && !why.isEmpty() ? "; Because: " + why : "")
-          + "; " + this);
-    }
-    return compaction.getRequest();
+    return compaction;
   }
 
   /**
@@ -358,18 +394,25 @@ public class CompactSplitThread implemen
   private class CompactionRunner implements Runnable, Comparable<CompactionRunner>
{
     private final Store store;
     private final HRegion region;
-    private final CompactionContext compaction;
+    private CompactionContext compaction;
+    private int queuedPriority;
+    private ThreadPoolExecutor parent;
 
-    public CompactionRunner(Store store, HRegion region, CompactionContext compaction) {
+    public CompactionRunner(Store store, HRegion region,
+        CompactionContext compaction, ThreadPoolExecutor parent) {
       super();
       this.store = store;
       this.region = region;
       this.compaction = compaction;
+      this.queuedPriority = (this.compaction == null)
+          ? store.getCompactPriority() : compaction.getRequest().getPriority();
+      this.parent = parent;
     }
-    
+
     @Override
     public String toString() {
-      return "Request = " + compaction.getRequest();
+      return (this.compaction != null) ? ("Request = " + compaction.getRequest())
+          : ("Store = " + store.toString() + ", pri = " + queuedPriority);
     }
 
     @Override
@@ -378,6 +421,40 @@ public class CompactSplitThread implemen
       if (server.isStopped()) {
         return;
       }
+      // Common case - system compaction without a file selection. Select now.
+      if (this.compaction == null) {
+        int oldPriority = this.queuedPriority;
+        this.queuedPriority = this.store.getCompactPriority();
+        if (this.queuedPriority > oldPriority) {
+          // Store priority decreased while we were in queue (due to some other compaction?),
+          // requeue with new priority to avoid blocking potential higher priorities.
+          this.parent.execute(this);
+          return;
+        }
+        try {
+          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null);
+        } catch (IOException ex) {
+          LOG.error("Compaction selection failed " + this, ex);
+          server.checkFileSystem();
+          return;
+        }
+        if (this.compaction == null) return; // nothing to do
+        // Now see if we are in correct pool for the size; if not, go to the correct one.
+        // We might end up waiting for a while, so cancel the selection.
+        assert this.compaction.hasSelection();
+        ThreadPoolExecutor pool = store.throttleCompaction(
+            compaction.getRequest().getSize()) ? largeCompactions : smallCompactions;
+        if (this.parent != pool) {
+          this.store.cancelRequestedCompaction(this.compaction);
+          this.compaction = null;
+          this.parent = pool;
+          this.parent.execute(this);
+          return;
+        }
+      }
+      // Finally we can compact something.
+      assert this.compaction != null;
+
       this.compaction.getRequest().beforeExecute();
       try {
         // Note: please don't put single-compaction logic here;
@@ -390,7 +467,7 @@ public class CompactSplitThread implemen
         if (completed) {
           // degenerate case: blocked regions require recursive enqueues
           if (store.getCompactPriority() <= 0) {
-            requestCompaction(region, store, "Recursive enqueue", null);
+            requestSystemCompaction(region, store, "Recursive enqueue");
           } else {
             // see if the compaction has caused us to exceed max region size
             requestSplit(region);
@@ -422,8 +499,13 @@ public class CompactSplitThread implemen
 
     @Override
     public int compareTo(CompactionRunner o) {
-      // Only compare the underlying request, for queue sorting purposes.
-      return this.compaction.getRequest().compareTo(o.compaction.getRequest());
+      // Only compare the underlying request (if any), for queue sorting purposes.
+      int compareVal = queuedPriority - o.queuedPriority; // compare priority
+      if (compareVal != 0) return compareVal;
+      CompactionContext tc = this.compaction, oc = o.compaction;
+      // Sort pre-selected (user?) compactions before system ones with equal priority.
+      return (tc == null) ? ((oc == null) ? 0 : 1)
+          : ((oc == null) ? -1 : tc.getRequest().compareTo(oc.getRequest()));
     }
   }
 

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1494347&r1=1494346&r2=1494347&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Tue Jun 18 22:30:27 2013
@@ -1330,8 +1330,8 @@ public class HRegionServer implements Cl
             if (iteration % multiplier != 0) continue;
             if (s.needsCompaction()) {
               // Queue a compaction. Will recognize if major is needed.
-              this.instance.compactSplitThread.requestCompaction(r, s, getName()
-                  + " requests compaction", null);
+              this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
+                  + " requests compaction");
             } else if (s.isMajorCompaction()) {
               if (majorCompactPriority == DEFAULT_PRIORITY
                   || majorCompactPriority > r.getCompactPriority()) {
@@ -1690,7 +1690,7 @@ public class HRegionServer implements Cl
     // Do checks to see if we need to compact (references or too many files)
     for (Store s : r.getStores().values()) {
       if (s.hasReferences() || s.needsCompaction()) {
-        getCompactionRequester().requestCompaction(r, s, "Opening Region", null);
+       this.compactSplitThread.requestSystemCompaction(r, s, "Opening Region");
       }
     }
     long openSeqNum = r.getOpenSeqNum();
@@ -3576,7 +3576,7 @@ public class HRegionServer implements Cl
       if (shouldFlush) {
         boolean result = region.flushcache();
         if (result) {
-          this.compactSplitThread.requestCompaction(region,
+          this.compactSplitThread.requestSystemCompaction(region,
               "Compaction through user triggered flush");
         }
         builder.setFlushed(result);

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1494347&r1=1494346&r2=1494347&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
(original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
Tue Jun 18 22:30:27 2013
@@ -408,8 +408,8 @@ class MemStoreFlusher implements FlushRe
             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
           if (!this.server.compactSplitThread.requestSplit(region)) {
             try {
-              this.server.compactSplitThread.requestCompaction(region, Thread
-                  .currentThread().getName());
+              this.server.compactSplitThread.requestSystemCompaction(
+                  region, Thread.currentThread().getName());
             } catch (IOException e) {
               LOG.error(
                 "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()),
@@ -457,7 +457,8 @@ class MemStoreFlusher implements FlushRe
       if (shouldSplit) {
         this.server.compactSplitThread.requestSplit(region);
       } else if (shouldCompact) {
-        server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName());
+        server.compactSplitThread.requestSystemCompaction(
+            region, Thread.currentThread().getName());
       }
 
     } catch (DroppedSnapshotException ex) {

Added: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java?rev=1494347&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java
(added)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/StatefulStoreMockMaker.java
Tue Jun 18 22:30:27 2013
@@ -0,0 +1,64 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * This class is a helper for creating partially-implemented, stateful mocks of
+ * Store. It contains a bunch of blank methods, and answers redirecting to these.
+ */
+public class StatefulStoreMockMaker {
+  // Add and expand the methods and answers as needed.
+  public CompactionContext selectCompaction() { return null; }
+  public void cancelCompaction(Object originalContext) {}
+  public int getPriority() { return 0; }
+
+  private class SelectAnswer implements Answer<CompactionContext> {
+    public CompactionContext answer(InvocationOnMock invocation) throws Throwable {
+      return selectCompaction();
+    }
+  }
+  private class PriorityAnswer implements Answer<Integer> {
+    public Integer answer(InvocationOnMock invocation) throws Throwable {
+      return getPriority();
+    }
+  }
+  private class CancelAnswer implements Answer<Object> {
+    public CompactionContext answer(InvocationOnMock invocation) throws Throwable {
+      cancelCompaction(invocation.getArguments()[0]); return null;
+    }
+  }
+
+  public Store createStoreMock(String name) throws Exception {
+    Store store = mock(Store.class, name);
+    when(store.requestCompaction(
+        anyInt(), isNull(CompactionRequest.class))).then(new SelectAnswer());
+    when(store.getCompactPriority()).then(new PriorityAnswer());
+    doAnswer(new CancelAnswer()).when(
+        store).cancelRequestedCompaction(any(CompactionContext.class));
+    return store;
+  }
+}

Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1494347&r1=1494346&r2=1494347&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
(original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
Tue Jun 18 22:30:27 2013
@@ -18,8 +18,9 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.spy;
+import static org.mockito.AdditionalMatchers.aryEq;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -33,10 +34,12 @@ import java.util.concurrent.CountDownLat
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -53,6 +56,7 @@ import org.apache.hadoop.hbase.io.encodi
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
@@ -60,6 +64,8 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Assume;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -730,6 +736,227 @@ public class TestCompaction extends HBas
     thread.interruptIfNecessary();
   }
 
+  private class StoreMockMaker extends StatefulStoreMockMaker {
+    public ArrayList<StoreFile> compacting = new ArrayList<StoreFile>();
+    public ArrayList<StoreFile> notCompacting = new ArrayList<StoreFile>();
+    private ArrayList<Integer> results;
+
+    public StoreMockMaker(ArrayList<Integer> results) {
+      this.results = results;
+    }
+
+    public class TestCompactionContext extends CompactionContext {
+      private List<StoreFile> selectedFiles;
+      public TestCompactionContext(List<StoreFile> selectedFiles) {
+        super();
+        this.selectedFiles = selectedFiles;
+      }
+
+      @Override
+      public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
+        return new ArrayList<StoreFile>();
+      }
+
+      @Override
+      public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
+          boolean mayUseOffPeak, boolean forceMajor) throws IOException {
+        this.request = new CompactionRequest(selectedFiles);
+        this.request.setPriority(getPriority());
+        return true;
+      }
+
+      @Override
+      public List<Path> compact() throws IOException {
+        finishCompaction(this.selectedFiles);
+        return new ArrayList<Path>();
+      }
+    }
+
+    @Override
+    public synchronized CompactionContext selectCompaction() {
+      CompactionContext ctx = new TestCompactionContext(new ArrayList<StoreFile>(notCompacting));
+      compacting.addAll(notCompacting);
+      notCompacting.clear();
+      try {
+        ctx.select(null, false, false, false);
+      } catch (IOException ex) {
+        fail("Shouldn't happen");
+      }
+      return ctx;
+    }
+
+    @Override
+    public synchronized void cancelCompaction(Object object) {
+      TestCompactionContext ctx = (TestCompactionContext)object;
+      compacting.removeAll(ctx.selectedFiles);
+      notCompacting.addAll(ctx.selectedFiles);
+    }
+
+    public synchronized void finishCompaction(List<StoreFile> sfs) {
+      if (sfs.isEmpty()) return;
+      synchronized (results) {
+        results.add(sfs.size());
+      }
+      compacting.removeAll(sfs);
+    }
+
+    @Override
+    public int getPriority() {
+      return 7 - compacting.size() - notCompacting.size();
+    }
+  }
+
+  public class BlockingStoreMockMaker extends StatefulStoreMockMaker {
+    BlockingCompactionContext blocked = null;
+
+    public class BlockingCompactionContext extends CompactionContext {
+      public volatile boolean isInCompact = false;
+
+      public void unblock() {
+        synchronized (this) { this.notifyAll(); }
+      }
+
+      @Override
+      public List<Path> compact() throws IOException {
+        try {
+          isInCompact = true;
+          synchronized (this) { this.wait(); }
+        } catch (InterruptedException e) {
+           Assume.assumeNoException(e);
+        }
+        return new ArrayList<Path>();
+      }
+
+      @Override
+      public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
+        return new ArrayList<StoreFile>();
+      }
+
+      @Override
+      public boolean select(List<StoreFile> f, boolean i, boolean m, boolean e)
+          throws IOException {
+        this.request = new CompactionRequest(new ArrayList<StoreFile>());
+        return true;
+      }
+    }
+
+    @Override
+    public CompactionContext selectCompaction() {
+      this.blocked = new BlockingCompactionContext();
+      try {
+        this.blocked.select(null, false, false, false);
+      } catch (IOException ex) {
+        fail("Shouldn't happen");
+      }
+      return this.blocked;
+    }
+
+    @Override
+    public void cancelCompaction(Object object) {}
+
+    public int getPriority() {
+      return Integer.MIN_VALUE; // some invalid value, see createStoreMock
+    }
+
+    public BlockingCompactionContext waitForBlocking() {
+      while (this.blocked == null || !this.blocked.isInCompact) {
+        Threads.sleepWithoutInterrupt(50);
+      }
+      BlockingCompactionContext ctx = this.blocked;
+      this.blocked = null;
+      return ctx;
+    }
+
+    @Override
+    public Store createStoreMock(String name) throws Exception {
+      return createStoreMock(Integer.MIN_VALUE, name);
+    }
+
+    public Store createStoreMock(int priority, String name) throws Exception {
+      // Override the mock to always return the specified priority.
+      Store s = super.createStoreMock(name);
+      when(s.getCompactPriority()).thenReturn(priority);
+      return s;
+    }
+  }
+
+  /** Test compaction priority management and multiple compactions per store (HBASE-8665). */
+  public void testCompactionQueuePriorities() throws Exception {
+    // Setup a compact/split thread on a mock server.
+    final Configuration conf = HBaseConfiguration.create();
+    HRegionServer mockServer = mock(HRegionServer.class);
+    when(mockServer.isStopped()).thenReturn(false);
+    when(mockServer.getConfiguration()).thenReturn(conf);
+    CompactSplitThread cst = new CompactSplitThread(mockServer);
+    when(mockServer.getCompactSplitThread()).thenReturn(cst);
+
+    // Set up the region mock that redirects compactions.
+    HRegion r = mock(HRegion.class);
+    when(r.compact(any(CompactionContext.class), any(Store.class))).then(new Answer<Boolean>()
{
+      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+        ((CompactionContext)invocation.getArguments()[0]).compact();
+        return true;
+      }
+    });
+
+    // Set up store mocks for 2 "real" stores and the one we use for blocking CST.
+    ArrayList<Integer> results = new ArrayList<Integer>();
+    StoreMockMaker sm = new StoreMockMaker(results), sm2 = new StoreMockMaker(results);
+    Store store = sm.createStoreMock("store1"), store2 = sm2.createStoreMock("store2");
+    BlockingStoreMockMaker blocker = new BlockingStoreMockMaker();
+
+    // First, block the compaction thread so that we can muck with the queue.
+    cst.requestSystemCompaction(r, blocker.createStoreMock(1, "b-pri1"), "b-pri1");
+    BlockingStoreMockMaker.BlockingCompactionContext currentBlock = blocker.waitForBlocking();
+
+    // Add 4 files to store1, 3 to store2, and queue compactions; pri 3 and 4 respectively.
+    for (int i = 0; i < 4; ++i) {
+      sm.notCompacting.add(createFile());
+    }
+    cst.requestSystemCompaction(r, store, "s1-pri3");
+    for (int i = 0; i < 3; ++i) {
+      sm2.notCompacting.add(createFile());
+    }
+    cst.requestSystemCompaction(r, store2, "s2-pri4");
+    // Now add 2 more files to store1 and queue compaction - pri 1.
+    for (int i = 0; i < 2; ++i) {
+      sm.notCompacting.add(createFile());
+    }
+    cst.requestSystemCompaction(r, store, "s1-pri1");
+    // Finally add blocking compaction with priority 2.
+    cst.requestSystemCompaction(r, blocker.createStoreMock(2, "b-pri2"), "b-pri2");
+
+    // Unblock the blocking compaction; we should run pri1 and become blocked again in pri2.
+    currentBlock.unblock();
+    currentBlock = blocker.waitForBlocking();
+    // Pri1 should have "compacted" all 6 files.
+    assertEquals(1, results.size());
+    assertEquals(6, results.get(0).intValue());
+    // Add 2 files to store 1 (it has 2 files now).
+    for (int i = 0; i < 2; ++i) {
+      sm.notCompacting.add(createFile());
+    }
+    // Now we have pri4 for store 2 in queue, and pri3 for store1; store1's current priority
+    // is 5, however, so it must not preempt store 2. Add blocking compaction at the end.
+    cst.requestSystemCompaction(r, blocker.createStoreMock(7, "b-pri7"), "b-pri7");
+    currentBlock.unblock();
+    currentBlock = blocker.waitForBlocking();
+    assertEquals(3, results.size());
+    assertEquals(3, results.get(1).intValue()); // 3 files should go before 2 files.
+    assertEquals(2, results.get(2).intValue());
+
+    currentBlock.unblock();
+    cst.interruptIfNecessary();
+  }
+
+  private static StoreFile createFile() throws Exception {
+    StoreFile sf = mock(StoreFile.class);
+    when(sf.getPath()).thenReturn(new Path("file"));
+    StoreFile.Reader r = mock(StoreFile.Reader.class);
+    when(r.length()).thenReturn(10L);
+    when(sf.getReader()).thenReturn(r);
+    return sf;
+  }
 
   /**
   * Simple {@link CompactionRequest} on which you can wait until the requested compaction finishes.



Mime
View raw message