From commits-return-77994-archive-asf-public=cust-asf.ponee.io@hbase.apache.org Wed Sep 12 16:54:09 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 9367918067A for ; Wed, 12 Sep 2018 16:54:05 +0200 (CEST) Received: (qmail 7833 invoked by uid 500); 12 Sep 2018 14:54:03 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 6819 invoked by uid 99); 12 Sep 2018 14:54:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Sep 2018 14:54:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2B04EE0201; Wed, 12 Sep 2018 14:54:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: git-site-role@apache.org To: commits@hbase.apache.org Date: Wed, 12 Sep 2018 14:54:11 -0000 Message-Id: <1311e6b83f734f6791e6f9a321ef2a1f@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [10/49] hbase-site git commit: Published site at 3810ba2c6edfc531181ffc9e6c68396a0c2d2027. http://git-wip-us.apache.org/repos/asf/hbase-site/blob/705d69c4/testdevapidocs/src-html/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.MockHRegion.WrappedRowLock.html ---------------------------------------------------------------------- diff --git a/testdevapidocs/src-html/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.MockHRegion.WrappedRowLock.html b/testdevapidocs/src-html/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.MockHRegion.WrappedRowLock.html index 7ed37ca..907a45d 100644 --- a/testdevapidocs/src-html/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.MockHRegion.WrappedRowLock.html +++ b/testdevapidocs/src-html/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.MockHRegion.WrappedRowLock.html @@ -28,713 +28,752 @@ 020import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; 021import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; 022import static org.junit.Assert.assertEquals; -023import static org.junit.Assert.assertTrue; -024import static org.junit.Assert.fail; -025 -026import java.io.IOException; -027import java.util.ArrayList; -028import java.util.Arrays; -029import java.util.List; -030import java.util.Objects; -031import java.util.Random; -032import java.util.concurrent.CountDownLatch; -033import java.util.concurrent.atomic.AtomicInteger; -034import java.util.concurrent.atomic.AtomicLong; -035import org.apache.hadoop.conf.Configuration; -036import org.apache.hadoop.fs.FileSystem; -037import org.apache.hadoop.fs.Path; -038import org.apache.hadoop.hbase.Cell; -039import org.apache.hadoop.hbase.CellUtil; -040import org.apache.hadoop.hbase.CompareOperator; -041import org.apache.hadoop.hbase.HBaseClassTestRule; -042import org.apache.hadoop.hbase.HBaseTestingUtility; -043import org.apache.hadoop.hbase.HColumnDescriptor; -044import org.apache.hadoop.hbase.HConstants; -045import org.apache.hadoop.hbase.HRegionInfo; -046import org.apache.hadoop.hbase.HTableDescriptor; -047import org.apache.hadoop.hbase.MultithreadedTestUtil; -048import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; -049import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; -050import org.apache.hadoop.hbase.TableName; -051import org.apache.hadoop.hbase.client.Append; -052import org.apache.hadoop.hbase.client.Delete; -053import org.apache.hadoop.hbase.client.Durability; -054import org.apache.hadoop.hbase.client.Get; -055import org.apache.hadoop.hbase.client.Increment; -056import org.apache.hadoop.hbase.client.IsolationLevel; -057import org.apache.hadoop.hbase.client.Mutation; -058import org.apache.hadoop.hbase.client.Put; -059import org.apache.hadoop.hbase.client.RegionInfo; -060import org.apache.hadoop.hbase.client.Result; -061import org.apache.hadoop.hbase.client.RowMutations; -062import org.apache.hadoop.hbase.client.Scan; -063import org.apache.hadoop.hbase.client.TableDescriptor; -064import org.apache.hadoop.hbase.filter.BinaryComparator; -065import org.apache.hadoop.hbase.io.HeapSize; -066import org.apache.hadoop.hbase.io.hfile.BlockCache; -067import org.apache.hadoop.hbase.testclassification.MediumTests; -068import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; -069import org.apache.hadoop.hbase.util.Bytes; -070import org.apache.hadoop.hbase.wal.WAL; -071import org.junit.After; -072import org.junit.Before; -073import org.junit.ClassRule; -074import org.junit.Rule; -075import org.junit.Test; -076import org.junit.experimental.categories.Category; -077import org.junit.rules.TestName; -078import org.slf4j.Logger; -079import org.slf4j.LoggerFactory; -080 -081/** -082 * Testing of HRegion.incrementColumnValue, HRegion.increment, -083 * and HRegion.append -084 */ -085@Category({VerySlowRegionServerTests.class, MediumTests.class}) // Starts 100 threads -086public class TestAtomicOperation { -087 -088 @ClassRule -089 public static final HBaseClassTestRule CLASS_RULE = -090 HBaseClassTestRule.forClass(TestAtomicOperation.class); -091 -092 private static final Logger LOG = LoggerFactory.getLogger(TestAtomicOperation.class); -093 @Rule public TestName name = new TestName(); -094 -095 HRegion region = null; -096 private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU(); -097 -098 // Test names -099 static byte[] tableName; -100 static final byte[] qual1 = Bytes.toBytes("qual1"); -101 static final byte[] qual2 = Bytes.toBytes("qual2"); -102 static final byte[] qual3 = Bytes.toBytes("qual3"); -103 static final byte[] value1 = Bytes.toBytes("value1"); -104 static final byte[] value2 = Bytes.toBytes("value2"); -105 static final byte [] row = Bytes.toBytes("rowA"); -106 static final byte [] row2 = Bytes.toBytes("rowB"); -107 -108 @Before -109 public void setup() { -110 tableName = Bytes.toBytes(name.getMethodName()); -111 } -112 -113 @After -114 public void teardown() throws IOException { -115 if (region != null) { -116 BlockCache bc = region.getStores().get(0).getCacheConfig().getBlockCache(); -117 region.close(); -118 WAL wal = region.getWAL(); -119 if (wal != null) wal.close(); -120 if (bc != null) bc.shutdown(); -121 region = null; -122 } -123 } -124 ////////////////////////////////////////////////////////////////////////////// -125 // New tests that doesn't spin up a mini cluster but rather just test the -126 // individual code pieces in the HRegion. -127 ////////////////////////////////////////////////////////////////////////////// -128 -129 /** -130 * Test basic append operation. -131 * More tests in -132 * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend() -133 */ -134 @Test -135 public void testAppend() throws IOException { -136 initHRegion(tableName, name.getMethodName(), fam1); -137 String v1 = "Ultimate Answer to the Ultimate Question of Life,"+ -138 " The Universe, and Everything"; -139 String v2 = " is... 42."; -140 Append a = new Append(row); -141 a.setReturnResults(false); -142 a.addColumn(fam1, qual1, Bytes.toBytes(v1)); -143 a.addColumn(fam1, qual2, Bytes.toBytes(v2)); -144 assertTrue(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE).isEmpty()); -145 a = new Append(row); -146 a.addColumn(fam1, qual1, Bytes.toBytes(v2)); -147 a.addColumn(fam1, qual2, Bytes.toBytes(v1)); -148 Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); -149 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1+v2), result.getValue(fam1, qual1))); -150 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2+v1), result.getValue(fam1, qual2))); -151 } -152 -153 @Test -154 public void testAppendWithNonExistingFamily() throws IOException { -155 initHRegion(tableName, name.getMethodName(), fam1); -156 final String v1 = "Value"; -157 final Append a = new Append(row); -158 a.addColumn(fam1, qual1, Bytes.toBytes(v1)); -159 a.addColumn(fam2, qual2, Bytes.toBytes(v1)); -160 Result result = null; -161 try { -162 result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); -163 fail("Append operation should fail with NoSuchColumnFamilyException."); -164 } catch (NoSuchColumnFamilyException e) { -165 assertEquals(null, result); -166 } catch (Exception e) { -167 fail("Append operation should fail with NoSuchColumnFamilyException."); -168 } -169 } -170 -171 @Test -172 public void testIncrementWithNonExistingFamily() throws IOException { -173 initHRegion(tableName, name.getMethodName(), fam1); -174 final Increment inc = new Increment(row); -175 inc.addColumn(fam1, qual1, 1); -176 inc.addColumn(fam2, qual2, 1); -177 inc.setDurability(Durability.ASYNC_WAL); -178 try { -179 region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE); -180 } catch (NoSuchColumnFamilyException e) { -181 final Get g = new Get(row); -182 final Result result = region.get(g); -183 assertEquals(null, result.getValue(fam1, qual1)); -184 assertEquals(null, result.getValue(fam2, qual2)); -185 } catch (Exception e) { -186 fail("Increment operation should fail with NoSuchColumnFamilyException."); -187 } -188 } -189 -190 /** -191 * Test multi-threaded increments. -192 */ -193 @Test -194 public void testIncrementMultiThreads() throws IOException { -195 boolean fast = true; -196 LOG.info("Starting test testIncrementMultiThreads"); -197 // run a with mixed column families (1 and 3 versions) -198 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2); -199 -200 // Create 100 threads, each will increment by its own quantity. All 100 threads update the -201 // same row over two column families. -202 int numThreads = 100; -203 int incrementsPerThread = 1000; -204 Incrementer[] all = new Incrementer[numThreads]; -205 int expectedTotal = 0; -206 // create all threads -207 for (int i = 0; i < numThreads; i++) { -208 all[i] = new Incrementer(region, i, i, incrementsPerThread); -209 expectedTotal += (i * incrementsPerThread); -210 } -211 -212 // run all threads -213 for (int i = 0; i < numThreads; i++) { -214 all[i].start(); -215 } -216 -217 // wait for all threads to finish -218 for (int i = 0; i < numThreads; i++) { -219 try { -220 all[i].join(); -221 } catch (InterruptedException e) { -222 LOG.info("Ignored", e); -223 } -224 } -225 assertICV(row, fam1, qual1, expectedTotal, fast); -226 assertICV(row, fam1, qual2, expectedTotal*2, fast); -227 assertICV(row, fam2, qual3, expectedTotal*3, fast); -228 LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal); -229 } -230 -231 -232 private void assertICV(byte [] row, -233 byte [] familiy, -234 byte[] qualifier, -235 long amount, -236 boolean fast) throws IOException { -237 // run a get and see? -238 Get get = new Get(row); -239 if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); -240 get.addColumn(familiy, qualifier); -241 Result result = region.get(get); -242 assertEquals(1, result.size()); -243 -244 Cell kv = result.rawCells()[0]; -245 long r = Bytes.toLong(CellUtil.cloneValue(kv)); -246 assertEquals(amount, r); -247 } -248 -249 private void initHRegion (byte [] tableName, String callingMethod, -250 byte[] ... families) -251 throws IOException { -252 initHRegion(tableName, callingMethod, null, families); -253 } -254 -255 private void initHRegion (byte [] tableName, String callingMethod, int [] maxVersions, -256 byte[] ... families) -257 throws IOException { -258 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName)); -259 int i=0; -260 for(byte [] family : families) { -261 HColumnDescriptor hcd = new HColumnDescriptor(family); -262 hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1); -263 htd.addFamily(hcd); -264 } -265 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); -266 region = TEST_UTIL.createLocalHRegion(info, htd); -267 } -268 -269 /** -270 * A thread that makes increment calls always on the same row, this.row against two column -271 * families on this row. -272 */ -273 public static class Incrementer extends Thread { -274 -275 private final Region region; -276 private final int numIncrements; -277 private final int amount; -278 -279 -280 public Incrementer(Region region, int threadNumber, int amount, int numIncrements) { -281 super("Incrementer." + threadNumber); -282 this.region = region; -283 this.numIncrements = numIncrements; -284 this.amount = amount; -285 setDaemon(true); -286 } +023import static org.junit.Assert.assertNotNull; +024import static org.junit.Assert.assertTrue; +025import static org.junit.Assert.fail; +026 +027import java.io.IOException; +028import java.util.ArrayList; +029import java.util.Arrays; +030import java.util.List; +031import java.util.Objects; +032import java.util.Random; +033import java.util.concurrent.CountDownLatch; +034import java.util.concurrent.atomic.AtomicInteger; +035import java.util.concurrent.atomic.AtomicLong; +036import org.apache.hadoop.conf.Configuration; +037import org.apache.hadoop.fs.FileSystem; +038import org.apache.hadoop.fs.Path; +039import org.apache.hadoop.hbase.Cell; +040import org.apache.hadoop.hbase.CellUtil; +041import org.apache.hadoop.hbase.CompareOperator; +042import org.apache.hadoop.hbase.HBaseClassTestRule; +043import org.apache.hadoop.hbase.HBaseTestingUtility; +044import org.apache.hadoop.hbase.HColumnDescriptor; +045import org.apache.hadoop.hbase.HConstants; +046import org.apache.hadoop.hbase.HRegionInfo; +047import org.apache.hadoop.hbase.HTableDescriptor; +048import org.apache.hadoop.hbase.MultithreadedTestUtil; +049import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext; +050import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; +051import org.apache.hadoop.hbase.TableName; +052import org.apache.hadoop.hbase.client.Append; +053import org.apache.hadoop.hbase.client.Delete; +054import org.apache.hadoop.hbase.client.Durability; +055import org.apache.hadoop.hbase.client.Get; +056import org.apache.hadoop.hbase.client.Increment; +057import org.apache.hadoop.hbase.client.IsolationLevel; +058import org.apache.hadoop.hbase.client.Mutation; +059import org.apache.hadoop.hbase.client.Put; +060import org.apache.hadoop.hbase.client.RegionInfo; +061import org.apache.hadoop.hbase.client.Result; +062import org.apache.hadoop.hbase.client.RowMutations; +063import org.apache.hadoop.hbase.client.Scan; +064import org.apache.hadoop.hbase.client.TableDescriptor; +065import org.apache.hadoop.hbase.filter.BinaryComparator; +066import org.apache.hadoop.hbase.io.HeapSize; +067import org.apache.hadoop.hbase.io.hfile.BlockCache; +068import org.apache.hadoop.hbase.testclassification.MediumTests; +069import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; +070import org.apache.hadoop.hbase.util.Bytes; +071import org.apache.hadoop.hbase.wal.WAL; +072import org.junit.After; +073import org.junit.Before; +074import org.junit.ClassRule; +075import org.junit.Rule; +076import org.junit.Test; +077import org.junit.experimental.categories.Category; +078import org.junit.rules.TestName; +079import org.slf4j.Logger; +080import org.slf4j.LoggerFactory; +081 +082/** +083 * Testing of HRegion.incrementColumnValue, HRegion.increment, +084 * and HRegion.append +085 */ +086@Category({VerySlowRegionServerTests.class, MediumTests.class}) // Starts 100 threads +087public class TestAtomicOperation { +088 +089 @ClassRule +090 public static final HBaseClassTestRule CLASS_RULE = +091 HBaseClassTestRule.forClass(TestAtomicOperation.class); +092 +093 private static final Logger LOG = LoggerFactory.getLogger(TestAtomicOperation.class); +094 @Rule public TestName name = new TestName(); +095 +096 HRegion region = null; +097 private HBaseTestingUtility TEST_UTIL = HBaseTestingUtility.createLocalHTU(); +098 +099 // Test names +100 static byte[] tableName; +101 static final byte[] qual1 = Bytes.toBytes("qual1"); +102 static final byte[] qual2 = Bytes.toBytes("qual2"); +103 static final byte[] qual3 = Bytes.toBytes("qual3"); +104 static final byte[] value1 = Bytes.toBytes("value1"); +105 static final byte[] value2 = Bytes.toBytes("value2"); +106 static final byte [] row = Bytes.toBytes("rowA"); +107 static final byte [] row2 = Bytes.toBytes("rowB"); +108 +109 @Before +110 public void setup() { +111 tableName = Bytes.toBytes(name.getMethodName()); +112 } +113 +114 @After +115 public void teardown() throws IOException { +116 if (region != null) { +117 BlockCache bc = region.getStores().get(0).getCacheConfig().getBlockCache(); +118 region.close(); +119 WAL wal = region.getWAL(); +120 if (wal != null) wal.close(); +121 if (bc != null) bc.shutdown(); +122 region = null; +123 } +124 } +125 ////////////////////////////////////////////////////////////////////////////// +126 // New tests that doesn't spin up a mini cluster but rather just test the +127 // individual code pieces in the HRegion. +128 ////////////////////////////////////////////////////////////////////////////// +129 +130 /** +131 * Test basic append operation. +132 * More tests in +133 * @see org.apache.hadoop.hbase.client.TestFromClientSide#testAppend() +134 */ +135 @Test +136 public void testAppend() throws IOException { +137 initHRegion(tableName, name.getMethodName(), fam1); +138 String v1 = "Ultimate Answer to the Ultimate Question of Life,"+ +139 " The Universe, and Everything"; +140 String v2 = " is... 42."; +141 Append a = new Append(row); +142 a.setReturnResults(false); +143 a.addColumn(fam1, qual1, Bytes.toBytes(v1)); +144 a.addColumn(fam1, qual2, Bytes.toBytes(v2)); +145 assertTrue(region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE).isEmpty()); +146 a = new Append(row); +147 a.addColumn(fam1, qual1, Bytes.toBytes(v2)); +148 a.addColumn(fam1, qual2, Bytes.toBytes(v1)); +149 Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); +150 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1+v2), result.getValue(fam1, qual1))); +151 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2+v1), result.getValue(fam1, qual2))); +152 } +153 +154 @Test +155 public void testAppendWithMultipleFamilies() throws IOException { +156 final byte[] fam3 = Bytes.toBytes("colfamily31"); +157 initHRegion(tableName, name.getMethodName(), fam1, fam2, fam3); +158 String v1 = "Appended"; +159 String v2 = "Value"; +160 +161 Append a = new Append(row); +162 a.setReturnResults(false); +163 a.addColumn(fam1, qual1, Bytes.toBytes(v1)); +164 a.addColumn(fam2, qual2, Bytes.toBytes(v2)); +165 Result result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); +166 assertTrue("Expected an empty result but result contains " + result.size() + " keys", +167 result.isEmpty()); +168 +169 a = new Append(row); +170 a.addColumn(fam2, qual2, Bytes.toBytes(v1)); +171 a.addColumn(fam1, qual1, Bytes.toBytes(v2)); +172 a.addColumn(fam3, qual3, Bytes.toBytes(v2)); +173 a.addColumn(fam1, qual2, Bytes.toBytes(v1)); +174 +175 result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); +176 +177 byte[] actualValue1 = result.getValue(fam1, qual1); +178 byte[] actualValue2 = result.getValue(fam2, qual2); +179 byte[] actualValue3 = result.getValue(fam3, qual3); +180 byte[] actualValue4 = result.getValue(fam1, qual2); +181 +182 assertNotNull("Value1 should bot be null", actualValue1); +183 assertNotNull("Value2 should bot be null", actualValue2); +184 assertNotNull("Value3 should bot be null", actualValue3); +185 assertNotNull("Value4 should bot be null", actualValue4); +186 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1 + v2), actualValue1)); +187 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2 + v1), actualValue2)); +188 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v2), actualValue3)); +189 assertEquals(0, Bytes.compareTo(Bytes.toBytes(v1), actualValue4)); +190 } +191 +192 @Test +193 public void testAppendWithNonExistingFamily() throws IOException { +194 initHRegion(tableName, name.getMethodName(), fam1); +195 final String v1 = "Value"; +196 final Append a = new Append(row); +197 a.addColumn(fam1, qual1, Bytes.toBytes(v1)); +198 a.addColumn(fam2, qual2, Bytes.toBytes(v1)); +199 Result result = null; +200 try { +201 result = region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); +202 fail("Append operation should fail with NoSuchColumnFamilyException."); +203 } catch (NoSuchColumnFamilyException e) { +204 assertEquals(null, result); +205 } catch (Exception e) { +206 fail("Append operation should fail with NoSuchColumnFamilyException."); +207 } +208 } +209 +210 @Test +211 public void testIncrementWithNonExistingFamily() throws IOException { +212 initHRegion(tableName, name.getMethodName(), fam1); +213 final Increment inc = new Increment(row); +214 inc.addColumn(fam1, qual1, 1); +215 inc.addColumn(fam2, qual2, 1); +216 inc.setDurability(Durability.ASYNC_WAL); +217 try { +218 region.increment(inc, HConstants.NO_NONCE, HConstants.NO_NONCE); +219 } catch (NoSuchColumnFamilyException e) { +220 final Get g = new Get(row); +221 final Result result = region.get(g); +222 assertEquals(null, result.getValue(fam1, qual1)); +223 assertEquals(null, result.getValue(fam2, qual2)); +224 } catch (Exception e) { +225 fail("Increment operation should fail with NoSuchColumnFamilyException."); +226 } +227 } +228 +229 /** +230 * Test multi-threaded increments. +231 */ +232 @Test +233 public void testIncrementMultiThreads() throws IOException { +234 boolean fast = true; +235 LOG.info("Starting test testIncrementMultiThreads"); +236 // run a with mixed column families (1 and 3 versions) +237 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2); +238 +239 // Create 100 threads, each will increment by its own quantity. All 100 threads update the +240 // same row over two column families. +241 int numThreads = 100; +242 int incrementsPerThread = 1000; +243 Incrementer[] all = new Incrementer[numThreads]; +244 int expectedTotal = 0; +245 // create all threads +246 for (int i = 0; i < numThreads; i++) { +247 all[i] = new Incrementer(region, i, i, incrementsPerThread); +248 expectedTotal += (i * incrementsPerThread); +249 } +250 +251 // run all threads +252 for (int i = 0; i < numThreads; i++) { +253 all[i].start(); +254 } +255 +256 // wait for all threads to finish +257 for (int i = 0; i < numThreads; i++) { +258 try { +259 all[i].join(); +260 } catch (InterruptedException e) { +261 LOG.info("Ignored", e); +262 } +263 } +264 assertICV(row, fam1, qual1, expectedTotal, fast); +265 assertICV(row, fam1, qual2, expectedTotal*2, fast); +266 assertICV(row, fam2, qual3, expectedTotal*3, fast); +267 LOG.info("testIncrementMultiThreads successfully verified that total is " + expectedTotal); +268 } +269 +270 +271 private void assertICV(byte [] row, +272 byte [] familiy, +273 byte[] qualifier, +274 long amount, +275 boolean fast) throws IOException { +276 // run a get and see? +277 Get get = new Get(row); +278 if (fast) get.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); +279 get.addColumn(familiy, qualifier); +280 Result result = region.get(get); +281 assertEquals(1, result.size()); +282 +283 Cell kv = result.rawCells()[0]; +284 long r = Bytes.toLong(CellUtil.cloneValue(kv)); +285 assertEquals(amount, r); +286 } 287 -288 @Override -289 public void run() { -290 for (int i = 0; i < numIncrements; i++) { -291 try { -292 Increment inc = new Increment(row); -293 inc.addColumn(fam1, qual1, amount); -294 inc.addColumn(fam1, qual2, amount*2); -295 inc.addColumn(fam2, qual3, amount*3); -296 inc.setDurability(Durability.ASYNC_WAL); -297 Result result = region.increment(inc); -298 if (result != null) { -299 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, -300 Bytes.toLong(result.getValue(fam1, qual2))); -301 assertTrue(result.getValue(fam2, qual3) != null); -302 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3, -303 Bytes.toLong(result.getValue(fam2, qual3))); -304 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, -305 Bytes.toLong(result.getValue(fam1, qual2))); -306 long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1))*3; -307 long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3)); -308 assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment, -309 fam1Increment, fam2Increment); -310 } -311 } catch (IOException e) { -312 e.printStackTrace(); -313 } -314 } -315 } -316 } +288 private void initHRegion (byte [] tableName, String callingMethod, +289 byte[] ... families) +290 throws IOException { +291 initHRegion(tableName, callingMethod, null, families); +292 } +293 +294 private void initHRegion (byte [] tableName, String callingMethod, int [] maxVersions, +295 byte[] ... families) +296 throws IOException { +297 HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName)); +298 int i=0; +299 for(byte [] family : families) { +300 HColumnDescriptor hcd = new HColumnDescriptor(family); +301 hcd.setMaxVersions(maxVersions != null ? maxVersions[i++] : 1); +302 htd.addFamily(hcd); +303 } +304 HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false); +305 region = TEST_UTIL.createLocalHRegion(info, htd); +306 } +307 +308 /** +309 * A thread that makes increment calls always on the same row, this.row against two column +310 * families on this row. +311 */ +312 public static class Incrementer extends Thread { +313 +314 private final Region region; +315 private final int numIncrements; +316 private final int amount; 317 -318 @Test -319 public void testAppendMultiThreads() throws IOException { -320 LOG.info("Starting test testAppendMultiThreads"); -321 // run a with mixed column families (1 and 3 versions) -322 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2); -323 -324 int numThreads = 100; -325 int opsPerThread = 100; -326 AtomicOperation[] all = new AtomicOperation[numThreads]; -327 final byte[] val = new byte[]{1}; -328 -329 AtomicInteger failures = new AtomicInteger(0); -330 // create all threads -331 for (int i = 0; i < numThreads; i++) { -332 all[i] = new AtomicOperation(region, opsPerThread, null, failures) { -333 @Override -334 public void run() { -335 for (int i=0; i<numOps; i++) { -336 try { -337 Append a = new Append(row); -338 a.addColumn(fam1, qual1, val); -339 a.addColumn(fam1, qual2, val); -340 a.addColumn(fam2, qual3, val); -341 a.setDurability(Durability.ASYNC_WAL); -342 region.append(a, HConstants.NO_NONCE, HConstants.NO_NONCE); -343 -344 Get g = new Get(row); -345 Result result = region.get(g); -346 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam1, qual2).length); -347 assertEquals(result.getValue(fam1, qual1).length, result.getValue(fam2, qual3).length); -348 } catch (IOException e) { -349 e.printStackTrace(); -350 failures.incrementAndGet(); -351 fail(); -352 } -353 } -354 } -355 }; -356 } -357 -358 // run all threads -359 for (int i = 0; i < numThreads; i++) { -360 all[i].start(); -361 } +318 +319 public Incrementer(Region region, int threadNumber, int amount, int numIncrements) { +320 super("Incrementer." + threadNumber); +321 this.region = region; +322 this.numIncrements = numIncrements; +323 this.amount = amount; +324 setDaemon(true); +325 } +326 +327 @Override +328 public void run() { +329 for (int i = 0; i < numIncrements; i++) { +330 try { +331 Increment inc = new Increment(row); +332 inc.addColumn(fam1, qual1, amount); +333 inc.addColumn(fam1, qual2, amount*2); +334 inc.addColumn(fam2, qual3, amount*3); +335 inc.setDurability(Durability.ASYNC_WAL); +336 Result result = region.increment(inc); +337 if (result != null) { +338 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, +339 Bytes.toLong(result.getValue(fam1, qual2))); +340 assertTrue(result.getValue(fam2, qual3) != null); +341 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*3, +342 Bytes.toLong(result.getValue(fam2, qual3))); +343 assertEquals(Bytes.toLong(result.getValue(fam1, qual1))*2, +344 Bytes.toLong(result.getValue(fam1, qual2))); +345 long fam1Increment = Bytes.toLong(result.getValue(fam1, qual1))*3; +346 long fam2Increment = Bytes.toLong(result.getValue(fam2, qual3)); +347 assertEquals("fam1=" + fam1Increment + ", fam2=" + fam2Increment, +348 fam1Increment, fam2Increment); +349 } +350 } catch (IOException e) { +351 e.printStackTrace(); +352 } +353 } +354 } +355 } +356 +357 @Test +358 public void testAppendMultiThreads() throws IOException { +359 LOG.info("Starting test testAppendMultiThreads"); +360 // run a with mixed column families (1 and 3 versions) +361 initHRegion(tableName, name.getMethodName(), new int[] {1,3}, fam1, fam2); 362 -363 // wait for all threads to finish -364 for (int i = 0; i < numThreads; i++) { -365 try { -366 all[i].join(); -367 } catch (InterruptedException e) { -368 } -369 } -370 assertEquals(0, failures.get()); -371 Get g = new Get(row); -372 Result result = region.get(g); -373 assertEquals(10000, result.getValue(fam1, qual1).length); -374 assertEquals(10000, result.getValue(fam1, qual2).length); -375 assertEquals(10000, result.getValue(fam2, qual3).length); -376 } -377 /** -378 * Test multi-threaded row mutations. -379 */ -380 @Test -381 public void testRowMutationMultiThreads() throws IOException { -382 LOG.info("Starting test testRowMutationMultiThreads"); -383 initHRegion(tableName, name.getMethodName(), fam1); -384 -385 // create 10 threads, each will alternate between adding and -386 // removing a column -387 int numThreads = 10; -388 int opsPerThread = 250; -389 AtomicOperation[] all = new AtomicOperation[numThreads]; -390 -391 AtomicLong timeStamps = new AtomicLong(0); -392 AtomicInteger failures = new AtomicInteger(0); -393 // create all threads -394 for (int i = 0; i < numThreads; i++) { -395 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) { -396 @Override -397 public void run() { -398 boolean op = true; -399 for (int i=0; i<numOps; i++) { -400 try { -401 // throw in some flushes -402 if (i%10==0) { -403 synchronized(region) { -404 LOG.debug("flushing"); -405 region.flush(true); -406 if (i%100==0) { -407 region.compact(false); -408 } -409 } -410 } -411 long ts = timeStamps.incrementAndGet(); -412 RowMutations rm = new RowMutations(row); -413 if (op) { -414 Put p = new Put(row, ts); -415 p.addColumn(fam1, qual1, value1); -416 p.setDurability(Durability.ASYNC_WAL); -417 rm.add(p); -418 Delete d = new Delete(row); -419 d.addColumns(fam1, qual2, ts); -420 d.setDurability(Durability.ASYNC_WAL); -421 rm.add(d); -422 } else { -423 Delete d = new Delete(row); -424 d.addColumns(fam1, qual1, ts); -425 d.setDurability(Durability.ASYNC_WAL); -426 rm.add(d); -427 Put p = new Put(row, ts); -428 p.addColumn(fam1, qual2, value2); -429 p.setDurability(Durability.ASYNC_WAL); -430 rm.add(p); -431 } -432 region.mutateRow(rm); -433 op ^= true; -434 // check: should always see exactly one column -435 Get g = new Get(row); -436 Result r = region.get(g); -437 if (r.size() != 1) { -438 LOG.debug(Objects.toString(r)); -439 failures.incrementAndGet(); -440 fail(); -441 } -442 } catch (IOException e) { -443 e.printStackTrace(); -444 failures.incrementAndGet(); -445 fail(); -446 } -447 } -448 } -449 }; -450 } -451 -452 // run all threads -453 for (int i = 0; i < numThreads; i++) { -454 all[i].start(); -455 } -456 -457 // wait for all threads to finish -458 for (int i = 0; i < numThreads; i++) { -459 try { -460 all[i].join(); -461 } catch (InterruptedException e) { -462 } -463 } -464 assertEquals(0, failures.get()); -465 } -466 -467 -468 /** -469 * Test multi-threaded region mutations. -470 */ -471 @Test -472 public void testMultiRowMutationMultiThreads() throws IOException { -473 -474 LOG.info("Starting test testMultiRowMutationMultiThreads"); -475 initHRegion(tableName, name.getMethodName(), fam1); -476 -477 // create 10 threads, each will alternate between adding and -478 // removing a column -479 int numThreads = 10; -480 int opsPerThread = 250; -481 AtomicOperation[] all = new AtomicOperation[numThreads]; -482 -483 AtomicLong timeStamps = new AtomicLong(0); -484 AtomicInteger failures = new AtomicInteger(0); -485 final List<byte[]> rowsToLock = Arrays.asList(row, row2); -486 // create all threads -487 for (int i = 0; i < numThreads; i++) { -488 all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) { -489 @Override -490 public void run() { -491 boolean op = true; -492 for (int i=0; i<numOps; i++) { -493 try { -494 // throw in some flushes -495 if (i%10==0) { -496 synchronized(region) { -497 LOG.debug("flushing"); -498 region.flush(true); -499 if (i%100==0) { -500 region.compact(false); -501 } -502 } -503 } -504 long ts = timeStamps.incrementAndGet(); -505 List<Mutation> mrm = new ArrayList<>(); -506 if (op) { -507 Put p = new Put(row2, ts); -508 p.addColumn(fam1, qual1, value1); -509 p.setDurability(Durability.ASYNC_WAL); -510 mrm.add(p); -511 Delete d = new Delete(row); -512 d.addColumns(fam1, qual1, ts); -513 d.setDurability(Durability.ASYNC_WAL); -514 mrm.add(d); -515 } else { -516 Delete d = new Delete(row2); -517 d.addColumns(fam1, qual1, ts); -518 d.setDurability(Durability.ASYNC_WAL); -519 mrm.add(d); -520 Put p = new Put(row, ts); -521 p.setDurability(Durability.ASYNC_WAL); -522 p.addColumn(fam1, qual1, value2); -523 mrm.add(p); -524 } -525 region.mutateRowsWithLocks(mrm, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE); -526 op ^= true; -527 // check: should always see exactly one column -528 Scan s = new Scan(row); -529 RegionScanner rs = region.getScanner(s); -530 List<Cell> r = new ArrayList<>(); -531 while (rs.next(r)) -532 ; -533 rs.close(); -534 if (r.size() != 1) { -535 LOG.debug(Objects.toString(r)); -536 failures.incrementAndGet(); -537 fail(); -538 } -539 } catch (IOException e) { -540 e.printStackTrace(); -541 failures.incrementAndGet(); -542 fail(); -543 } -544 } -545 } -546 }; -547 } -548 -549 // run all threads -550 for (int i = 0; i < numThreads; i++) { -551 all[i].start(); -552 } -553 -554 // wait for all threads to finish -555 for (int i = 0; i < numThreads; i++) { -556 try { -557 all[i].join(); -558 } catch (InterruptedException e) { -559 } -560 } -561 assertEquals(0, failures.get()); -562 } -563 -564 public static class AtomicOperation extends Thread { -565 protected final HRegion region; -566 protected final int numOps; -567 protected final AtomicLong timeStamps; -568 protected final AtomicInteger failures; -569 protected final Random r = new Random(); -570 -571 public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps, -572 AtomicInteger failures) { -573 this.region = region; -574 this.numOps = numOps; -575 this.timeStamps = timeStamps; -576 this.failures = failures; -577 } -578 } -579 -580 private static CountDownLatch latch = new CountDownLatch(1); -581 private enum TestStep { -582 INIT, // initial put of 10 to set value of the cell -583 PUT_STARTED, // began doing a put of 50 to cell -584 PUT_COMPLETED, // put complete (released RowLock, but may not have advanced MVCC). -585 CHECKANDPUT_STARTED, // began checkAndPut: if 10 -> 11 -586 CHECKANDPUT_COMPLETED // completed checkAndPut -587 // NOTE: at the end of these steps, the value of the cell should be 50, not 11! -588 } -589 private static volatile TestStep testStep = TestStep.INIT;