Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8520E18244 for ; Fri, 29 Jan 2016 01:44:19 +0000 (UTC) Received: (qmail 97817 invoked by uid 500); 29 Jan 2016 01:44:19 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 97771 invoked by uid 500); 29 Jan 2016 01:44:19 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 97762 invoked by uid 99); 29 Jan 2016 01:44:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Jan 2016 01:44:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3291CE0B3A; Fri, 29 Jan 2016 01:44:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhangduo@apache.org To: commits@hbase.apache.org Date: Fri, 29 Jan 2016 01:44:19 -0000 Message-Id: <977cda12bb3e4034a6d42b68a789da98@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] hbase git commit: HBASE-14969 Add throughput controller for flush Repository: hbase Updated Branches: refs/heads/master 14dd959aa -> b3b1ce99c http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java new file mode 100644 index 0000000..5d5be87 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/throttle/TestFlushWithThroughputController.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.throttle; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreEngine; +import org.apache.hadoop.hbase.regionserver.StripeStoreEngine; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestFlushWithThroughputController { + + private static final Log LOG = LogFactory.getLog(TestFlushWithThroughputController.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final double EPSILON = 1E-6; + + private final TableName tableName = TableName.valueOf(getClass().getSimpleName()); + + private final byte[] family = Bytes.toBytes("f"); + + private final byte[] qualifier = 
Bytes.toBytes("q"); + + private Store getStoreWithName(TableName tableName) { + MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); + List rsts = cluster.getRegionServerThreads(); + for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { + HRegionServer hrs = rsts.get(i).getRegionServer(); + for (Region region : hrs.getOnlineRegions(tableName)) { + return region.getStores().iterator().next(); + } + } + return null; + } + + private Store generateAndFlushData() throws IOException { + HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); + if (admin.tableExists(tableName)) { + admin.disableTable(tableName); + admin.deleteTable(tableName); + } + HTable table = TEST_UTIL.createTable(tableName, family); + Random rand = new Random(); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + byte[] value = new byte[256 * 1024]; + rand.nextBytes(value); + table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); + } + admin.flush(tableName); + } + return getStoreWithName(tableName); + } + + private long testFlushWithThroughputLimit() throws Exception { + long throughputLimit = 1L * 1024 * 1024; + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY, + PressureAwareFlushThroughputController.class.getName()); + conf.setLong( + PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND, + throughputLimit); + conf.setLong( + PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND, + throughputLimit); + conf.setLong( + PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_CONTROL_CHECK_INTERVAL, + throughputLimit); + TEST_UTIL.startMiniCluster(1); + try { + long startTime = System.nanoTime(); + Store store = generateAndFlushData(); + assertEquals(10, store.getStorefilesCount()); + long duration = System.nanoTime() - startTime; + double throughput = (double) store.getStorefilesSize() / 
duration * 1000 * 1000 * 1000; + LOG.debug("Throughput is: " + (throughput / 1024 / 1024) + " MB/s"); + // confirm that the speed limit works properly (not too fast, and also not too slow) + // 20% is the max acceptable error rate. + assertTrue(throughput < throughputLimit * 1.2); + assertTrue(throughput > throughputLimit * 0.8); + return duration; + } finally { + TEST_UTIL.shutdownMiniCluster(); + } + } + + private long testFlushWithoutThroughputLimit() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY, + NoLimitThroughputController.class.getName()); + TEST_UTIL.startMiniCluster(1); + try { + long startTime = System.nanoTime(); + Store store = generateAndFlushData(); + assertEquals(10, store.getStorefilesCount()); + long duration = System.nanoTime() - startTime; + double throughput = (double) store.getStorefilesSize() / duration * 1000 * 1000 * 1000; + LOG.debug("Throughput w/o limit is: " + (throughput / 1024 / 1024) + " MB/s"); + return duration; + } finally { + TEST_UTIL.shutdownMiniCluster(); + } + } + + @Test + public void testFlushControl() throws Exception { + long limitTime = testFlushWithThroughputLimit(); + long noLimitTime = testFlushWithoutThroughputLimit(); + LOG.info("With 1M/s limit, flush use " + (limitTime / 1000000) + + "ms; without limit, flush use " + (noLimitTime / 1000000) + "ms"); + // Commonly if multiple regions flush at the same time, the throughput could be very high + // but flush in this test is in serial, so we use a weak assumption. 
+ assertTrue(limitTime > 2 * noLimitTime); + } + + /** + * Test the tuning task of {@link PressureAwareFlushThroughputController} + */ + @Test + public void testFlushThroughputTuning() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, DefaultStoreEngine.class.getName()); + conf.setLong( + PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_UPPER_BOUND, + 20L * 1024 * 1024); + conf.setLong( + PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_MAX_THROUGHPUT_LOWER_BOUND, + 10L * 1024 * 1024); + conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY, + PressureAwareFlushThroughputController.class.getName()); + conf.setInt(PressureAwareFlushThroughputController.HBASE_HSTORE_FLUSH_THROUGHPUT_TUNE_PERIOD, + 3000); + TEST_UTIL.startMiniCluster(1); + assertEquals(10L * 1024 * 1024, + ((PressureAwareThroughputController) TEST_UTIL.getMiniHBaseCluster().getRegionServer(0) + .getFlushThroughputController()).getMaxThroughput(), EPSILON); + Connection conn = ConnectionFactory.createConnection(conf); + try { + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(family)); + htd.setCompactionEnabled(false); + TEST_UTIL.getHBaseAdmin().createTable(htd); + TEST_UTIL.waitTableAvailable(tableName); + HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); + PressureAwareFlushThroughputController throughputController = + (PressureAwareFlushThroughputController) regionServer.getFlushThroughputController(); + Table table = conn.getTable(tableName); + Random rand = new Random(); + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + byte[] value = new byte[256 * 1024]; + rand.nextBytes(value); + table.put(new Put(Bytes.toBytes(i * 10 + j)).addColumn(family, qualifier, value)); + } + } + Thread.sleep(5000); + double expectedThroughPut = 10L * 1024 * 1024 * (1 + regionServer.getFlushPressure()); 
+ assertEquals(expectedThroughPut, throughputController.getMaxThroughput(), EPSILON); + + conf.set(FlushThroughputControllerFactory.HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY, + NoLimitThroughputController.class.getName()); + regionServer.onConfigurationChange(conf); + assertTrue(throughputController.isStopped()); + assertTrue(regionServer.getFlushThroughputController() instanceof NoLimitThroughputController); + } finally { + conn.close(); + TEST_UTIL.shutdownMiniCluster(); + } + } + + /** + * Test the logic for striped store. + */ + @Test + public void testFlushControlForStripedStore() throws Exception { + TEST_UTIL.getConfiguration().set(StoreEngine.STORE_ENGINE_CLASS_KEY, + StripeStoreEngine.class.getName()); + testFlushControl(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 549a018..011c75d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -64,7 +64,19 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.regionserver.*; +import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; +import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; +import org.apache.hadoop.hbase.regionserver.FlushRequestListener; +import org.apache.hadoop.hbase.regionserver.FlushRequester; +import org.apache.hadoop.hbase.regionserver.HRegion; +import 
org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -644,11 +656,11 @@ public class TestWALReplay { } @Override public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, - MonitoredTask status) throws IOException { + MonitoredTask status, ThroughputController throughputController) throws IOException { if (throwExceptionWhenFlushing.get()) { throw new IOException("Simulated exception by tests"); } - return super.flushSnapshot(snapshot, cacheFlushId, status); + return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController); } };