hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r934210 [2/2] - in /hadoop/hbase/branches/0.20_pre_durability: ./ src/contrib/indexed/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/filter/ src/java/org/apache/hadoop/hbase/regi...
Date Wed, 14 Apr 2010 21:52:27 GMT
Modified: hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=934210&r1=934209&r2=934210&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
(original)
+++ hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
Wed Apr 14 21:52:26 2010
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
 
 import junit.framework.TestCase;
 
@@ -46,10 +47,12 @@ public class TestMemStore extends TestCa
   private static final byte [] FAMILY = Bytes.toBytes("column");
   private static final byte [] CONTENTS_BASIC = Bytes.toBytes("contents:basic");
   private static final String CONTENTSTR = "contentstr";
+  private ReadWriteConsistencyControl rwcc;
 
   @Override
   public void setUp() throws Exception {
     super.setUp();
+    this.rwcc = new ReadWriteConsistencyControl();
     this.memstore = new MemStore();
   }
 
@@ -75,6 +78,7 @@ public class TestMemStore extends TestCa
     KeyValueScanner [] memstorescanners = this.memstore.getScanners();
     Scan scan = new Scan();
     List<KeyValue> result = new ArrayList<KeyValue>();
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
     StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
       this.memstore.comparator, null, memstorescanners);
     int count = 0;
@@ -93,6 +97,8 @@ public class TestMemStore extends TestCa
     for (int i = 0; i < memstorescanners.length; i++) {
       memstorescanners[0].close();
     }
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
     memstorescanners = this.memstore.getScanners();
     // Now assert can count same number even if a snapshot mid-scan.
     s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
@@ -137,9 +143,9 @@ public class TestMemStore extends TestCa
         if (count == snapshotIndex) {
           this.memstore.snapshot();
           this.memstore.clearSnapshot(this.memstore.getSnapshot());
-          // Added more rows into kvset.
+          // Added more rows into kvset.  But the scanner wont see these rows.
           addRows(this.memstore, ts);
-          LOG.info("Snapshotted, cleared it and then added values");
+          LOG.info("Snapshotted, cleared it and then added values (which wont be seen)");
         }
         result.clear();
       }
@@ -149,6 +155,181 @@ public class TestMemStore extends TestCa
     assertEquals(rowCount, count);
   }
 
