From: tedyu@apache.org
To: commits@hbase.apache.org
Message-Id: <9aaba0ea36cd4ee18b48b64b075fcf55@git.apache.org>
Subject: hbase git commit: HBASE-14575 Relax region read lock for compactions (Nick and Ted)
Date: Wed, 2 Dec 2015 14:25:35 +0000 (UTC)

Repository: hbase
Updated Branches:
  refs/heads/branch-1 c4bc1c07b -> 3b6ae17a7


HBASE-14575 Relax region read lock for compactions (Nick and Ted)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3b6ae17a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3b6ae17a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3b6ae17a

Branch: refs/heads/branch-1
Commit: 3b6ae17a72059e8fad9487ab5c02e47d04378e66
Parents: c4bc1c0
Author: tedyu
Authored: Wed Dec 2 06:25:26 2015 -0800
Committer: tedyu
Committed: Wed Dec 2 06:25:26 2015 -0800
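For context on what is being relaxed: before this patch, HRegion.compact() held the
region read lock for the entire compaction, so region close (and therefore split),
which needs the region write lock, could stall behind a long-running compaction. The
following stand-alone Java sketch shows that blocking behavior with
java.util.concurrent's ReentrantReadWriteLock; the class name and timings are
illustrative only, not HBase code.

  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.locks.ReentrantReadWriteLock;

  // Illustrative only: a long-held read lock delays a writer, the way a long
  // compaction used to delay region close. Not actual HBase code.
  public class ReadLockBlocksClose {
    private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public static void main(String[] args) throws Exception {
      Thread compaction = new Thread(() -> {
        lock.readLock().lock();          // old behavior: read lock held for the whole compaction
        try {
          TimeUnit.SECONDS.sleep(5);     // a long compaction
        } catch (InterruptedException ignored) {
        } finally {
          lock.readLock().unlock();
        }
      });
      compaction.start();
      TimeUnit.MILLISECONDS.sleep(100);  // let the "compaction" grab the read lock first

      long start = System.nanoTime();
      lock.writeLock().lock();           // "region close": blocks until the read lock is released
      try {
        System.out.printf("close waited %d ms for the write lock%n",
            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
      } finally {
        lock.writeLock().unlock();
      }
    }
  }

Read locks are compatible with one another, so user scans are unaffected; the cost
falls entirely on writers, which is why the patch drops the read lock and relies
instead on the 'writestate' handshake visible in the diff below.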
----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 84 ++++++++++++++++++--
 .../regionserver/TestHRegionServerBulkLoad.java | 73 ++++++++++++++---
 2 files changed, 136 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3b6ae17a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 3d1d717..7d51101 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1795,8 +1795,80 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
     MonitoredTask status = null;
     boolean requestNeedsCancellation = true;
-    // block waiting for the lock for compaction
-    lock.readLock().lock();
+    /*
+     * We are trying to remove/relax the region read lock for compaction.
+     * Let's look at the potential race conditions among the operations
+     * (user scan, region split, region close and region bulk load):
+     *
+     *   user scan        ---> region read lock
+     *   region split     ---> region close first ---> region write lock
+     *   region close     ---> region write lock
+     *   region bulk load ---> region write lock
+     *
+     * A read lock is compatible with another read lock, so there is no problem with
+     * user scans/reads. Region bulk load does not cause a problem for compaction
+     * either (no consistency problem; the store lock takes care of the store file
+     * accounting), so they can run almost concurrently at the region level.
+     *
+     * The only remaining race condition is between region close and compaction, so
+     * below we evaluate how region close interacts with compaction when compaction
+     * does not acquire the region read lock.
+     *
+     * Here are the steps for compaction:
+     *   1. obtain the list of StoreFile's
+     *   2. create StoreFileScanner's based on the list from #1
+     *   3. perform compaction and save the resulting files under a tmp dir
+     *   4. swap in the compacted files
+     *
+     * #1 is guarded by the store lock. This patch does not change that, so it is
+     * no worse or better.
+     * For #2, we obtain the smallest read point (for the region) across all the
+     * scanners (for both the default compactor and the stripe compactor). The read
+     * points are for user scans; the region keeps the read points for all currently
+     * open user scanners. Compaction needs to know the smallest read point so that,
+     * while rewriting the hfiles, it can drop the mvcc points of cells whose mvccs
+     * are older than the smallest one, since they are no longer needed. Obtaining
+     * it does not conflict with compaction.
+     * #3 can be performed in parallel with other operations.
+     * For #4, bulk load and compaction do not conflict with each other at the
+     * region level (for multi-family atomicity).
+     * Region close and compaction are guarded pretty well by the 'writestate'.
+     * In HRegion#doClose(), we have:
+     *   synchronized (writestate) {
+     *     // Disable compacting and flushing by background threads for this
+     *     // region.
+     *     canFlush = !writestate.readOnly;
+     *     writestate.writesEnabled = false;
+     *     LOG.debug("Closing " + this + ": disabling compactions & flushes");
+     *     waitForFlushesAndCompactions();
+     *   }
+     * waitForFlushesAndCompactions() waits for writestate.compacting to come down
+     * to 0. And in HRegion.compact():
+     *   try {
+     *     synchronized (writestate) {
+     *       if (writestate.writesEnabled) {
+     *         wasStateSet = true;
+     *         ++writestate.compacting;
+     *       } else {
+     *         String msg = "NOT compacting region " + this + ". Writes disabled.";
+     *         LOG.info(msg);
+     *         status.abort(msg);
+     *         return false;
+     *       }
+     *     }
+     * Also, in compactor.performCompaction() we check periodically to see if a
+     * system stop is requested:
+     *   if (closeCheckInterval > 0) {
+     *     bytesWritten += len;
+     *     if (bytesWritten > closeCheckInterval) {
+     *       bytesWritten = 0;
+     *       if (!store.areWritesEnabled()) {
+     *         progress.cancel();
+     *         return false;
+     *       }
+     *     }
+     *   }
+     */
     try {
       byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
       if (stores.get(cf) != store) {
@@ -1854,12 +1926,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       status.markComplete("Compaction complete");
       return true;
     } finally {
-      try {
-        if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
-        if (status != null) status.cleanup();
-      } finally {
-        lock.readLock().unlock();
-      }
+      if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
+      if (status != null) status.cleanup();
     }
   }
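To make the 'writestate' handshake from the comment above easier to follow in
isolation, here is a simplified, self-contained sketch of the pattern: a flag plus
a counter guarded by one monitor, where close flips the flag and waits for in-flight
compactions to drain. The names mirror writestate.writesEnabled and
writestate.compacting from the quoted snippets, but this is an illustration, not the
actual HRegion implementation (read-only state, flushes and error handling are
omitted).

  // Simplified sketch of the writestate handshake described in the comment above.
  // Not the actual HRegion code.
  public class WriteStateSketch {
    private boolean writesEnabled = true;
    private int compacting = 0;

    /** Called at the start of a compaction; returns false if the region is closing. */
    public synchronized boolean beginCompaction() {
      if (!writesEnabled) {
        return false;             // region is closing: do not start a new compaction
      }
      compacting++;
      return true;
    }

    /** Called when a compaction finishes, successfully or not. */
    public synchronized void endCompaction() {
      compacting--;
      notifyAll();                // wake up a close waiting in closeAndWaitForCompactions()
    }

    /** Region close: forbid new compactions, then wait for running ones to drain. */
    public synchronized void closeAndWaitForCompactions() throws InterruptedException {
      writesEnabled = false;      // new compactions now fail fast in beginCompaction()
      while (compacting > 0) {
        wait();                   // monitor released while waiting, re-acquired on notifyAll()
      }
    }
  }

On top of this, an in-flight compaction re-checks store.areWritesEnabled() every
closeCheckInterval bytes written (the performCompaction() snippet above), so a close
request cancels a long compaction promptly instead of waiting for the full rewrite.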
Writes disabled."; + * LOG.info(msg); + * status.abort(msg); + * return false; + * } + * } + * Also in compactor.performCompaction(): + * check periodically to see if a system stop is requested + * if (closeCheckInterval > 0) { + * bytesWritten += len; + * if (bytesWritten > closeCheckInterval) { + * bytesWritten = 0; + * if (!store.areWritesEnabled()) { + * progress.cancel(); + * return false; + * } + * } + * } + */ try { byte[] cf = Bytes.toBytes(store.getColumnFamilyName()); if (stores.get(cf) != store) { @@ -1854,12 +1926,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi status.markComplete("Compaction complete"); return true; } finally { - try { - if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction); - if (status != null) status.cleanup(); - } finally { - lock.readLock().unlock(); - } + if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction); + if (status != null) status.cleanup(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/3b6ae17a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java index 118270a..6e64eb6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java @@ -17,7 +17,17 @@ */ package org.apache.hadoop.hbase.regionserver; -import com.google.common.collect.Lists; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -30,6 +40,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; @@ -59,24 +72,20 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import com.google.common.collect.Lists; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; - /** * Tests bulk loading of HFiles and shows the atomicity or lack of 
+@RunWith(Parameterized.class)
 @Category(LargeTests.class)
 public class TestHRegionServerBulkLoad {
   private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
@@ -84,6 +93,7 @@ public class TestHRegionServerBulkLoad {
   private final static Configuration conf = UTIL.getConfiguration();
   private final static byte[] QUAL = Bytes.toBytes("qual");
   private final static int NUM_CFS = 10;
+  private int sleepDuration;
   public static int BLOCKSIZE = 64 * 1024;
   public static Algorithm COMPRESSION = Compression.Algorithm.NONE;

@@ -93,6 +103,24 @@ public class TestHRegionServerBulkLoad {
       families[i] = Bytes.toBytes(family(i));
     }
   }
+  @Parameters
+  public static final Collection<Object[]> parameters() {
+    int[] sleepDurations = new int[] { 0, 30000 };
+    List<Object[]> configurations = new ArrayList<Object[]>();
+    for (int i : sleepDurations) {
+      configurations.add(new Object[] { i });
+    }
+    return configurations;
+  }
+
+  public TestHRegionServerBulkLoad(int duration) {
+    this.sleepDuration = duration;
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf.setInt("hbase.rpc.timeout", 10 * 1000);
+  }

   /**
    * Create a rowkey compatible with
@@ -188,8 +216,8 @@ public class TestHRegionServerBulkLoad {
         caller.callWithRetries(callable, Integer.MAX_VALUE);

         // Periodically do compaction to reduce the number of open file handles.
-        if (numBulkLoads.get() % 10 == 0) {
-          // 10 * 50 = 500 open file handles!
+        if (numBulkLoads.get() % 5 == 0) {
+          // 5 * 50 = 250 open file handles!
           callable = new RegionServerCallable<Void>(conn, tableName,
               Bytes.toBytes("aaa")) {
             @Override
             public Void call(int callTimeout) throws Exception {
@@ -210,6 +238,23 @@ public class TestHRegionServerBulkLoad {
     }
   }

+  public static class MyObserver extends BaseRegionObserver {
+    static int sleepDuration;
+    @Override
+    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
+        final Store store, final InternalScanner scanner, final ScanType scanType)
+        throws IOException {
+      try {
+        Thread.sleep(sleepDuration);
+      } catch (InterruptedException ie) {
+        IOException ioe = new InterruptedIOException();
+        ioe.initCause(ie);
+        throw ioe;
+      }
+      return scanner;
+    }
+  }
+
   /**
    * Thread that does full scans of the table looking for any partially
    * completed rows.
@@ -277,6 +322,8 @@ public class TestHRegionServerBulkLoad {
     try {
       LOG.info("Creating table " + table);
       HTableDescriptor htd = new HTableDescriptor(table);
+      htd.addCoprocessor(MyObserver.class.getName());
+      MyObserver.sleepDuration = this.sleepDuration;
       for (int i = 0; i < 10; i++) {
         htd.addFamily(new HColumnDescriptor(family(i)));
       }
@@ -347,7 +394,7 @@ public class TestHRegionServerBulkLoad {
   public static void main(String args[]) throws Exception {
     try {
       Configuration c = HBaseConfiguration.create();
-      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad();
+      TestHRegionServerBulkLoad test = new TestHRegionServerBulkLoad(0);
       test.setConf(c);
       test.runAtomicBulkloadTest(TableName.valueOf("atomicTableTest"), 5 * 60 * 1000, 50);
     } finally {
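A note on the test harness change: @RunWith(Parameterized.class) makes JUnit
instantiate the test class once per Object[] returned by the @Parameters method, so
the whole bulk-load test now runs twice, with MyObserver stretching each compaction
by 0 ms and by 30 s. Here is a minimal, hypothetical stand-alone example of the same
JUnit 4 pattern, not part of the patch:

  import static org.junit.Assert.assertTrue;

  import java.util.Arrays;
  import java.util.Collection;

  import org.junit.Test;
  import org.junit.runner.RunWith;
  import org.junit.runners.Parameterized;
  import org.junit.runners.Parameterized.Parameters;

  // Hypothetical stand-alone example of the Parameterized pattern used above.
  @RunWith(Parameterized.class)
  public class SleepDurationExampleTest {
    private final int sleepDuration;

    // One Object[] per run of the whole class: here 0 ms and 30000 ms.
    @Parameters
    public static Collection<Object[]> parameters() {
      return Arrays.asList(new Object[][] { { 0 }, { 30000 } });
    }

    public SleepDurationExampleTest(int duration) {
      this.sleepDuration = duration;
    }

    @Test
    public void durationIsNonNegative() {
      // Each parameterized instance sees its own sleepDuration.
      assertTrue(sleepDuration >= 0);
    }
  }

The 30 s variant is the interesting one: setUpBeforeClass() caps hbase.rpc.timeout
at 10 s, so if compaction still held the region read lock, bulk loads (which need
the region write lock) would stall behind the sleeping compaction and time out;
with the relaxed locking they can proceed concurrently.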