Reply-To: hbase-dev@hadoop.apache.org
Subject: svn commit: r944529 [2/2] - in /hadoop/hbase/trunk/core/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/filter/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/had...
Date: Sat, 15 May 2010 00:18:37 -0000
To: hbase-commits@hadoop.apache.org
From: rawson@apache.org
Message-Id: <20100515001838.2821923888FE@eris.apache.org>

Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java Sat May 15 00:18:37 2010
@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
 
 import junit.framework.TestCase;
@@ -47,10 +48,12 @@ public class TestMemStore extends TestCa
   private static final byte [] CONTENTS = Bytes.toBytes("contents");
   private static final byte [] BASIC = Bytes.toBytes("basic");
   private static final String CONTENTSTR = "contentstr";
+  private ReadWriteConsistencyControl rwcc;
 
   @Override
   public void setUp() throws Exception {
     super.setUp();
+    this.rwcc = new ReadWriteConsistencyControl();
     this.memstore = new MemStore();
   }
 
@@ -76,6 +79,7 @@ public class TestMemStore extends TestCa
     KeyValueScanner [] memstorescanners = this.memstore.getScanners();
     Scan scan = new Scan();
     List<KeyValue> result = new ArrayList<KeyValue>();
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
     StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
       this.memstore.comparator, null, memstorescanners);
     int count = 0;
@@ -94,6 +98,8 @@ public class TestMemStore extends TestCa
     for (int i = 0; i < memstorescanners.length; i++) {
       memstorescanners[0].close();
     }
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
     memstorescanners = this.memstore.getScanners();
     // Now assert can count same number even if a snapshot mid-scan.
     s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
@@ -138,9 +144,9 @@ public class TestMemStore extends TestCa
       if (count == snapshotIndex) {
         this.memstore.snapshot();
         this.memstore.clearSnapshot(this.memstore.getSnapshot());
-        // Added more rows into kvset.
+        // Added more rows into kvset. But the scanner wont see these rows.
         addRows(this.memstore, ts);
-        LOG.info("Snapshotted, cleared it and then added values");
+        LOG.info("Snapshotted, cleared it and then added values (which wont be seen)");
       }
       result.clear();
     }
