hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ramkris...@apache.org
Subject svn commit: r1356381 - in /hbase/branches/0.92/src: main/java/org/apache/hadoop/hbase/regionserver/HRegion.java test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Date Mon, 02 Jul 2012 18:00:29 GMT
Author: ramkrishna
Date: Mon Jul  2 18:00:28 2012
New Revision: 1356381

URL: http://svn.apache.org/viewvc?rev=1356381&view=rev
Log:
HBASE-6297 Backport HBASE-6195 to 0.92 (Ram)

Submitted by:Ram	
Reviewed by:Stack	

Modified:
    hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Modified: hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1356381&r1=1356380&r2=1356381&view=diff
==============================================================================
--- hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.92/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Mon Jul  2 18:00:28 2012
@@ -3827,8 +3827,8 @@ public class HRegion implements HeapSize
     boolean flush = false;
     WALEdit walEdits = null;
     List<KeyValue> allKVs = new ArrayList<KeyValue>(increment.numColumns());
-    List<KeyValue> kvs = new ArrayList<KeyValue>(increment.numColumns());
-    long now = EnvironmentEdgeManager.currentTimeMillis();
+    Map<Store, List<KeyValue>> tempMemstore = new HashMap<Store, List<KeyValue>>();
+    long before = EnvironmentEdgeManager.currentTimeMillis();
     long size = 0;
 
     // Lock row
@@ -3838,11 +3838,13 @@ public class HRegion implements HeapSize
       Integer lid = getLock(lockid, row, true);
       this.updatesLock.readLock().lock();
       try {
+        long now = EnvironmentEdgeManager.currentTimeMillis();
         // Process each family
         for (Map.Entry<byte [], NavigableMap<byte [], Long>> family :
           increment.getFamilyMap().entrySet()) {
 
           Store store = stores.get(family.getKey());
+          List<KeyValue> kvs = new ArrayList<KeyValue>(family.getValue().size());
 
           // Get previous values for all columns in this family
           Get get = new Get(row);
@@ -3877,10 +3879,8 @@ public class HRegion implements HeapSize
             }
           }
 
-          // Write the KVs for this family into the store
-          size += store.upsert(kvs);
-          allKVs.addAll(kvs);
-          kvs.clear();
+          // store the kvs to the temporary memstore before writing HLog
+          tempMemstore.put(store, kvs);
         }
 
         // Actually write to WAL now
@@ -3889,10 +3889,16 @@ public class HRegion implements HeapSize
           // cluster. A slave cluster receives the final value (not the delta)
           // as a Put.
           this.log.append(regionInfo, this.htableDescriptor.getName(),
-              walEdits, HConstants.DEFAULT_CLUSTER_ID, now,
+              walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
               this.htableDescriptor);
         }
 
+        // Actually write to Memstore now
+        for (Map.Entry<Store, List<KeyValue>> entry : tempMemstore.entrySet()) {
+          Store store = entry.getKey();
+          size += store.upsert(entry.getValue());
+          allKVs.addAll(entry.getValue());
+        }
         size = this.addAndGetGlobalMemstoreSize(size);
         flush = isFlushSize(size);
       } finally {

Modified: hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1356381&r1=1356380&r2=1356381&view=diff
==============================================================================
--- hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/branches/0.92/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Mon Jul  2 18:00:28 2012
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.MiniHBase
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -3419,6 +3420,98 @@ public class TestHRegion extends HBaseTe
       this.region = null;
     }
   }
+  
+  /**
+   * TestCase for increment
+   *
+   */
+  private static class Incrementer implements Runnable {
+    private HRegion region;
+    private final static byte[] incRow = Bytes.toBytes("incRow");
+    private final static byte[] family = Bytes.toBytes("family");
+    private final static byte[] qualifier = Bytes.toBytes("qualifier");
+    private final static long ONE = 1l;
+    private int incCounter;
+
+    public Incrementer(HRegion region, int incCounter) {
+      this.region = region;
+      this.incCounter = incCounter;
+    }
+
+    @Override
+    public void run() {
+      int count = 0;
+      while (count < incCounter) {
+        Increment inc = new Increment(incRow);
+        inc.addColumn(family, qualifier, ONE);
+        count++;
+        try {
+          region.increment(inc, null, true);
+        } catch (IOException e) {
+          e.printStackTrace();
+          break;
+        }
+      }
+    }
+  }
+  
+  /**
+   * Test case to check increment function with memstore flushing
+   * @throws Exception
+   */
+  @Test
+  public void testParallelIncrementWithMemStoreFlush() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    String method = "testParallelIncrementWithMemStoreFlush";
+    byte[] tableName = Bytes.toBytes(method);
+    byte[] family = Incrementer.family;
+    this.region = initHRegion(tableName, method, conf, family);
+    final HRegion region = this.region;
+    final AtomicBoolean incrementDone = new AtomicBoolean(false);
+    Runnable reader = new Runnable() {
+      @Override
+      public void run() {
+        while (!incrementDone.get()) {
+          try {
+            region.flushcache();
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+      }
+    };
+
+    //after all increment finished, the row will increment to 20*100 = 2000
+    int threadNum = 20;
+    int incCounter = 100;
+    long expected = threadNum * incCounter;
+    Thread[] incrementers = new Thread[threadNum];
+    Thread flushThread = new Thread(reader);
+    for (int i = 0; i < threadNum; i++) {
+      incrementers[i] = new Thread(new Incrementer(this.region, incCounter));
+      incrementers[i].start();
+    }
+    flushThread.start();
+    for (int i = 0; i < threadNum; i++) {
+      incrementers[i].join();
+    }
+
+    incrementDone.set(true);
+    flushThread.join();
+
+    Get get = new Get(Incrementer.incRow);
+    get.addColumn(Incrementer.family, Incrementer.qualifier);
+    get.setMaxVersions(1);
+    Result res = this.region.get(get, null);
+    List<KeyValue> kvs = res.getColumn(Incrementer.family,
+        Incrementer.qualifier);
+    
+    //we just got the latest version
+    assertEquals(kvs.size(), 1);
+    KeyValue kv = kvs.get(0);
+    assertEquals(expected, Bytes.toLong(kv.getBuffer(), kv.getValueOffset()));
+    this.region = null;
+  }
 
   private void putData(int startRow, int numRows, byte [] qf,
       byte [] ...families)



Mime
View raw message