From commits-return-82829-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Fri Feb 1 02:27:09 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 0E431180648 for ; Fri, 1 Feb 2019 03:27:08 +0100 (CET) Received: (qmail 725 invoked by uid 500); 1 Feb 2019 02:27:08 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 716 invoked by uid 99); 1 Feb 2019 02:27:08 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Feb 2019 02:27:08 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 6F4028576D; Fri, 1 Feb 2019 02:27:07 +0000 (UTC) Date: Fri, 01 Feb 2019 02:27:07 +0000 To: "commits@hbase.apache.org" Subject: [hbase] branch branch-2.1 updated: HBASE-21764 Size of in-memory compaction thread pool shoud be configurable MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <154898802733.24499.16936290007944911357@gitbox.apache.org> From: openinx@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: hbase X-Git-Refname: refs/heads/branch-2.1 X-Git-Reftype: branch X-Git-Oldrev: ab233de2f0f4795794159ece1d970fdcf26a4025 X-Git-Newrev: 02728391926d9b960a9c64c304f37fac3b9cc7ee X-Git-Rev: 02728391926d9b960a9c64c304f37fac3b9cc7ee X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. openinx pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/hbase.git The following commit(s) were added to refs/heads/branch-2.1 by this push: new 0272839 HBASE-21764 Size of in-memory compaction thread pool shoud be configurable 0272839 is described below commit 02728391926d9b960a9c64c304f37fac3b9cc7ee Author: huzheng AuthorDate: Wed Jan 30 16:15:02 2019 +0800 HBASE-21764 Size of in-memory compaction thread pool shoud be configurable --- .../hadoop/hbase/executor/ExecutorService.java | 17 +++++++-- .../apache/hadoop/hbase/executor/ExecutorType.java | 4 +- .../hbase/regionserver/CompactingMemStore.java | 4 ++ .../apache/hadoop/hbase/regionserver/HRegion.java | 4 +- .../regionserver/RegionServicesForStores.java | 35 +++++++++--------- .../hbase/regionserver/TestCompactingMemStore.java | 9 ++++- .../hadoop/hbase/regionserver/TestHStore.java | 4 ++ .../TestRecoveredEditsReplayAndAbort.java | 1 + .../TestWalAndCompactingMemStoreFlush.java | 43 +++++++--------------- .../regionserver/wal/AbstractTestWALReplay.java | 1 + 10 files changed, 69 insertions(+), 53 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java index 342d441..20177a2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java @@ -32,10 +32,10 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hbase.monitoring.ThreadMonitoring; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hbase.monitoring.ThreadMonitoring; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; @@ -126,13 +126,24 @@ public class ExecutorService { public void startExecutorService(final ExecutorType type, final int maxThreads) { String name = type.getExecutorName(this.servername); if (isExecutorServiceRunning(name)) { - LOG.debug("Executor service " + toString() + " already running on " + - this.servername); + LOG.debug("Executor service " + toString() + " already running on " + this.servername); return; } startExecutorService(name, maxThreads); } + /** + * Initialize the executor lazily, Note if an executor need to be initialized lazily, then all + * paths should use this method to get the executor, should not start executor by using + * {@link ExecutorService#startExecutorService(ExecutorType, int)} + */ + public ThreadPoolExecutor getExecutorLazily(ExecutorType type, int maxThreads) { + String name = type.getExecutorName(this.servername); + return executorMap + .computeIfAbsent(name, (executorName) -> new Executor(executorName, maxThreads)) + .getThreadPoolExecutor(); + } + public void submit(final EventHandler eh) { Executor executor = getExecutor(eh.getEventType().getExecutorServiceType()); if (executor == null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index 7f130d1..596385d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -47,7 +47,9 @@ public enum ExecutorType { RS_REGION_REPLICA_FLUSH_OPS (28), RS_COMPACTED_FILES_DISCHARGER (29), RS_OPEN_PRIORITY_REGION (30), - RS_REFRESH_PEER (31); + RS_REFRESH_PEER(31), + RS_SWITCH_RPC_THROTTLE(33), + RS_IN_MEMORY_COMPACTION(34); ExecutorType(int value) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 21659bb..c1176ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -63,6 +63,10 @@ public class CompactingMemStore extends AbstractMemStore { public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY = "hbase.memstore.inmemoryflush.threshold.factor"; private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.014; + // In-Memory compaction pool size + public static final String IN_MEMORY_CONPACTION_POOL_SIZE_KEY = + "hbase.regionserver.inmemory.compaction.pool.size"; + public static final int IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT = 10; private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class); private HStore store; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 03f7487..32e3d12 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -295,7 +295,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Track data size in all memstores private final MemStoreSizing memStoreSizing = new ThreadSafeMemStoreSizing(); - private final RegionServicesForStores regionServicesForStores = new RegionServicesForStores(this); + @VisibleForTesting + RegionServicesForStores regionServicesForStores; // Debug possible data loss due to WAL off final LongAdder numMutationsWithoutWAL = new LongAdder(); @@ -773,6 +774,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } this.rsServices = rsServices; + this.regionServicesForStores = new RegionServicesForStores(this, rsServices); setHTableSpecificConf(); this.scannerReadPoints = new ConcurrentHashMap<>(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java index c1af9db..68d46ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java @@ -18,12 +18,10 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.wal.WAL; import org.apache.yetus.audience.InterfaceAudience; @@ -39,22 +37,18 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti @InterfaceAudience.Private public class RegionServicesForStores { - private static final int POOL_SIZE = 10; - private static final ThreadPoolExecutor INMEMORY_COMPACTION_POOL = - new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 60, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - String name = Thread.currentThread().getName() + "-inmemoryCompactions-" + - System.currentTimeMillis(); - return new Thread(r, name); - } - }); private final HRegion region; + private final RegionServerServices rsServices; + private int inMemoryPoolSize; - public RegionServicesForStores(HRegion region) { + public RegionServicesForStores(HRegion region, RegionServerServices rsServices) { this.region = region; + this.rsServices = rsServices; + if (this.rsServices != null) { + this.inMemoryPoolSize = rsServices.getConfiguration().getInt( + CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_KEY, + CompactingMemStore.IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT); + } } public void blockUpdates() { @@ -78,7 +72,14 @@ public class RegionServicesForStores { return region.getWAL(); } - public ThreadPoolExecutor getInMemoryCompactionPool() { return INMEMORY_COMPACTION_POOL; } + ThreadPoolExecutor getInMemoryCompactionPool() { + if (rsServices != null) { + return rsServices.getExecutorService().getExecutorLazily(ExecutorType.RS_IN_MEMORY_COMPACTION, + inMemoryPoolSize); + } else { + return null; + } + } public long getMemStoreFlushSize() { return region.getMemStoreFlushSize(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index eef91d2..9afa052 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -25,6 +25,9 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; @@ -56,6 +59,7 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -111,8 +115,9 @@ public class TestCompactingMemStore extends TestDefaultMemStore { new HRegionInfo(TableName.valueOf("foobar"), null, null, false); WAL wal = hbaseUtility.createWal(conf, hbaseUtility.getDataTestDir(), info); this.region = HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, htd, wal, true); - //this.region = hbaseUtility.createTestRegion("foobar", hcd); - this.regionServicesForStores = region.getRegionServicesForStores(); + this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores()); + ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); + Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); this.store = new HStore(region, hcd, conf); long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index 46ba464..9dbd3e0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -44,6 +44,7 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -219,6 +220,9 @@ public class TestHStore { WALFactory wals = new WALFactory(walConf, methodName); region = new HRegion(new HRegionFileSystem(conf, fs, tableDir, info), wals.getWAL(info), conf, htd, null); + region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); + ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); + Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); } private HStore init(String methodName, Configuration conf, TableDescriptorBuilder builder, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java index b0cbd58..e55f3ee 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java @@ -122,6 +122,7 @@ public class TestRecoveredEditsReplayAndAbort { Mockito.when(rs.getNonceManager()).thenReturn(null); Mockito.when(rs.getServerName()).thenReturn(ServerName .valueOf("test", 0, 111)); + Mockito.when(rs.getConfiguration()).thenReturn(CONF); //create a region TableName testTable = TableName.valueOf("testRecoveredEidtsReplayAndAbort"); TableDescriptor htd = TableDescriptorBuilder.newBuilder(testTable) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java index 15bf2a4..edd8382 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestWalAndCompactingMemStoreFlush.java @@ -18,19 +18,17 @@ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.util.Arrays; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; @@ -40,6 +38,7 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; /** * This test verifies the correctness of the Per Column Family flushing strategy @@ -67,14 +66,14 @@ public class TestWalAndCompactingMemStoreFlush { private Configuration conf; private HRegion initHRegion(String callingMethod, Configuration conf) throws IOException { - int i=0; + int i = 0; HTableDescriptor htd = new HTableDescriptor(TABLENAME); for (byte[] family : FAMILIES) { HColumnDescriptor hcd = new HColumnDescriptor(family); // even column families are going to have compacted memstore - if(i%2 == 0) { - hcd.setInMemoryCompaction(MemoryCompactionPolicy.valueOf( - conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY))); + if (i % 2 == 0) { + hcd.setInMemoryCompaction(MemoryCompactionPolicy + .valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY))); } else { hcd.setInMemoryCompaction(MemoryCompactionPolicy.NONE); } @@ -84,7 +83,12 @@ public class TestWalAndCompactingMemStoreFlush { HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false); Path path = new Path(DIR, callingMethod); - return HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd); + HRegion region = HBaseTestingUtility.createRegionAndWAL(info, path, conf, htd, false); + region.regionServicesForStores = Mockito.spy(region.regionServicesForStores); + ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); + Mockito.when(region.regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool); + region.initialize(null); + return region; } // A helper function to create puts. @@ -109,31 +113,12 @@ public class TestWalAndCompactingMemStoreFlush { return p; } - // A helper function to create gets. - private Get createGet(int familyNum, int putNum) { - byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); - return new Get(row); - } - private void verifyInMemoryFlushSize(Region region) { assertEquals( ((CompactingMemStore) ((HStore)region.getStore(FAMILY1)).memstore).getInmemoryFlushSize(), ((CompactingMemStore) ((HStore)region.getStore(FAMILY3)).memstore).getInmemoryFlushSize()); } - // A helper function to verify edits. - void verifyEdit(int familyNum, int putNum, Table table) throws IOException { - Result r = table.get(createGet(familyNum, putNum)); - byte[] family = FAMILIES[familyNum - 1]; - byte[] qf = Bytes.toBytes("q" + familyNum); - byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); - assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family)); - assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), - r.getFamilyMap(family).get(qf)); - assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum), - Arrays.equals(r.getFamilyMap(family).get(qf), val)); - } - @Before public void setup() { conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index 51827f8..7663313 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -681,6 +681,7 @@ public abstract class AbstractTestWALReplay { RegionServerServices rsServices = Mockito.mock(RegionServerServices.class); Mockito.doReturn(false).when(rsServices).isAborted(); when(rsServices.getServerName()).thenReturn(ServerName.valueOf("foo", 10, 10)); + when(rsServices.getConfiguration()).thenReturn(conf); Configuration customConf = new Configuration(this.conf); customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, CustomStoreFlusher.class.getName());