@@ -151,6 +157,181 @@ public class TestMemStore extends TestCa
   }
 
   /**
+   * A simple test which verifies the 3 possible states when scanning across snapshot.
+   */
+  public void testScanAcrossSnapshot2() {
+    // we are going to the scanning across snapshot with two kvs
+    // kv1 should always be returned before kv2
+    final byte[] one = Bytes.toBytes(1);
+    final byte[] two = Bytes.toBytes(2);
+    final byte[] f = Bytes.toBytes("f");
+    final byte[] q = Bytes.toBytes("q");
+    final byte[] v = Bytes.toBytes(3);
+
+    final KeyValue kv1 = new KeyValue(one, f, q, v);
+    final KeyValue kv2 = new KeyValue(two, f, q, v);
+
+    // use case 1: both kvs in kvset
+    this.memstore.add(kv1.clone());
+    this.memstore.add(kv2.clone());
+    verifyScanAcrossSnapshot2(kv1, kv2);
+
+    // use case 2: both kvs in snapshot
+    this.memstore.snapshot();
+    verifyScanAcrossSnapshot2(kv1, kv2);
+
+    // use case 3: first in snapshot second in kvset
+    this.memstore = new MemStore();
+    this.memstore.add(kv1.clone());
+    this.memstore.snapshot();
+    this.memstore.add(kv2.clone());
+    verifyScanAcrossSnapshot2(kv1, kv2);
+  }
+
+  private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) {
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    KeyValueScanner[] memstorescanners = this.memstore.getScanners();
+    assertEquals(1, memstorescanners.length);
+    final KeyValueScanner scanner = memstorescanners[0];
+    scanner.seek(KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW));
+    assertEquals(kv1, scanner.next());
+    assertEquals(kv2, scanner.next());
+    assertNull(scanner.next());
+  }
+
+  private void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected) {
+    scanner.seek(KeyValue.createFirstOnRow(new byte[]{}));
+    for (KeyValue kv : expected) {
+      assertTrue(0 ==
+          KeyValue.COMPARATOR.compare(kv,
+              scanner.next()));
+    }
+    assertNull(scanner.peek());
+  }
+
+  public void testMemstoreConcurrentControl() {
+    final byte[] row = Bytes.toBytes(1);
+    final byte[] f = Bytes.toBytes("family");
+    final byte[] q1 = Bytes.toBytes("q1");
+    final byte[] q2 = Bytes.toBytes("q2");
+    final byte[] v = Bytes.toBytes("value");
+
+    ReadWriteConsistencyControl.WriteEntry w =
+        rwcc.beginMemstoreInsert();
+
+    KeyValue kv1 = new KeyValue(row, f, q1, v);
+    kv1.setMemstoreTS(w.getWriteNumber());
+    memstore.add(kv1);
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    KeyValueScanner[] s = this.memstore.getScanners();
+    assertScannerResults(s[0], new KeyValue[]{});
+
+    rwcc.completeMemstoreInsert(w);
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    s = this.memstore.getScanners();
+    assertScannerResults(s[0], new KeyValue[]{kv1});
+
+    w = rwcc.beginMemstoreInsert();
+    KeyValue kv2 = new KeyValue(row, f, q2, v);
+    kv2.setMemstoreTS(w.getWriteNumber());
+    memstore.add(kv2);
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    s = this.memstore.getScanners();
+    assertScannerResults(s[0], new KeyValue[]{kv1});
+
+    rwcc.completeMemstoreInsert(w);
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    s = this.memstore.getScanners();
+    assertScannerResults(s[0], new KeyValue[]{kv1, kv2});
+  }
+
+  private static class ReadOwnWritesTester extends Thread {
+    final int id;
+    static final int NUM_TRIES = 1000;
+
+    final byte[] row;
+
+    final byte[] f = Bytes.toBytes("family");
+    final byte[] q1 = Bytes.toBytes("q1");
+
+    final ReadWriteConsistencyControl rwcc;
+    final MemStore memstore;
+
+    AtomicReference<Throwable> caughtException;
+
+
+    public ReadOwnWritesTester(int id,
+                               MemStore memstore,
+                               ReadWriteConsistencyControl rwcc,
+                               AtomicReference<Throwable> caughtException)
+    {
+      this.id = id;
+      this.rwcc = rwcc;
+      this.memstore = memstore;
+      this.caughtException = caughtException;
+      row = Bytes.toBytes(id);
+    }
+
+    public void run() {
+      try {
+        internalRun();
+      } catch (Throwable t) {
+        caughtException.compareAndSet(null, t);
+      }
+    }
+
+    private void internalRun() {
+      for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
+        ReadWriteConsistencyControl.WriteEntry w =
+            rwcc.beginMemstoreInsert();
+
+        // Insert the sequence value (i)
+        byte[] v = Bytes.toBytes(i);
+
+        KeyValue kv = new KeyValue(row, f, q1, i, v);
+        kv.setMemstoreTS(w.getWriteNumber());
+        memstore.add(kv);
+        rwcc.completeMemstoreInsert(w);
+
+        // Assert that we can read back
+
+        KeyValueScanner s = this.memstore.getScanners()[0];
+        s.seek(kv);
+
+        KeyValue ret = s.next();
+        assertNotNull("Didnt find own write at all", ret);
+        assertEquals("Didnt read own writes",
+            kv.getTimestamp(), ret.getTimestamp());
+      }
+    }
+  }
+
+  public void no_testReadOwnWritesUnderConcurrency() throws Throwable {
+
+    int NUM_THREADS = 8;
+
+    ReadOwnWritesTester threads[] = new ReadOwnWritesTester[NUM_THREADS];
+    AtomicReference<Throwable> caught = new AtomicReference<Throwable>();
+
+    for (int i = 0; i < NUM_THREADS; i++) {
+      threads[i] = new ReadOwnWritesTester(i, memstore, rwcc, caught);
+      threads[i].start();
+    }
+
+    for (int i = 0; i < NUM_THREADS; i++) {
+      threads[i].join();
+    }
+
+    if (caught.get() != null) {
+      throw caught.get();
+    }
+  }
+
+  /**
    * Test memstore snapshots
    * @throws IOException
    */
@@ -443,9 +624,10 @@ public class TestMemStore extends TestCa
     List<KeyValue> expected = new ArrayList<KeyValue>();
     expected.add(put3);
     expected.add(del2);
+    expected.add(put2);
     expected.add(put1);
-    assertEquals(3, memstore.kvset.size());
+    assertEquals(4, memstore.kvset.size());
     int i = 0;
     for(KeyValue kv : memstore.kvset) {
       assertEquals(expected.get(i++), kv);
     }
@@ -477,8 +659,11 @@ public class TestMemStore extends TestCa
     List<KeyValue> expected = new ArrayList<KeyValue>();
     expected.add(put3);
     expected.add(del2);
+    expected.add(put2);
+    expected.add(put1);
 
