hbase-commits mailing list archives

From: st...@apache.org
Subject: [1/2] hbase git commit: HBASE-8329 Limit compaction speed
Date: Tue, 03 Feb 2015 06:19:18 GMT
Repository: hbase
Updated Branches:
  refs/heads/master da30c72b7 -> eb351b9ff


http://git-wip-us.apache.org/repos/asf/hbase/blob/eb351b9f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
index ed8b819..86d670c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
@@ -121,7 +122,8 @@ public class TestStripeCompactor {
     StoreFileWritersCapture writers = new StoreFileWritersCapture();
     StripeCompactor sc = createCompactor(writers, input);
     List<Path> paths =
-        sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo);
+        sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo,
+          NoLimitCompactionThroughputController.INSTANCE);
     writers.verifyKvs(output, allFiles, true);
     if (allFiles) {
       assertEquals(output.length, paths.size());
@@ -156,8 +158,9 @@ public class TestStripeCompactor {
       byte[] left, byte[] right, KeyValue[][] output) throws Exception {
     StoreFileWritersCapture writers = new StoreFileWritersCapture();
     StripeCompactor sc = createCompactor(writers, input);
-    List<Path> paths = sc.compact(
-        createDummyRequest(), targetCount, targetSize, left, right, null, null);
+    List<Path> paths =
+        sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null,
+          NoLimitCompactionThroughputController.INSTANCE);
     assertEquals(output.length, paths.size());
     writers.verifyKvs(output, true, true);
     List<byte[]> boundaries = new ArrayList<byte[]>();
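
The hunks above thread a new trailing CompactionThroughputController argument through
StripeCompactor.compact(); NoLimitCompactionThroughputController.INSTANCE is the pass-through
the tests use. As a rough illustration of the shape implied by these call sites (names
simplified and hypothetical, not the committed interface):

    // Sketch only, inferred from the call sites above. A controller rides along
    // with each compaction and is consulted as output is written; the "no limit"
    // variant is a shared, stateless no-op.
    interface ThroughputController {
      // Called with the bytes written since the last call; may sleep to enforce a cap.
      void control(String compactionName, long bytes) throws InterruptedException;
    }

    final class NoLimitController implements ThroughputController {
      static final NoLimitController INSTANCE = new NoLimitController();
      private NoLimitController() {}
      @Override
      public void control(String compactionName, long bytes) {
        // no-op: never throttles, so these tests keep their original timing behavior
      }
    }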

http://git-wip-us.apache.org/repos/asf/hbase/blob/eb351b9f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
index d8cdc90..32ab164 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
@@ -17,8 +17,16 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -27,12 +35,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
-import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -63,9 +73,10 @@ public class TestStripeStoreEngine {
     TestStoreEngine se = createEngine(conf);
     StripeCompactor mockCompactor = mock(StripeCompactor.class);
     se.setCompactorOverride(mockCompactor);
-    when(mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(),
-        any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class)))
-        .thenReturn(new ArrayList<Path>());
+    when(
+      mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(), any(byte[].class),
+        any(byte[].class), any(byte[].class), any(byte[].class),
+        any(CompactionThroughputController.class))).thenReturn(new ArrayList<Path>());
 
     // Produce 3 L0 files.
     StoreFile sf = createFile();
@@ -83,9 +94,10 @@ public class TestStripeStoreEngine {
     assertEquals(2, compaction.getRequest().getFiles().size());
     assertFalse(compaction.getRequest().getFiles().contains(sf));
     // Make sure the correct method is called on the compactor.
-    compaction.compact();
+    compaction.compact(NoLimitCompactionThroughputController.INSTANCE);
     verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L,
-          StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null);
+      StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null,
+      NoLimitCompactionThroughputController.INSTANCE);
   }
 
   private static StoreFile createFile() throws Exception {
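
The mocked compact() above must now match the new arity, hence the extra
any(CompactionThroughputController.class) matcher and the controller passed to
compaction.compact(...). Inside a real compaction, a limiting controller would throttle
the write loop roughly as in the following minimal sketch, assuming a fixed cap in
bytes/sec (hypothetical names; the committed PressureAwareCompactionThroughputController
also retunes its cap as store-file pressure changes):

    // Tracks bytes written and sleeps whenever the observed rate runs ahead of the cap.
    final class FixedRateThrottle {
      private final double maxBytesPerSec;
      private final long startMs = System.currentTimeMillis();
      private long bytesWritten;

      FixedRateThrottle(double maxBytesPerSec) {
        this.maxBytesPerSec = maxBytesPerSec;
      }

      void control(long bytes) throws InterruptedException {
        bytesWritten += bytes;
        long elapsedMs = System.currentTimeMillis() - startMs;
        // How long the bytes written so far *should* have taken at the cap.
        long expectedMs = (long) (bytesWritten * 1000 / maxBytesPerSec);
        if (expectedMs > elapsedMs) {
          Thread.sleep(expectedMs - elapsedMs); // pay back the speed surplus
        }
      }
    }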

http://git-wip-us.apache.org/repos/asf/hbase/blob/eb351b9f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java
new file mode 100644
index 0000000..8d0d5a8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactionWithThroughputController.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreEngine;
+import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
+import org.apache.hadoop.hbase.regionserver.StripeStoreEngine;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestCompactionWithThroughputController {
+
+  private static final Log LOG = LogFactory.getLog(TestCompactionWithThroughputController.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final double EPSILON = 1E-6;
+
+  private final TableName tableName = TableName.valueOf(getClass().getSimpleName());
+
+  private final byte[] family = Bytes.toBytes("f");
+
+  private final byte[] qualifier = Bytes.toBytes("q");
+
+  private Store getStoreWithName(TableName tableName) {
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
+    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
+      HRegionServer hrs = rsts.get(i).getRegionServer();
+      for (HRegion region : hrs.getOnlineRegions(tableName)) {
+        return region.getStores().values().iterator().next();
+      }
+    }
+    return null;
+  }
+
+  private Store prepareData() throws IOException {
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    if (admin.tableExists(tableName)) {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+    HTable table = TEST_UTIL.createTable(tableName, family);
+    Random rand = new Random();
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        byte[] value = new byte[128 * 1024];
+        rand.nextBytes(value);
+        table.put(new Put(Bytes.toBytes(i * 10 + j)).add(family, qualifier, value));
+      }
+      admin.flush(tableName);
+    }
+    return getStoreWithName(tableName);
+  }
+
+  private long testCompactionWithThroughputLimit() throws Exception {
+    long throughputLimit = 1024L * 1024;
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
+    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+    conf.setLong(
+      PressureAwareCompactionThroughputController
+        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
+      throughputLimit);
+    conf.setLong(
+      PressureAwareCompactionThroughputController
+        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
+      throughputLimit);
+    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
+      PressureAwareCompactionThroughputController.class.getName());
+    TEST_UTIL.startMiniCluster(1);
+    try {
+      Store store = prepareData();
+      assertEquals(10, store.getStorefilesCount());
+      long startTime = System.currentTimeMillis();
+      TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
+      Thread.sleep(5000);
+      assertEquals(10, store.getStorefilesCount());
+      while (store.getStorefilesCount() != 1) {
+        Thread.sleep(20);
+      }
+      long duration = System.currentTimeMillis() - startTime;
+      double throughput = (double) store.getStorefilesSize() / duration * 1000;
+      // confirm that the speed limit works properly (not too fast, and also not too slow)
+      // 20% is the max acceptable error rate.
+      assertTrue(throughput < throughputLimit * 1.2);
+      assertTrue(throughput > throughputLimit * 0.8);
+      return System.currentTimeMillis() - startTime;
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  private long testCompactionWithoutThroughputLimit() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
+    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 200);
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
+      NoLimitCompactionThroughputController.class.getName());
+    TEST_UTIL.startMiniCluster(1);
+    try {
+      Store store = prepareData();
+      assertEquals(10, store.getStorefilesCount());
+      long startTime = System.currentTimeMillis();
+      TEST_UTIL.getHBaseAdmin().majorCompact(tableName);
+      while (store.getStorefilesCount() != 1) {
+        Thread.sleep(20);
+      }
+      return System.currentTimeMillis() - startTime;
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testCompaction() throws Exception {
+    long limitTime = testCompactionWithThroughputLimit();
+    long noLimitTime = testCompactionWithoutThroughputLimit();
+    LOG.info("With 1M/s limit, compaction uses " + limitTime + "ms; without limit, compaction uses "
+        + noLimitTime + "ms");
+    // usually the throughput of a compaction without limitation is about 40MB/sec at least, so this
+    // is a very weak assumption.
+    assertTrue(limitTime > noLimitTime * 2);
+  }
+
+  /**
+   * Test the tuning task of {@link PressureAwareCompactionThroughputController}
+   */
+  @Test
+  public void testThroughputTuning() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName());
+    conf.setLong(
+      PressureAwareCompactionThroughputController
+        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
+      20L * 1024 * 1024);
+    conf.setLong(
+      PressureAwareCompactionThroughputController
+        .HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
+      10L * 1024 * 1024);
+    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 4);
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 6);
+    conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
+      PressureAwareCompactionThroughputController.class.getName());
+    conf.setInt(
+      PressureAwareCompactionThroughputController.HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
+      1000);
+    TEST_UTIL.startMiniCluster(1);
+    Connection conn = ConnectionFactory.createConnection(conf);
+    try {
+      HTableDescriptor htd = new HTableDescriptor(tableName);
+      htd.addFamily(new HColumnDescriptor(family));
+      htd.setCompactionEnabled(false);
+      TEST_UTIL.getHBaseAdmin().createTable(htd);
+      TEST_UTIL.waitTableAvailable(tableName);
+      HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
+      PressureAwareCompactionThroughputController throughputController =
+          (PressureAwareCompactionThroughputController) regionServer.compactSplitThread
+              .getCompactionThroughputController();
+      assertEquals(10L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
+      Table table = conn.getTable(tableName);
+      for (int i = 0; i < 5; i++) {
+        table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0]));
+        TEST_UTIL.flush(tableName);
+      }
+      Thread.sleep(2000);
+      assertEquals(15L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
+
+      table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0]));
+      TEST_UTIL.flush(tableName);
+      Thread.sleep(2000);
+      assertEquals(20L * 1024 * 1024, throughputController.maxThroughput, EPSILON);
+
+      table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0]));
+      TEST_UTIL.flush(tableName);
+      Thread.sleep(2000);
+      assertEquals(Double.MAX_VALUE, throughputController.maxThroughput, EPSILON);
+
+      conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
+        NoLimitCompactionThroughputController.class.getName());
+      regionServer.compactSplitThread.onConfigurationChange(conf);
+      assertTrue(throughputController.isStopped());
+      assertTrue(regionServer.compactSplitThread.getCompactionThroughputController()
+        instanceof NoLimitCompactionThroughputController);
+    } finally {
+      conn.close();
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Test the logic that we calculate compaction pressure for a striped store.
+   */
+  @Test
+  public void testGetCompactionPressureForStripedStore() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, StripeStoreEngine.class.getName());
+    conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, false);
+    conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, 2);
+    conf.setInt(StripeStoreConfig.MIN_FILES_KEY, 4);
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 12);
+    TEST_UTIL.startMiniCluster(1);
+    Connection conn = ConnectionFactory.createConnection(conf);
+    try {
+      HTableDescriptor htd = new HTableDescriptor(tableName);
+      htd.addFamily(new HColumnDescriptor(family));
+      htd.setCompactionEnabled(false);
+      TEST_UTIL.getHBaseAdmin().createTable(htd);
+      TEST_UTIL.waitTableAvailable(tableName);
+      HStore store = (HStore) getStoreWithName(tableName);
+      assertEquals(0, store.getStorefilesCount());
+      assertEquals(0.0, store.getCompactionPressure(), EPSILON);
+      Table table = conn.getTable(tableName);
+      for (int i = 0; i < 4; i++) {
+        table.put(new Put(Bytes.toBytes(i)).add(family, qualifier, new byte[0]));
+        table.put(new Put(Bytes.toBytes(100 + i)).add(family, qualifier, new byte[0]));
+        TEST_UTIL.flush(tableName);
+      }
+      assertEquals(8, store.getStorefilesCount());
+      assertEquals(0.0, store.getCompactionPressure(), EPSILON);
+
+      table.put(new Put(Bytes.toBytes(4)).add(family, qualifier, new byte[0]));
+      table.put(new Put(Bytes.toBytes(104)).add(family, qualifier, new byte[0]));
+      TEST_UTIL.flush(tableName);
+      assertEquals(10, store.getStorefilesCount());
+      assertEquals(0.5, store.getCompactionPressure(), EPSILON);
+
+      table.put(new Put(Bytes.toBytes(5)).add(family, qualifier, new byte[0]));
+      table.put(new Put(Bytes.toBytes(105)).add(family, qualifier, new byte[0]));
+      TEST_UTIL.flush(tableName);
+      assertEquals(12, store.getStorefilesCount());
+      assertEquals(1.0, store.getCompactionPressure(), EPSILON);
+
+      table.put(new Put(Bytes.toBytes(6)).add(family, qualifier, new byte[0]));
+      table.put(new Put(Bytes.toBytes(106)).add(family, qualifier, new byte[0]));
+      TEST_UTIL.flush(tableName);
+      assertEquals(14, store.getStorefilesCount());
+      assertEquals(2.0, store.getCompactionPressure(), EPSILON);
+    } finally {
+      conn.close();
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+}
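
The values asserted in testThroughputTuning follow a simple linear model: the cap scales
between the configured lower and higher bounds with compaction pressure, and switches off
entirely once pressure exceeds 1. The sketch below reproduces the asserted numbers for
lower/higher bounds of 10MB/s and 20MB/s with compaction.min = 4 and blockingStoreFiles = 6;
it is a reading of the test's expectations, not necessarily the committed formula:

    // Reproduces the asserted maxThroughput values (assumption: linear tuning).
    static double expectedMaxThroughput(int storefiles, int minFiles, int blockingFiles,
        double lowerBound, double higherBound) {
      double pressure = (double) (storefiles - minFiles) / (blockingFiles - minFiles);
      if (pressure > 1.0) {
        return Double.MAX_VALUE; // past the blocking threshold: stop limiting entirely
      }
      return lowerBound + (higherBound - lowerBound) * Math.max(pressure, 0.0);
    }
    // 5 files -> pressure 0.5 -> 15 MB/s; 6 files -> 1.0 -> 20 MB/s;
    // 7 files -> 1.5 -> unlimited, matching the assertEquals calls above.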

http://git-wip-us.apache.org/repos/asf/hbase/blob/eb351b9f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 0685568..f3b7be4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -211,9 +212,10 @@ public class TestStripeCompactionPolicy {
     assertTrue(policy.needsCompactions(si, al()));
     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
     assertEquals(si.getStorefiles(), scr.getRequest().getFiles());
-    scr.execute(sc);
-    verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(),
-        aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY));
+    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
+    verify(sc, only()).compact(eq(scr.getRequest()), anyInt(), anyLong(), aryEq(OPEN_KEY),
+      aryEq(OPEN_KEY), aryEq(OPEN_KEY), aryEq(OPEN_KEY),
+      any(NoLimitCompactionThroughputController.class));
   }
 
   @Test
@@ -453,7 +455,7 @@ public class TestStripeCompactionPolicy {
     // All the Stripes are expired, so the Compactor will not create any Writers. We need to create
     // an empty file to preserve metadata
     StripeCompactor sc = createCompactor();
-    List<Path> paths = scr.execute(sc);
+    List<Path> paths = scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
     assertEquals(1, paths.size());
   }
 
@@ -512,22 +514,21 @@ public class TestStripeCompactionPolicy {
     assertTrue(policy.needsCompactions(si, al()));
     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
     verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
-    scr.execute(sc);
-    verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(
-        new ArgumentMatcher<List<byte[]>>() {
-          @Override
-          public boolean matches(Object argument) {
-            @SuppressWarnings("unchecked")
-            List<byte[]> other = (List<byte[]>)argument;
-            if (other.size() != boundaries.size()) return false;
-            for (int i = 0; i < other.size(); ++i) {
-              if (!Bytes.equals(other.get(i), boundaries.get(i))) return false;
-            }
-            return true;
-          }
-        }),
-        dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
-        dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo));
+    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
+    verify(sc, times(1)).compact(eq(scr.getRequest()), argThat(new ArgumentMatcher<List<byte[]>>() {
+      @Override
+      public boolean matches(Object argument) {
+        @SuppressWarnings("unchecked")
+        List<byte[]> other = (List<byte[]>) argument;
+        if (other.size() != boundaries.size()) return false;
+        for (int i = 0; i < other.size(); ++i) {
+          if (!Bytes.equals(other.get(i), boundaries.get(i))) return false;
+        }
+        return true;
+      }
+    }), dropDeletesFrom == null ? isNull(byte[].class) : aryEq(dropDeletesFrom),
+      dropDeletesTo == null ? isNull(byte[].class) : aryEq(dropDeletesTo),
+      any(NoLimitCompactionThroughputController.class));
   }
 
   /**
@@ -548,11 +549,12 @@ public class TestStripeCompactionPolicy {
     assertTrue(!needsCompaction || policy.needsCompactions(si, al()));
     StripeCompactionPolicy.StripeCompactionRequest scr = policy.selectCompaction(si, al(), false);
     verifyCollectionsEqual(sfs, scr.getRequest().getFiles());
-    scr.execute(sc);
+    scr.execute(sc, NoLimitCompactionThroughputController.INSTANCE);
     verify(sc, times(1)).compact(eq(scr.getRequest()),
-        count == null ? anyInt() : eq(count.intValue()),
-        size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
-        dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end));
+      count == null ? anyInt() : eq(count.intValue()),
+      size == null ? anyLong() : eq(size.longValue()), aryEq(start), aryEq(end),
+      dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end),
+      any(NoLimitCompactionThroughputController.class));
   }
 
   /** Verify arbitrary flush. */
@@ -738,7 +740,10 @@ public class TestStripeCompactionPolicy {
     HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo"));
     StoreFileWritersCapture writers = new StoreFileWritersCapture();
     Store store = mock(Store.class);
+    HRegionInfo info = mock(HRegionInfo.class);
+    when(info.getRegionNameAsString()).thenReturn("testRegion");
     when(store.getFamily()).thenReturn(col);
+    when(store.getRegionInfo()).thenReturn(info);
     when(
       store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
         anyBoolean(), anyBoolean())).thenAnswer(writers);

