Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A1410C250 for ; Fri, 19 Dec 2014 01:35:24 +0000 (UTC) Received: (qmail 66438 invoked by uid 500); 19 Dec 2014 01:35:24 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 66392 invoked by uid 500); 19 Dec 2014 01:35:24 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 66383 invoked by uid 99); 19 Dec 2014 01:35:24 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Dec 2014 01:35:24 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 325519CB4E3; Fri, 19 Dec 2014 01:35:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jmhsieh@apache.org To: commits@hbase.apache.org Message-Id: <3e71d28b786f4c0183401549176b56a3@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-12718 Convert TestAcidGuarantees from a unit test to an integration test Date: Fri, 19 Dec 2014 01:35:24 +0000 (UTC) Repository: hbase Updated Branches: refs/heads/branch-1 f0e27b2f7 -> 11d72ff2a HBASE-12718 Convert TestAcidGuarantees from a unit test to an integration test Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/11d72ff2 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/11d72ff2 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/11d72ff2 Branch: refs/heads/branch-1 Commit: 11d72ff2a4e9b6a700da9b9dad41258e28fae63a Parents: f0e27b2 Author: Jonathan M Hsieh Authored: Thu Dec 18 07:48:20 2014 -0800 Committer: Jonathan M Hsieh Committed: Thu Dec 18 17:34:43 2014 -0800 ---------------------------------------------------------------------- .../hbase/IntegrationTestAcidGuarantees.java | 371 +++++++++++++++++ .../apache/hadoop/hbase/TestAcidGuarantees.java | 400 ------------------- 2 files changed, 371 insertions(+), 400 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/11d72ff2/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java ---------------------------------------------------------------------- diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java new file mode 100644 index 0000000..2307019 --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestAcidGuarantees.java @@ -0,0 +1,371 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; +import org.apache.hadoop.hbase.testclassification.IntegrationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ToolRunner; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +/** + * This Integration Test verifies acid guarantees across column families by frequently writing + * values to rows with multiple column families and concurrently reading entire rows that expect all + * column families. + */ +@Category(IntegrationTests.class) +public class IntegrationTestAcidGuarantees extends IntegrationTestBase { + + protected static final Log LOG = LogFactory.getLog(IntegrationTestAcidGuarantees.class); + + private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster + public static final TableName TABLE_NAME = + TableName.valueOf(IntegrationTestAcidGuarantees.class.getSimpleName()); + public static final byte [] FAMILY_A = Bytes.toBytes("A"); + public static final byte [] FAMILY_B = Bytes.toBytes("B"); + public static final byte [] FAMILY_C = Bytes.toBytes("C"); + public static final byte[][] FAMILIES = new byte[][] { + FAMILY_A, FAMILY_B, FAMILY_C }; + + public static int NUM_COLS_TO_CHECK = 50; + + protected IntegrationTestingUtility util; + + // **** extends IntegrationTestBase + + @Override + public int runTestFromCommandLine() throws Exception { + Configuration c = getConf(); + int millis = c.getInt("millis", 5000); + int numWriters = c.getInt("numWriters", 50); + int numGetters = c.getInt("numGetters", 2); + int numScanners = c.getInt("numScanners", 2); + int numUniqueRows = c.getInt("numUniqueRows", 3); + runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true); + return 0; + } + + @Override + public void setUpCluster() throws Exception { + // Set small flush size for minicluster so we exercise reseeking scanners + util = getTestingUtil(getConf()); + util.initializeCluster(SERVER_COUNT); + conf = getConf(); + conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024)); + // prevent aggressive region split + conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, + ConstantSizeRegionSplitPolicy.class.getName()); + this.setConf(util.getConfiguration()); + } + + @Override + public TableName getTablename() { + return TABLE_NAME; + } + + @Override + protected Set getColumnFamilies() { + return Sets.newHashSet(String.valueOf(FAMILY_A), String.valueOf(FAMILY_B), + String.valueOf(FAMILY_C)); + } + + // **** core + + /** + * Thread that does random full-row writes into a table. + */ + public static class AtomicityWriter extends MultithreadedTestUtil.RepeatingTestThread { + Random rand = new Random(); + byte data[] = new byte[10]; + byte targetRows[][]; + byte targetFamilies[][]; + Table table; + AtomicLong numWritten = new AtomicLong(); + + public AtomicityWriter(MultithreadedTestUtil.TestContext ctx, byte targetRows[][], + byte targetFamilies[][]) throws IOException { + super(ctx); + this.targetRows = targetRows; + this.targetFamilies = targetFamilies; + table = new HTable(ctx.getConf(), TABLE_NAME); + } + public void doAnAction() throws Exception { + // Pick a random row to write into + byte[] targetRow = targetRows[rand.nextInt(targetRows.length)]; + Put p = new Put(targetRow); + rand.nextBytes(data); + + for (byte[] family : targetFamilies) { + for (int i = 0; i < NUM_COLS_TO_CHECK; i++) { + byte qualifier[] = Bytes.toBytes("col" + i); + p.add(family, qualifier, data); + } + } + table.put(p); + numWritten.getAndIncrement(); + } + } + + /** + * Thread that does single-row reads in a table, looking for partially + * completed rows. + */ + public static class AtomicGetReader extends MultithreadedTestUtil.RepeatingTestThread { + byte targetRow[]; + byte targetFamilies[][]; + Table table; + int numVerified = 0; + AtomicLong numRead = new AtomicLong(); + + public AtomicGetReader(MultithreadedTestUtil.TestContext ctx, byte targetRow[], + byte targetFamilies[][]) throws IOException { + super(ctx); + this.targetRow = targetRow; + this.targetFamilies = targetFamilies; + table = new HTable(ctx.getConf(), TABLE_NAME); + } + + public void doAnAction() throws Exception { + Get g = new Get(targetRow); + Result res = table.get(g); + byte[] gotValue = null; + if (res.getRow() == null) { + // Trying to verify but we didn't find the row - the writing + // thread probably just hasn't started writing yet, so we can + // ignore this action + return; + } + + for (byte[] family : targetFamilies) { + for (int i = 0; i < NUM_COLS_TO_CHECK; i++) { + byte qualifier[] = Bytes.toBytes("col" + i); + byte thisValue[] = res.getValue(family, qualifier); + if (gotValue != null && !Bytes.equals(gotValue, thisValue)) { + gotFailure(gotValue, res); + } + numVerified++; + gotValue = thisValue; + } + } + numRead.getAndIncrement(); + } + + private void gotFailure(byte[] expected, Result res) { + StringBuilder msg = new StringBuilder(); + msg.append("Failed after ").append(numVerified).append("!"); + msg.append("Expected=").append(Bytes.toStringBinary(expected)); + msg.append("Got:\n"); + for (Cell kv : res.listCells()) { + msg.append(kv.toString()); + msg.append(" val= "); + msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv))); + msg.append("\n"); + } + throw new RuntimeException(msg.toString()); + } + } + + /** + * Thread that does full scans of the table looking for any partially completed + * rows. + */ + public static class AtomicScanReader extends MultithreadedTestUtil.RepeatingTestThread { + byte targetFamilies[][]; + Table table; + AtomicLong numScans = new AtomicLong(); + AtomicLong numRowsScanned = new AtomicLong(); + + public AtomicScanReader(MultithreadedTestUtil.TestContext ctx, + byte targetFamilies[][]) throws IOException { + super(ctx); + this.targetFamilies = targetFamilies; + table = new HTable(ctx.getConf(), TABLE_NAME); + } + + public void doAnAction() throws Exception { + Scan s = new Scan(); + for (byte[] family : targetFamilies) { + s.addFamily(family); + } + ResultScanner scanner = table.getScanner(s); + + for (Result res : scanner) { + byte[] gotValue = null; + + for (byte[] family : targetFamilies) { + for (int i = 0; i < NUM_COLS_TO_CHECK; i++) { + byte qualifier[] = Bytes.toBytes("col" + i); + byte thisValue[] = res.getValue(family, qualifier); + if (gotValue != null && !Bytes.equals(gotValue, thisValue)) { + gotFailure(gotValue, res); + } + gotValue = thisValue; + } + } + numRowsScanned.getAndIncrement(); + } + numScans.getAndIncrement(); + } + + private void gotFailure(byte[] expected, Result res) { + StringBuilder msg = new StringBuilder(); + msg.append("Failed after ").append(numRowsScanned).append("!"); + msg.append("Expected=").append(Bytes.toStringBinary(expected)); + msg.append("Got:\n"); + for (Cell kv : res.listCells()) { + msg.append(kv.toString()); + msg.append(" val= "); + msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv))); + msg.append("\n"); + } + throw new RuntimeException(msg.toString()); + } + } + + public void runTestAtomicity(long millisToRun, + int numWriters, + int numGetters, + int numScanners, + int numUniqueRows) throws Exception { + runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false); + } + + private void createTableIfMissing() + throws IOException { + try { + util.createTable(TABLE_NAME, FAMILIES); + } catch (TableExistsException tee) { + } + } + + public void runTestAtomicity(long millisToRun, + int numWriters, + int numGetters, + int numScanners, + int numUniqueRows, + final boolean systemTest) throws Exception { + createTableIfMissing(); + MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(util.getConfiguration()); + + byte rows[][] = new byte[numUniqueRows][]; + for (int i = 0; i < numUniqueRows; i++) { + rows[i] = Bytes.toBytes("test_row_" + i); + } + + List writers = Lists.newArrayList(); + for (int i = 0; i < numWriters; i++) { + AtomicityWriter writer = new AtomicityWriter( + ctx, rows, FAMILIES); + writers.add(writer); + ctx.addThread(writer); + } + // Add a flusher + ctx.addThread(new MultithreadedTestUtil.RepeatingTestThread(ctx) { + Admin admin = util.getHBaseAdmin(); + public void doAnAction() throws Exception { + try { + admin.flush(TABLE_NAME); + } catch(IOException ioe) { + LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe)); + } + // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally, + // we would flush as often as possible. On a running cluster, this isn't practical: + // (1) we will cause a lot of load due to all the flushing and compacting + // (2) we cannot change the flushing/compacting related Configuration options to try to + // alleviate this + // (3) it is an unrealistic workload, since no one would actually flush that often. + // Therefore, let's flush every minute to have more flushes than usual, but not overload + // the running cluster. + if (systemTest) Thread.sleep(60000); + } + }); + + List getters = Lists.newArrayList(); + for (int i = 0; i < numGetters; i++) { + AtomicGetReader getter = new AtomicGetReader( + ctx, rows[i % numUniqueRows], FAMILIES); + getters.add(getter); + ctx.addThread(getter); + } + + List scanners = Lists.newArrayList(); + for (int i = 0; i < numScanners; i++) { + AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES); + scanners.add(scanner); + ctx.addThread(scanner); + } + + ctx.startThreads(); + ctx.waitFor(millisToRun); + ctx.stop(); + + LOG.info("Finished test. Writers:"); + for (AtomicityWriter writer : writers) { + LOG.info(" wrote " + writer.numWritten.get()); + } + LOG.info("Readers:"); + for (AtomicGetReader reader : getters) { + LOG.info(" read " + reader.numRead.get()); + } + LOG.info("Scanners:"); + for (AtomicScanReader scanner : scanners) { + LOG.info(" scanned " + scanner.numScans.get()); + LOG.info(" verified " + scanner.numRowsScanned.get() + " rows"); + } + } + + @Test + public void testGetAtomicity() throws Exception { + runTestAtomicity(20000, 5, 5, 0, 3); + } + + @Test + public void testScanAtomicity() throws Exception { + runTestAtomicity(20000, 5, 0, 5, 3); + } + + @Test + public void testMixedAtomicity() throws Exception { + runTestAtomicity(20000, 5, 2, 2, 3); + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int ret = ToolRunner.run(conf, new IntegrationTestAcidGuarantees(), args); + System.exit(ret); + } + + +} + + http://git-wip-us.apache.org/repos/asf/hbase/blob/11d72ff2/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java deleted file mode 100644 index 36da08d..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestAcidGuarantees.java +++ /dev/null @@ -1,400 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase; - -import java.io.IOException; -import java.util.List; -import java.util.Random; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread; -import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.google.common.collect.Lists; - -/** - * Test case that uses multiple threads to read and write multifamily rows - * into a table, verifying that reads never see partially-complete writes. - * - * This can run as a junit test, or with a main() function which runs against - * a real cluster (eg for testing with failures, region movement, etc) - */ -@Category(MediumTests.class) -public class TestAcidGuarantees implements Tool { - protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class); - public static final TableName TABLE_NAME = TableName.valueOf("TestAcidGuarantees"); - public static final byte [] FAMILY_A = Bytes.toBytes("A"); - public static final byte [] FAMILY_B = Bytes.toBytes("B"); - public static final byte [] FAMILY_C = Bytes.toBytes("C"); - public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data"); - - public static final byte[][] FAMILIES = new byte[][] { - FAMILY_A, FAMILY_B, FAMILY_C }; - - private HBaseTestingUtility util; - - public static int NUM_COLS_TO_CHECK = 50; - - // when run as main - private Configuration conf; - - private void createTableIfMissing() - throws IOException { - try { - util.createTable(TABLE_NAME, FAMILIES); - } catch (TableExistsException tee) { - } - } - - public TestAcidGuarantees() { - // Set small flush size for minicluster so we exercise reseeking scanners - Configuration conf = HBaseConfiguration.create(); - conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024)); - // prevent aggressive region split - conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, - ConstantSizeRegionSplitPolicy.class.getName()); - util = new HBaseTestingUtility(conf); - } - - /** - * Thread that does random full-row writes into a table. - */ - public static class AtomicityWriter extends RepeatingTestThread { - Random rand = new Random(); - byte data[] = new byte[10]; - byte targetRows[][]; - byte targetFamilies[][]; - Table table; - AtomicLong numWritten = new AtomicLong(); - - public AtomicityWriter(TestContext ctx, byte targetRows[][], - byte targetFamilies[][]) throws IOException { - super(ctx); - this.targetRows = targetRows; - this.targetFamilies = targetFamilies; - table = new HTable(ctx.getConf(), TABLE_NAME); - } - public void doAnAction() throws Exception { - // Pick a random row to write into - byte[] targetRow = targetRows[rand.nextInt(targetRows.length)]; - Put p = new Put(targetRow); - rand.nextBytes(data); - - for (byte[] family : targetFamilies) { - for (int i = 0; i < NUM_COLS_TO_CHECK; i++) { - byte qualifier[] = Bytes.toBytes("col" + i); - p.add(family, qualifier, data); - } - } - table.put(p); - numWritten.getAndIncrement(); - } - } - - /** - * Thread that does single-row reads in a table, looking for partially - * completed rows. - */ - public static class AtomicGetReader extends RepeatingTestThread { - byte targetRow[]; - byte targetFamilies[][]; - Table table; - int numVerified = 0; - AtomicLong numRead = new AtomicLong(); - - public AtomicGetReader(TestContext ctx, byte targetRow[], - byte targetFamilies[][]) throws IOException { - super(ctx); - this.targetRow = targetRow; - this.targetFamilies = targetFamilies; - table = new HTable(ctx.getConf(), TABLE_NAME); - } - - public void doAnAction() throws Exception { - Get g = new Get(targetRow); - Result res = table.get(g); - byte[] gotValue = null; - if (res.getRow() == null) { - // Trying to verify but we didn't find the row - the writing - // thread probably just hasn't started writing yet, so we can - // ignore this action - return; - } - - for (byte[] family : targetFamilies) { - for (int i = 0; i < NUM_COLS_TO_CHECK; i++) { - byte qualifier[] = Bytes.toBytes("col" + i); - byte thisValue[] = res.getValue(family, qualifier); - if (gotValue != null && !Bytes.equals(gotValue, thisValue)) { - gotFailure(gotValue, res); - } - numVerified++; - gotValue = thisValue; - } - } - numRead.getAndIncrement(); - } - - private void gotFailure(byte[] expected, Result res) { - StringBuilder msg = new StringBuilder(); - msg.append("Failed after ").append(numVerified).append("!"); - msg.append("Expected=").append(Bytes.toStringBinary(expected)); - msg.append("Got:\n"); - for (Cell kv : res.listCells()) { - msg.append(kv.toString()); - msg.append(" val= "); - msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv))); - msg.append("\n"); - } - throw new RuntimeException(msg.toString()); - } - } - - /** - * Thread that does full scans of the table looking for any partially completed - * rows. - */ - public static class AtomicScanReader extends RepeatingTestThread { - byte targetFamilies[][]; - Table table; - AtomicLong numScans = new AtomicLong(); - AtomicLong numRowsScanned = new AtomicLong(); - - public AtomicScanReader(TestContext ctx, - byte targetFamilies[][]) throws IOException { - super(ctx); - this.targetFamilies = targetFamilies; - table = new HTable(ctx.getConf(), TABLE_NAME); - } - - public void doAnAction() throws Exception { - Scan s = new Scan(); - for (byte[] family : targetFamilies) { - s.addFamily(family); - } - ResultScanner scanner = table.getScanner(s); - - for (Result res : scanner) { - byte[] gotValue = null; - - for (byte[] family : targetFamilies) { - for (int i = 0; i < NUM_COLS_TO_CHECK; i++) { - byte qualifier[] = Bytes.toBytes("col" + i); - byte thisValue[] = res.getValue(family, qualifier); - if (gotValue != null && !Bytes.equals(gotValue, thisValue)) { - gotFailure(gotValue, res); - } - gotValue = thisValue; - } - } - numRowsScanned.getAndIncrement(); - } - numScans.getAndIncrement(); - } - - private void gotFailure(byte[] expected, Result res) { - StringBuilder msg = new StringBuilder(); - msg.append("Failed after ").append(numRowsScanned).append("!"); - msg.append("Expected=").append(Bytes.toStringBinary(expected)); - msg.append("Got:\n"); - for (Cell kv : res.listCells()) { - msg.append(kv.toString()); - msg.append(" val= "); - msg.append(Bytes.toStringBinary(CellUtil.cloneValue(kv))); - msg.append("\n"); - } - throw new RuntimeException(msg.toString()); - } - } - - public void runTestAtomicity(long millisToRun, - int numWriters, - int numGetters, - int numScanners, - int numUniqueRows) throws Exception { - runTestAtomicity(millisToRun, numWriters, numGetters, numScanners, numUniqueRows, false); - } - - public void runTestAtomicity(long millisToRun, - int numWriters, - int numGetters, - int numScanners, - int numUniqueRows, - final boolean systemTest) throws Exception { - createTableIfMissing(); - TestContext ctx = new TestContext(util.getConfiguration()); - - byte rows[][] = new byte[numUniqueRows][]; - for (int i = 0; i < numUniqueRows; i++) { - rows[i] = Bytes.toBytes("test_row_" + i); - } - - List writers = Lists.newArrayList(); - for (int i = 0; i < numWriters; i++) { - AtomicityWriter writer = new AtomicityWriter( - ctx, rows, FAMILIES); - writers.add(writer); - ctx.addThread(writer); - } - // Add a flusher - ctx.addThread(new RepeatingTestThread(ctx) { - HBaseAdmin admin = util.getHBaseAdmin(); - public void doAnAction() throws Exception { - try { - admin.flush(TABLE_NAME); - } catch(IOException ioe) { - LOG.warn("Ignoring exception while flushing: " + StringUtils.stringifyException(ioe)); - } - // Flushing has been a source of ACID violations previously (see HBASE-2856), so ideally, - // we would flush as often as possible. On a running cluster, this isn't practical: - // (1) we will cause a lot of load due to all the flushing and compacting - // (2) we cannot change the flushing/compacting related Configuration options to try to - // alleviate this - // (3) it is an unrealistic workload, since no one would actually flush that often. - // Therefore, let's flush every minute to have more flushes than usual, but not overload - // the running cluster. - if (systemTest) Thread.sleep(60000); - } - }); - - List getters = Lists.newArrayList(); - for (int i = 0; i < numGetters; i++) { - AtomicGetReader getter = new AtomicGetReader( - ctx, rows[i % numUniqueRows], FAMILIES); - getters.add(getter); - ctx.addThread(getter); - } - - List scanners = Lists.newArrayList(); - for (int i = 0; i < numScanners; i++) { - AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES); - scanners.add(scanner); - ctx.addThread(scanner); - } - - ctx.startThreads(); - ctx.waitFor(millisToRun); - ctx.stop(); - - LOG.info("Finished test. Writers:"); - for (AtomicityWriter writer : writers) { - LOG.info(" wrote " + writer.numWritten.get()); - } - LOG.info("Readers:"); - for (AtomicGetReader reader : getters) { - LOG.info(" read " + reader.numRead.get()); - } - LOG.info("Scanners:"); - for (AtomicScanReader scanner : scanners) { - LOG.info(" scanned " + scanner.numScans.get()); - LOG.info(" verified " + scanner.numRowsScanned.get() + " rows"); - } - } - - @Test - public void testGetAtomicity() throws Exception { - util.startMiniCluster(1); - try { - runTestAtomicity(20000, 5, 5, 0, 3); - } finally { - util.shutdownMiniCluster(); - } - } - - @Test - public void testScanAtomicity() throws Exception { - util.startMiniCluster(1); - try { - runTestAtomicity(20000, 5, 0, 5, 3); - } finally { - util.shutdownMiniCluster(); - } - } - - @Test - public void testMixedAtomicity() throws Exception { - util.startMiniCluster(1); - try { - runTestAtomicity(20000, 5, 2, 2, 3); - } finally { - util.shutdownMiniCluster(); - } - } - - //////////////////////////////////////////////////////////////////////////// - // Tool interface - //////////////////////////////////////////////////////////////////////////// - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration c) { - this.conf = c; - this.util = new HBaseTestingUtility(c); - } - - @Override - public int run(String[] arg0) throws Exception { - Configuration c = getConf(); - int millis = c.getInt("millis", 5000); - int numWriters = c.getInt("numWriters", 50); - int numGetters = c.getInt("numGetters", 2); - int numScanners = c.getInt("numScanners", 2); - int numUniqueRows = c.getInt("numUniqueRows", 3); - runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, true); - return 0; - } - - public static void main(String args[]) throws Exception { - Configuration c = HBaseConfiguration.create(); - int status; - try { - TestAcidGuarantees test = new TestAcidGuarantees(); - status = ToolRunner.run(c, test, args); - } catch (Exception e) { - LOG.error("Exiting due to error", e); - status = -1; - } - System.exit(status); - } - - -} -