-    assertEquals(2, memstore.kvset.size());
+
+    assertEquals(4, memstore.kvset.size());
     int i = 0;
     for (KeyValue kv: memstore.kvset) {
       assertEquals(expected.get(i++), kv);
@@ -511,9 +696,14 @@ public class TestMemStore extends TestCa
     List<KeyValue> expected = new ArrayList<KeyValue>();
     expected.add(del);
+    expected.add(put1);
+    expected.add(put2);
     expected.add(put4);
+    expected.add(put3);
 
-    assertEquals(2, memstore.kvset.size());
+
+
+    assertEquals(5, memstore.kvset.size());
     int i = 0;
     for (KeyValue kv: memstore.kvset) {
       assertEquals(expected.get(i++), kv);
@@ -529,7 +719,7 @@ public class TestMemStore extends TestCa
     memstore.add(new KeyValue(row, fam, qf, ts, val));
     KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
     memstore.delete(delete);
-    assertEquals(1, memstore.kvset.size());
+    assertEquals(2, memstore.kvset.size());
     assertEquals(delete, memstore.kvset.first());
   }
@@ -542,7 +732,7 @@ public class TestMemStore extends TestCa
         "row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care");
     memstore.delete(delete);
-    assertEquals(1, memstore.kvset.size());
+    assertEquals(2, memstore.kvset.size());
     assertEquals(delete, memstore.kvset.first());
   }
 
   public void testRetainsDeleteColumn() throws IOException {
@@ -554,7 +744,7 @@ public class TestMemStore extends TestCa
         KeyValue.Type.DeleteColumn, "dont-care");
     memstore.delete(delete);
-    assertEquals(1, memstore.kvset.size());
+    assertEquals(2, memstore.kvset.size());
     assertEquals(delete, memstore.kvset.first());
   }
 
   public void testRetainsDeleteFamily() throws IOException {
@@ -566,21 +756,21 @@ public class TestMemStore extends TestCa
         KeyValue.Type.DeleteFamily, "dont-care");
     memstore.delete(delete);
-    assertEquals(1, memstore.kvset.size());
+    assertEquals(2, memstore.kvset.size());
     assertEquals(delete, memstore.kvset.first());
   }
 
   //////////////////////////////////////////////////////////////////////////////
   // Helpers