+  /**
+   * A simple test which verifies the 3 possible states when scanning across snapshot.
+   */
+  public void testScanAcrossSnapshot2() {
+    // we are going to the scanning across snapshot with two kvs
+    // kv1 should always be returned before kv2
+    final byte[] one = Bytes.toBytes(1);
+    final byte[] two = Bytes.toBytes(2);
+    final byte[] f = Bytes.toBytes("f");
+    final byte[] q = Bytes.toBytes("q");
+    final byte[] v = Bytes.toBytes(3);
+
+    final KeyValue kv1 = new KeyValue(one, f, q, v);
+    final KeyValue kv2 = new KeyValue(two, f, q, v);
+
+    // use case 1: both kvs in kvset
+    this.memstore.add(kv1.clone());
+    this.memstore.add(kv2.clone());
+    verifyScanAcrossSnapshot2(kv1, kv2);
+
+    // use case 2: both kvs in snapshot
+    this.memstore.snapshot();
+    verifyScanAcrossSnapshot2(kv1, kv2);
+
+    // use case 3: first in snapshot second in kvset
+    this.memstore = new MemStore();
+    this.memstore.add(kv1.clone());
+    this.memstore.snapshot();
+    this.memstore.add(kv2.clone());
+    verifyScanAcrossSnapshot2(kv1, kv2);
+  }
+
+  private void verifyScanAcrossSnapshot2(KeyValue kv1, KeyValue kv2) {
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    KeyValueScanner[] memstorescanners = this.memstore.getScanners();
+    assertEquals(1, memstorescanners.length);
+    final KeyValueScanner scanner = memstorescanners[0];
+    scanner.seek(KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW));
+    assertEquals(kv1, scanner.next());
+    assertEquals(kv2, scanner.next());
+    assertNull(scanner.next());
+  }
+
+  private void assertScannerResults(KeyValueScanner scanner, KeyValue[] expected) {
+    scanner.seek(KeyValue.createFirstOnRow(new byte[]{}));
+    for (KeyValue kv : expected) {
+      assertTrue(0 ==
+          KeyValue.COMPARATOR.compare(kv,
+              scanner.next()));
+    }
+    assertNull(scanner.peek());
+  }
+
+  public void testMemstoreConcurrentControl() {
+    final byte[] row = Bytes.toBytes(1);
+    final byte[] f = Bytes.toBytes("family");
+    final byte[] q1 = Bytes.toBytes("q1");
+    final byte[] q2 = Bytes.toBytes("q2");
+    final byte[] v = Bytes.toBytes("value");
+
+    ReadWriteConsistencyControl.WriteEntry w =
+        rwcc.beginMemstoreInsert();
+
+    KeyValue kv1 = new KeyValue(row, f, q1, v);
+    kv1.setMemstoreTS(w.getWriteNumber());
+    memstore.add(kv1);
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    KeyValueScanner[] s = this.memstore.getScanners();
+    assertScannerResults(s[0], new KeyValue[]{});
+
+    rwcc.completeMemstoreInsert(w);
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    s = this.memstore.getScanners();
+    assertScannerResults(s[0], new KeyValue[]{kv1});
+
+    w = rwcc.beginMemstoreInsert();
+    KeyValue kv2 = new KeyValue(row, f, q2, v);
+    kv2.setMemstoreTS(w.getWriteNumber());
+    memstore.add(kv2);
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    s = this.memstore.getScanners();
+    assertScannerResults(s[0], new KeyValue[]{kv1});
+
+    rwcc.completeMemstoreInsert(w);
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+    s = this.memstore.getScanners();
+    assertScannerResults(s[0], new KeyValue[]{kv1, kv2});
+  }
+
+  private static class ReadOwnWritesTester extends Thread {
+    final int id;
+    static final int NUM_TRIES = 1000;
+
+    final byte[] row;
+
+    final byte[] f = Bytes.toBytes("family");
+    final byte[] q1 = Bytes.toBytes("q1");
+
+    final ReadWriteConsistencyControl rwcc;
+    final MemStore memstore;
+
+    AtomicReference<Throwable> caughtException;
+
+
+    public ReadOwnWritesTester(int id,
+                               MemStore memstore,
+                               ReadWriteConsistencyControl rwcc,
+                               AtomicReference<Throwable> caughtException)
+    {
+      this.id = id;
+      this.rwcc = rwcc;
+      this.memstore = memstore;
+      this.caughtException = caughtException;
+      row = Bytes.toBytes(id);
+    }
+
+    public void run() {
+      try {
+        internalRun();
+      } catch (Throwable t) {
+        caughtException.compareAndSet(null, t);
+      }
+    }
+
+    private void internalRun() {
+      for (long i = 0; i < NUM_TRIES && caughtException.get() == null; i++) {
+        ReadWriteConsistencyControl.WriteEntry w =
+          rwcc.beginMemstoreInsert();
+
+        // Insert the sequence value (i)
+        byte[] v = Bytes.toBytes(i);
+
+        KeyValue kv = new KeyValue(row, f, q1, i, v);
+        kv.setMemstoreTS(w.getWriteNumber());
+        memstore.add(kv);
+        rwcc.completeMemstoreInsert(w);
+
+        // Assert that we can read back
+
+        KeyValueScanner s = this.memstore.getScanners()[0];
+        s.seek(kv);
+
+        KeyValue ret = s.next();
+        assertNotNull("Didnt find own write at all", ret);
+        assertEquals("Didnt read own writes",
+                     kv.getTimestamp(), ret.getTimestamp());
+      }
+    }
+  }
+
+  public void no_testReadOwnWritesUnderConcurrency() throws Throwable {
+
+    int NUM_THREADS = 8;
+
+    ReadOwnWritesTester threads[] = new ReadOwnWritesTester[NUM_THREADS];
+    AtomicReference<Throwable> caught = new AtomicReference<Throwable>();
+
+    for (int i = 0; i < NUM_THREADS; i++) {
+      threads[i] = new ReadOwnWritesTester(i, memstore, rwcc, caught);
+      threads[i].start();
+    }
+
+    for (int i = 0; i < NUM_THREADS; i++) {
+      threads[i].join();
+    }
+
+    if (caught.get() != null) {
+      throw caught.get();
+    }
+  }
+
   /** 
    * Test memstore snapshots
    * @throws IOException
@@ -442,9 +623,10 @@ public class TestMemStore extends TestCa
     List<KeyValue> expected = new ArrayList<KeyValue>();
     expected.add(put3);
     expected.add(del2);
+    expected.add(put2);
     expected.add(put1);
-    
-    assertEquals(3, memstore.kvset.size());
+
+    assertEquals(4, memstore.kvset.size());
     int i = 0;
     for(KeyValue kv : memstore.kvset) {
       assertEquals(expected.get(i++), kv);
@@ -476,8 +658,11 @@ public class TestMemStore extends TestCa
     List<KeyValue> expected = new ArrayList<KeyValue>();
     expected.add(put3);
     expected.add(del2);
+    expected.add(put2);
+    expected.add(put1);
+
     
-    assertEquals(2, memstore.kvset.size());
+    assertEquals(4, memstore.kvset.size());
     int i = 0;
     for (KeyValue kv: memstore.kvset) {
       assertEquals(expected.get(i++), kv);
@@ -510,9 +695,14 @@ public class TestMemStore extends TestCa
 
     List<KeyValue> expected = new ArrayList<KeyValue>();
     expected.add(del);
+    expected.add(put1);
+    expected.add(put2);
     expected.add(put4);
+    expected.add(put3);
+
+
     
-    assertEquals(2, memstore.kvset.size());
+    assertEquals(5, memstore.kvset.size());
     int i = 0;
     for (KeyValue kv: memstore.kvset) {
       assertEquals(expected.get(i++), kv);
@@ -528,7 +718,7 @@ public class TestMemStore extends TestCa
     memstore.add(new KeyValue(row, fam, qf, ts, val));
     KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val);
     memstore.delete(delete);
-    assertEquals(1, memstore.kvset.size());
+    assertEquals(2, memstore.kvset.size());
     assertEquals(delete, memstore.kvset.first());
   }
 
@@ -541,7 +731,7 @@ public class TestMemStore extends TestCa
         "row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(1, memstore.kvset.size());
+    assertEquals(2, memstore.kvset.size());
     assertEquals(delete, memstore.kvset.first());
   }
   public void testRetainsDeleteColumn() throws IOException {
@@ -553,7 +743,7 @@ public class TestMemStore extends TestCa
         KeyValue.Type.DeleteColumn, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(1, memstore.kvset.size());
+    assertEquals(2, memstore.kvset.size());
     assertEquals(delete, memstore.kvset.first());
   }
   public void testRetainsDeleteFamily() throws IOException {
@@ -565,7 +755,7 @@ public class TestMemStore extends TestCa
         KeyValue.Type.DeleteFamily, "dont-care");
     memstore.delete(delete);
 
-    assertEquals(1, memstore.kvset.size());
+    assertEquals(2, memstore.kvset.size());
     assertEquals(delete, memstore.kvset.first());
   }
 
@@ -573,13 +763,13 @@ public class TestMemStore extends TestCa
   //////////////////////////////////////////////////////////////////////////////
   // Helpers
   //////////////////////////////////////////////////////////////////////////////  
-  private byte [] makeQualifier(final int i1, final int i2){
+  private static byte [] makeQualifier(final int i1, final int i2){
     return Bytes.toBytes(Integer.toString(i1) + ";" +
         Integer.toString(i2));
   }
   
   /**
-   * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT}
+   * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT}
    * @param hmc Instance to add rows to.
    * @return How many rows we added.
    * @throws IOException 
@@ -589,7 +779,7 @@ public class TestMemStore extends TestCa
   }
   
   /**
-   * Adds {@link #ROW_COUNT} rows and {@link #COLUMNS_COUNT}
+   * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT}
    * @param hmc Instance to add rows to.
    * @return How many rows we added.
    * @throws IOException 
@@ -643,4 +833,57 @@ public class TestMemStore extends TestCa
     return new KeyValue(row, Bytes.toBytes("test_col:"),
       HConstants.LATEST_TIMESTAMP, value);
   }
+  private static void addRows(int count, final MemStore mem) {
+    long nanos = System.nanoTime();
+
+    for (int i = 0 ; i < count ; i++) {
+      if (i % 1000 == 0) {
+
+        System.out.println(i + " Took for 1k usec: " + (System.nanoTime() - nanos)/1000);
+        nanos = System.nanoTime();
+      }
+      long timestamp = System.currentTimeMillis();
+
+      for (int ii = 0; ii < QUALIFIER_COUNT ; ii++) {
+        byte [] row = Bytes.toBytes(i);
+        byte [] qf = makeQualifier(i, ii);
+        mem.add(new KeyValue(row, FAMILY, qf, timestamp, qf));
+      }
+    }
+  }
+
+
+  static void doScan(MemStore ms, int iteration) {
+    long nanos = System.nanoTime();
+    KeyValueScanner [] ss = ms.getScanners();
+    KeyValueScanner s = ss[0];
+    s.seek(KeyValue.createFirstOnRow(new byte[]{}));
+
+    System.out.println(iteration + " create/seek took: " + (System.nanoTime() - nanos)/1000);
+    int cnt=0;
+    while(s.next() != null) ++cnt;
+
+    System.out.println(iteration + " took usec: " + (System.nanoTime() - nanos)/1000 + "
for: " + cnt);
+
+  }
+
+  public static void main(String [] args) {
+    ReadWriteConsistencyControl rwcc = new ReadWriteConsistencyControl();
+    MemStore ms = new MemStore();
+
+    long n1 = System.nanoTime();
+    addRows(25000, ms);
+    System.out.println("Took for insert: " + (System.nanoTime()-n1)/1000);
+
+
+    System.out.println("foo");
+
+    ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
+
+    for (int i = 0 ; i < 50 ; i++)
+      doScan(ms, i);
+
+  }
+
+  
 }

Modified: hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java?rev=934210&r1=934209&r2=934210&view=diff
==============================================================================
--- hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
(original)
+++ hadoop/hbase/branches/0.20_pre_durability/src/test/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
Wed Apr 14 21:52:26 2010
@@ -1,5 +1,5 @@
 /*
- * Copyright 2009 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -20,24 +20,23 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
 import junit.framework.TestCase;
-
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueTestUtil;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
 public class TestStoreScanner extends TestCase {
   private final String CF_STR = "cf";
   final byte [] CF = Bytes.toBytes(CF_STR);
 
-  /**
+  /*
    * Test utility for building a NavigableSet for scanners.
    * @param strCols
    * @return
@@ -128,7 +127,7 @@ public class TestStoreScanner extends Te
     assertEquals(kvs[0], results.get(0));
   }
 
-  /**
+  /*
    * Test test shows exactly how the matcher's return codes confuses the StoreScanner
    * and prevent it from doing the right thing.  Seeking once, then nexting twice
    * should return R1, then R2, but in this case it doesnt.
@@ -189,7 +188,7 @@ public class TestStoreScanner extends Te
     assertEquals(0, results.size());
   }
 
-  /**
+  /*
    * Test the case where there is a delete row 'in front of' the next row, the scanner
    * will move to the next row.
    */
@@ -408,7 +407,7 @@ public class TestStoreScanner extends Te
     assertEquals(false, scan.next(results));
   }
   
-  /**
+  /*
    * Test expiration of KeyValues in combination with a configured TTL for 
    * a column family (as should be triggered in a major compaction).
    */



Mime
View raw message