-  //////////////////////////////////////////////////////////////////////////////
-  private byte [] makeQualifier(final int i1, final int i2){
+  //////////////////////////////////////////////////////////////////////////////
+  private static byte [] makeQualifier(final int i1, final int i2){
     return Bytes.toBytes(Integer.toString(i1) + ";" + Integer.toString(i2));
   }
 
   /**
-   * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT}
+   * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT}
    * @param hmc Instance to add rows to.
   * @return How many rows we added.
   * @throws IOException
@@ -590,7 +780,7 @@ public class TestMemStore extends TestCa
   }
 
   /**
-   * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT}
+   * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT}
    * @param hmc Instance to add rows to.
   * @return How many rows we added.
   * @throws IOException
@@ -644,4 +834,57 @@ public class TestMemStore extends TestCa
     return new KeyValue(row, Bytes.toBytes("test_col"), null,
       HConstants.LATEST_TIMESTAMP, value);
   }
+  private static void addRows(int count, final MemStore mem) {
+    long nanos = System.nanoTime();
+
+    for (int i = 0 ; i < count ; i++) {
+      if (i % 1000 == 0) {
+
+        System.out.println(i + " Took for 1k usec: " + (System.nanoTime() - nanos)/1000);
+        nanos = System.nanoTime();
+      }
+      long timestamp = System.currentTimeMillis();
+
+      for (int ii = 0; ii < QUALIFIER_COUNT ; ii++) {
+        byte [] row = Bytes.toBytes(i);
+        byte [] qf = makeQualifier(i, ii);
+        mem.add(new KeyValue(row, FAMILY, qf, timestamp, qf));
+      }
+    }
+  }
+
+
+  static void doScan(MemStore ms, int iteration) {
+    long nanos = System.nanoTime();
+    KeyValueScanner [] ss = ms.getScanners();
+    KeyValueScanner s = ss[0];
+    s.seek(KeyValue.createFirstOnRow(new byte[]{}));
+
+    System.out.println(iteration + " create/seek took: " + (System.nanoTime() - nanos)/1000);
+    int cnt=0;
+    while(s.next() != null) ++cnt;
+
+    System.out.println(iteration + " took usec: " + (System.nanoTime() - nanos)/1000 + " for: " + cnt);
+
+  }
+
+  public static void main(String [] args) {
+    ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
+    MemStore ms = new MemStore();
+
+    long n1 = System.nanoTime();
+    addRows(25000, ms);
+    System.out.println("Took for insert: " + (System.nanoTime()-n1)/1000);
+
+
+    System.out.println("foo");
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
+    for (int i = 0 ; i < 50 ; i++)
+      doScan(ms, i);
+
+  }
+
+
 }

Added: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java?rev=944529&view=auto
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java (added)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestReadWriteConsistencyControl.java Sat May 15 00:18:37 2010
@@ -0,0 +1,109 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import junit.framework.TestCase;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TestReadWriteConsistencyControl extends TestCase {
+  static class Writer implements Runnable {
+    final AtomicBoolean finished;
+    final ReadWriteConsistencyControl rwcc;
+    final AtomicBoolean status;
+
+    Writer(AtomicBoolean finished, ReadWriteConsistencyControl rwcc, AtomicBoolean status) {
+      this.finished = finished;
+      this.rwcc = rwcc;
+      this.status = status;
+    }
+    private Random rnd = new Random();
+    public boolean failed = false;
+
+    public void run() {
+      while (!finished.get()) {
+        ReadWriteConsistencyControl.WriteEntry e = rwcc.beginMemstoreInsert();
+//        System.out.println("Begin write: " + e.getWriteNumber());
+        // 10 usec - 500usec (including 0)
+        int sleepTime = rnd.nextInt(500);
+        // 500 * 1000 = 500,000ns = 500 usec
+        // 1 * 100 = 100ns = 1usec
+        try {
+          if (sleepTime > 0)
+            Thread.sleep(0, sleepTime * 1000);
+        } catch (InterruptedException e1) {
+        }
+        try {
+          rwcc.completeMemstoreInsert(e);
+        } catch (RuntimeException ex) {
+          // got failure
+          System.out.println(ex.toString());
+          ex.printStackTrace();
+          status.set(false);
+          return;
+          // Report failure if possible.
+        }
+      }
+    }
+  }
+
+  public void testParallelism() throws Exception {
+    final ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
+
+    final AtomicBoolean finished = new AtomicBoolean(false);
+
+    // fail flag for the reader thread
+    final AtomicBoolean readerFailed = new AtomicBoolean(false);
+    final AtomicLong failedAt = new AtomicLong();
+    Runnable reader = new Runnable() {
+      public void run() {
+        long prev = rwcc.memstoreReadPoint();
+        while (!finished.get()) {
+          long newPrev = rwcc.memstoreReadPoint();
+          if (newPrev < prev) {
+            // serious problem.
+            System.out.println("Reader got out of order, prev: " +
+                prev + " next was: " + newPrev);
+            readerFailed.set(true);
+            // might as well give up
+            failedAt.set(newPrev);
+            return;
+          }
+        }
+      }
+    };
+
+    // writer thread parallelism.
+    int n = 20;
+    Thread [] writers = new Thread[n];
+    AtomicBoolean [] statuses = new AtomicBoolean[n];
+    Thread readThread = new Thread(reader);
+
+    for (int i = 0 ; i < n ; ++i ) {
+      statuses[i] = new AtomicBoolean(true);
+      writers[i] = new Thread(new Writer(finished, rwcc, statuses[i]));
+      writers[i].start();
+    }
+    readThread.start();
+
+    try {
+      Thread.sleep(10 * 1000);
+    } catch (InterruptedException ex) {
+    }
+
+    finished.set(true);
+
+    readThread.join();
+    for (int i = 0; i < n; ++i) {
+      writers[i].join();
+    }
+
+    // check failure.
+    assertFalse(readerFailed.get());
+    for (int i = 0; i < n; ++i) {
+      assertTrue(statuses[i].get());
+    }
+
+
+  }
+}

Modified: hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java?rev=944529&r1=944528&r2=944529&view=diff
==============================================================================
--- hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java (original)
+++ hadoop/hbase/trunk/core/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java Sat May 15 00:18:37 2010
@@ -1,5 +1,5 @@
 /*
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
@@ -20,24 +20,23 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
 import junit.framework.TestCase;
-
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueTestUtil;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
 public class TestStoreScanner extends TestCase {
   private static final String CF_STR = "cf";
   final byte [] CF = Bytes.toBytes(CF_STR);
 
-  /**
+  /*
   * Test utility for building a NavigableSet for scanners.
   * @param strCols
   * @return
@@ -128,7 +127,7 @@ public class TestStoreScanner extends Te
     assertEquals(kvs[0], results.get(0));
   }
 
-  /**
+  /*
   * Test test shows exactly how the matcher's return codes confuses the StoreScanner
   * and prevent it from doing the right thing. Seeking once, then nexting twice
   * should return R1, then R2, but in this case it doesnt.
@@ -189,7 +188,7 @@ public class TestStoreScanner extends Te
     assertEquals(0, results.size());
   }
 
-  /**
+  /*
   * Test the case where there is a delete row 'in front of' the next row, the scanner
   * will move to the next row.
   */
@@ -407,9 +406,9 @@ public class TestStoreScanner extends Te
     results.clear();
     assertEquals(false, scan.next(results));
   }
-
-  /**
-   * Test expiration of KeyValues in combination with a configured TTL for
+
+  /*
+   * Test expiration of KeyValues in combination with a configured TTL for
   * a column family (as should be triggered in a major compaction).
   */
   public void testWildCardTtlScan() throws IOException {