hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r814038 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/regionserver/
Date Fri, 11 Sep 2009 21:29:08 GMT
Author: stack
Date: Fri Sep 11 21:28:36 2009
New Revision: 814038

URL: http://svn.apache.org/viewvc?rev=814038&view=rev
Log:
HBASE-1740  ICV has a subtle race condition only visible under high load

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=814038&r1=814037&r2=814038&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Fri Sep 11 21:28:36 2009
@@ -18,6 +18,7 @@
                get tossed as 'duplicates'
    HBASE-1794  recovered log files are not inserted into the storefile map
    HBASE-1824  [stargate] default timestamp should be LATEST_TIMESTAMP
+   HBASE-1740  ICV has a subtle race condition only visible under high load
 
   IMPROVEMENTS
    HBASE-1760  Cleanup TODOs in HTable

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=814038&r1=814037&r2=814038&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Sep 11 21:28:36 2009
@@ -2317,25 +2317,47 @@
     boolean flush = false;
     // Lock row
     Integer lid = obtainRowLock(row);
-    long result = 0L;
+    long result = amount;
     try {
       Store store = stores.get(family);
-      // Determine what to do and perform increment on returned KV, no insertion 
-      Store.ICVResult vas =
-        store.incrementColumnValue(row, family, qualifier, amount);
-      // Write incremented value to WAL before inserting
+
+      // Get the old value:
+      Get get = new Get(row);
+      get.addColumn(family, qualifier);
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      NavigableSet<byte[]> qualifiers = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+      qualifiers.add(qualifier);
+      store.get(get, qualifiers, results);
+
+      if (!results.isEmpty()) {
+        byte [] oldValue = results.get(0).getValue();
+        KeyValue kv = results.get(0);
+        byte [] buffer = kv.getBuffer();
+        int valueOffset = kv.getValueOffset();
+        result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
+      }
+
+      // bulid the KeyValue now:
+      KeyValue newKv = new KeyValue(row, family,
+          qualifier, System.currentTimeMillis(),
+          Bytes.toBytes(result));
+
+      // now log it:
       if (writeToWAL) {
         long now = System.currentTimeMillis();
         List<KeyValue> edits = new ArrayList<KeyValue>(1);
-        edits.add(vas.kv);
+        edits.add(newKv);
         this.log.append(regionInfo.getRegionName(),
           regionInfo.getTableDesc().getName(), edits,
           (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now);
       }
-      // Insert to the Store
-      store.add(vas.kv);
-      result = vas.value;
-      long size = this.memstoreSize.addAndGet(vas.sizeAdded);
+
+      // Now request the ICV to the store, this will set the timestamp
+      // appropriately depending on if there is a value in memcache or not.
+      // returns the
+      long size = store.updateColumnValue(row, family, qualifier, result);
+
+      size = this.memstoreSize.addAndGet(size);
       flush = isFlushSize(size);
     } finally {
       releaseRowLock(lid);

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=814038&r1=814037&r2=814038&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java Fri Sep 11 21:28:36 2009
@@ -496,6 +496,43 @@
       this.lock.readLock().unlock();
     }
   }
+
+  /**
+   * Gets from either the memstore or the snapshop, and returns a code
+   * to let you know which is which.
+   *
+   * @param matcher
+   * @param result
+   * @return 1 == memstore, 2 == snapshot, 0 == none
+   */
+  int getWithCode(QueryMatcher matcher, List<KeyValue> result) throws IOException {
+    this.lock.readLock().lock();
+    try {
+      boolean fromMemstore = internalGet(this.kvset, matcher, result);
+      if (fromMemstore || matcher.isDone())
+        return 1;
+
+      matcher.update();
+      boolean fromSnapshot = internalGet(this.snapshot, matcher, result);
+      if (fromSnapshot || matcher.isDone())
+        return 2;
+
+      return 0;
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Small utility functions for use by Store.incrementColumnValue
+   * _only_ under the threat of pain and everlasting race conditions.
+   */
+  void readLockLock() {
+    this.lock.readLock().lock();
+  }
+  void readLockUnlock() {
+    this.lock.readLock().unlock();
+  }
   
   /**
    *

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=814038&r1=814037&r2=814038&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Fri Sep 11 21:28:36 2009
@@ -436,7 +436,7 @@
       lock.readLock().unlock();
     }
   }
-  
+
   /**
    * Adds a value to the memstore
    * 
@@ -1462,37 +1462,22 @@
     scanner.get(result);
   }
 
-  /*
-   * Data structure to hold incrementColumnValue result.
-   */
-  static class ICVResult {
-    final long value;
-    final long sizeAdded;
-    final KeyValue kv;
-
-    ICVResult(long value, long sizeAdded, KeyValue kv) {
-      this.value = value;
-      this.sizeAdded = sizeAdded;
-      this.kv = kv;
-    }
-  }
-
   /**
    * Increments the value for the given row/family/qualifier
    * @param row
    * @param f
    * @param qualifier
-   * @param amount
-   * @return The new value.
+   * @param newValue the new value to set into memstore
+   * @return memstore size delta
    * @throws IOException
    */
-  public ICVResult incrementColumnValue(byte [] row, byte [] f,
-      byte [] qualifier, long amount)
+  public long updateColumnValue(byte [] row, byte [] f,
+      byte [] qualifier, long newValue)
   throws IOException {
-    long value = 0;
     List<KeyValue> result = new ArrayList<KeyValue>();
     KeyComparator keyComparator = this.comparator.getRawComparator();
 
+    KeyValue kv = null;
     // Setting up the QueryMatcher
     Get get = new Get(row);
     NavigableSet<byte[]> qualifiers =
@@ -1501,78 +1486,41 @@
     QueryMatcher matcher = new QueryMatcher(get, f, qualifiers, this.ttl,
       keyComparator, 1);
 
-    boolean newTs = true;
-    KeyValue kv = null;
-    // Read from memstore first:
-    this.memstore.internalGet(this.memstore.kvset,
-                                  matcher, result);
-    if (!result.isEmpty()) {
-      kv = result.get(0).clone();
-      newTs = false;
-    } else {
-      // try the snapshot.
-      this.memstore.internalGet(this.memstore.snapshot,
-          matcher, result);
-      if (!result.isEmpty()) {
-        kv = result.get(0).clone();
-      }
-    }
+    // lock memstore snapshot for this critical section:
+    this.lock.readLock().lock();
+    memstore.readLockLock();
+    try {
+      int memstoreCode = this.memstore.getWithCode(matcher, result);
 
-    if (kv != null) {
-      // Received early-out from memstore
-      // Make a copy of the KV and increment it
-      byte [] buffer = kv.getBuffer();
-      int valueOffset = kv.getValueOffset();
-      value = Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG) + amount;
-      Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(value), 0,
-        Bytes.SIZEOF_LONG);
-      if (newTs) {
-        long currTs = System.currentTimeMillis();
-        if (currTs == kv.getTimestamp()) {
-          currTs++; // just in case something wacky happens.
-        }
-        byte [] stampBytes = Bytes.toBytes(currTs);
-        Bytes.putBytes(buffer, kv.getTimestampOffset(), stampBytes, 0,
+      if (memstoreCode != 0) {
+        // was in memstore (or snapshot)
+        kv = result.get(0).clone();
+        byte [] buffer = kv.getBuffer();
+        int valueOffset = kv.getValueOffset();
+        Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(newValue), 0,
             Bytes.SIZEOF_LONG);
+        if (memstoreCode == 2) {
+          // from snapshot, assign new TS
+          long currTs = System.currentTimeMillis();
+          if (currTs == kv.getTimestamp()) {
+            currTs++; // unlikely but catastrophic
+          }
+          Bytes.putBytes(buffer, kv.getTimestampOffset(),
+              Bytes.toBytes(currTs), 0, Bytes.SIZEOF_LONG);
+        }
+      } else {
+        kv = new KeyValue(row, f, qualifier,
+            System.currentTimeMillis(),
+            Bytes.toBytes(newValue));
       }
-      return new ICVResult(value, 0, kv);
-    }
-    // Check if we even have storefiles
-    if(this.storefiles.isEmpty()) {
-      return createNewKeyValue(row, f, qualifier, value, amount);
-    }
-    
-    // Get storefiles for this store
-    List<HFileScanner> storefileScanners = new ArrayList<HFileScanner>();
-    for (StoreFile sf : this.storefiles.descendingMap().values()) {
-      Reader r = sf.getReader();
-      if (r == null) {
-        LOG.warn("StoreFile " + sf + " has a null Reader");
-        continue;
-      }
-      storefileScanners.add(r.getScanner());
-    }
-    
-    // StoreFileGetScan will handle reading this store's storefiles
-    StoreFileGetScan scanner = new StoreFileGetScan(storefileScanners, matcher);
-    
-    // Run a GET scan and put results into the specified list 
-    scanner.get(result);
-    if(result.size() > 0) {
-      value = Bytes.toLong(result.get(0).getValue());
+      return add(kv);
+      // end lock
+    } finally {
+      memstore.readLockUnlock();
+      this.lock.readLock().unlock();
     }
-    return createNewKeyValue(row, f, qualifier, value, amount);
   }
   
-  private ICVResult createNewKeyValue(byte [] row, byte [] f, 
-      byte [] qualifier, long value, long amount) {
-    long newValue = value + amount;
-    KeyValue newKv = new KeyValue(row, f, qualifier,
-        System.currentTimeMillis(),
-        Bytes.toBytes(newValue));
-    return new ICVResult(newValue, newKv.heapSize(), newKv);
-  }
-
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT + (17 * ClassSize.REFERENCE) +
       (5 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN +

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=814038&r1=814037&r2=814038&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java Fri Sep 11 21:28:36 2009
@@ -60,9 +60,13 @@
   // Test names
   private final byte[] tableName = Bytes.toBytes("testtable");;
   private final byte[] qual1 = Bytes.toBytes("qual1");
+  private final byte[] qual2 = Bytes.toBytes("qual2");
+  private final byte[] qual3 = Bytes.toBytes("qual3");
   private final byte[] value1 = Bytes.toBytes("value1");
   private final byte[] value2 = Bytes.toBytes("value2");
   private final byte [] row = Bytes.toBytes("rowA");
+  private final byte [] row2 = Bytes.toBytes("rowB");
+  private final byte [] row3 = Bytes.toBytes("rowC");
 
   /**
    * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
@@ -1246,7 +1250,6 @@
     byte [] col2 = Bytes.toBytes("Pub222");
 
 
-
     Put put = new Put(row1);
     put.add(family, col1, Bytes.toBytes(10L));
     region.put(put);
@@ -1275,11 +1278,166 @@
     List<KeyValue> results = new ArrayList<KeyValue>();
     assertEquals(false, s.next(results));
     assertEquals(0, results.size());
+  }
 
+  public void testIncrementColumnValue_UpdatingInPlace() throws IOException {
+    initHRegion(tableName, getName(), fam1);
 
+    long value = 1L;
+    long amount = 3L;
+
+    Put put = new Put(row);
+    put.add(fam1, qual1, Bytes.toBytes(value));
+    region.put(put);
 
+    long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
     
+    assertEquals(value+amount, result);
+
+    Store store = region.getStore(fam1);
+    assertEquals(1, store.memstore.kvset.size());
+    assertTrue(store.memstore.snapshot.isEmpty());
+
+    assertICV(row, fam1, qual1, value+amount);
   }
+
+  public void testIncrementColumnValue_ConcurrentFlush() throws IOException {
+    initHRegion(tableName, getName(), fam1);
+
+    long value = 1L;
+    long amount = 3L;
+
+    Put put = new Put(row);
+    put.add(fam1, qual1, Bytes.toBytes(value));
+    region.put(put);
+
+    // now increment during a flush
+    Thread t = new Thread() {
+      public void run() {
+        try {
+          region.flushcache();
+        } catch (IOException e) {
+          LOG.info("test ICV, got IOE during flushcache()");
+        }
+      }
+    };
+    t.start();
+    long r = region.incrementColumnValue(row, fam1, qual1, amount, true);
+    assertEquals(value+amount, r);
+
+    // this also asserts there is only 1 KeyValue in the set.
+    assertICV(row, fam1, qual1, value+amount);
+  }
+
+  public void testIncrementColumnValue_UpdatingInPlace_Negative()
+    throws IOException {
+    initHRegion(tableName, getName(), fam1);
+
+    long value = 3L;
+    long amount = -1L;
+
+    Put put = new Put(row);
+    put.add(fam1, qual1, Bytes.toBytes(value));
+    region.put(put);
+
+    long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
+    assertEquals(value+amount, result);
+
+    assertICV(row, fam1, qual1, value+amount);
+  }
+
+  public void testIncrementColumnValue_AddingNew()
+    throws IOException {
+    initHRegion(tableName, getName(), fam1);
+
+    long value = 1L;
+    long amount = 3L;
+
+    Put put = new Put(row);
+    put.add(fam1, qual1, Bytes.toBytes(value));
+    put.add(fam1, qual2, Bytes.toBytes(value));
+    region.put(put);
+
+    long result = region.incrementColumnValue(row, fam1, qual3, amount, true);
+    assertEquals(amount, result);
+
+    Get get = new Get(row);
+    get.addColumn(fam1, qual3);
+    Result rr = region.get(get, null);
+    assertEquals(1, rr.size());
+
+    // ensure none of the other cols were incremented.
+    assertICV(row, fam1, qual1, value);
+    assertICV(row, fam1, qual2, value);
+    assertICV(row, fam1, qual3, amount);
+  }
+
+  public void testIncrementColumnValue_UpdatingFromSF() throws IOException {
+    initHRegion(tableName, getName(), fam1);
+
+    long value = 1L;
+    long amount = 3L;
+
+    Put put = new Put(row);
+    put.add(fam1, qual1, Bytes.toBytes(value));
+    put.add(fam1, qual2, Bytes.toBytes(value));
+    region.put(put);
+
+    // flush to disk.
+    region.flushcache();
+
+    Store store = region.getStore(fam1);
+    assertEquals(0, store.memstore.kvset.size());
+
+    long r = region.incrementColumnValue(row, fam1, qual1, amount, true);
+    assertEquals(value+amount, r);
+
+    assertICV(row, fam1, qual1, value+amount);
+  }
+
+  public void testIncrementColumnValue_AddingNewAfterSFCheck()
+    throws IOException {
+    initHRegion(tableName, getName(), fam1);
+
+    long value = 1L;
+    long amount = 3L;
+
+    Put put = new Put(row);
+    put.add(fam1, qual1, Bytes.toBytes(value));
+    put.add(fam1, qual2, Bytes.toBytes(value));
+    region.put(put);
+    region.flushcache();
+
+    Store store = region.getStore(fam1);
+    assertEquals(0, store.memstore.kvset.size());
+
+    long r = region.incrementColumnValue(row, fam1, qual3, amount, true);
+    assertEquals(amount, r);
+
+    assertICV(row, fam1, qual3, amount);
+
+    region.flushcache();
+
+    // ensure that this gets to disk.
+    assertICV(row, fam1, qual3, amount);
+  }
+
+  private void assertICV(byte [] row,
+                         byte [] familiy,
+                         byte[] qualifier,
+                         long amount) throws IOException {
+    // run a get and see?
+    Get get = new Get(row);
+    get.addColumn(familiy, qualifier);
+    Result result = region.get(get, null);
+    assertEquals(1, result.size());
+
+    KeyValue kv = result.raw()[0];
+    long r = Bytes.toLong(kv.getValue());
+    assertEquals(amount, r);
+  }
+
+
   
   public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions()
   throws IOException {

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=814038&r1=814037&r2=814038&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java Fri Sep 11 21:28:36 2009
@@ -232,163 +232,40 @@
   //////////////////////////////////////////////////////////////////////////////
   // IncrementColumnValue tests
   //////////////////////////////////////////////////////////////////////////////
-  /**
-   * Testing if the update in place works. When you want to update a value that
-   * is already in memstore, you don't delete it and put a new one, but just 
-   * update the value in the original KeyValue
-   * @throws IOException
+  /*
+   * test the internal details of how ICV works, especially during a flush scenario.
    */
-  public void testIncrementColumnValue_UpdatingInPlace() throws IOException {
-    init(this.getName());
-
-    //Put data in memstore
-    long value = 1L;
-    long amount = 3L;
-    this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
-    
-    Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
-    assertEquals(value+amount, vas.value);
-    store.add(vas.kv);
-    Get get = new Get(row);
-    get.addColumn(family, qf1);
-    NavigableSet<byte[]> qualifiers = 
-      new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
-    qualifiers.add(qf1);
-    List<KeyValue> result = new ArrayList<KeyValue>();
-    this.store.get(get, qualifiers, result);
-    assertEquals(value + amount, Bytes.toLong(result.get(0).getValue()));
-  }
-
-  /**
-   * Same as above but for a negative number
-   * @throws IOException
-   */
-  public void testIncrementColumnValue_UpdatingInPlace_Negative() 
-  throws IOException {
-    init(this.getName());
-
-    //Put data in memstore
-    long value = 3L;
-    long amount = -1L;
-    this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
-    
-    Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
-    assertEquals(vas.value, value+amount);
-    store.add(vas.kv);
-    Get get = new Get(row);
-    get.addColumn(family, qf1);
-    NavigableSet<byte[]> qualifiers = 
-      new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
-    qualifiers.add(qf1);
-    List<KeyValue> result = new ArrayList<KeyValue>();
-    this.store.get(get, qualifiers, result);
-    assertEquals(value + amount, Bytes.toLong(result.get(0).getValue()));
-  }
-  
-  /**
-   * When there is no mathing key already, adding a new.
-   * @throws IOException
-   */
-  public void testIncrementColumnValue_AddingNew() throws IOException {
-    init(this.getName());
-    
-    //Put data in memstore
-    long value = 1L;
-    long amount = 3L;
-    this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
-    this.store.add(new KeyValue(row, family, qf2, Bytes.toBytes(value)));
-    
-    Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf3, amount);
-    store.add(vas.kv);
-    Get get = new Get(row);
-    get.addColumn(family, qf3);
-    NavigableSet<byte[]> qualifiers = 
-      new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
-    qualifiers.add(qf3);
-    List<KeyValue> result = new ArrayList<KeyValue>();
-    this.store.get(get, qualifiers, result);
-    assertEquals(amount, Bytes.toLong(result.get(0).getValue()));
-  }
-
-  /**
-   * When we have the key in a file add a new key + value to memstore with the 
-   * updates value. 
-   * @throws IOException
-   */
-  public void testIncrementColumnValue_UpdatingFromSF() throws IOException {
-    init(this.getName());
-    
-    //Put data in memstore
-    long value = 1L;
-    long amount = 3L;
-    this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
-    this.store.add(new KeyValue(row, family, qf2, Bytes.toBytes(value)));
-    
-    flush(1);
-    
-    Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
-    store.add(vas.kv);
-    Get get = new Get(row);
-    get.addColumn(family, qf1);
-    NavigableSet<byte[]> qualifiers = 
-      new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
-    qualifiers.add(qf1);
-    List<KeyValue> result = new ArrayList<KeyValue>();
-    this.store.get(get, qualifiers, result);
-    assertEquals(value + amount, Bytes.toLong(result.get(0).getValue()));
-  }
-
-  /**
-   * Same as testIncrementColumnValue_AddingNew() except that the keys are
-   * checked in file not in memstore
-   * @throws IOException
-   */
-  public void testIncrementColumnValue_AddingNewAfterSFCheck() 
-  throws IOException {
-    init(this.getName());
-    
-    //Put data in memstore
-    long value = 1L;
-    long amount = 3L;
-    this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value)));
-    this.store.add(new KeyValue(row, family, qf2, Bytes.toBytes(value)));
-    
-    flush(1);
-    
-    Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf3, amount);
-    store.add(vas.kv);
-    Get get = new Get(row);
-    get.addColumn(family, qf3);
-    NavigableSet<byte[]> qualifiers = 
-      new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
-    qualifiers.add(qf3);
-    List<KeyValue> result = new ArrayList<KeyValue>();
-    this.store.get(get, qualifiers, result);
-    assertEquals(amount, Bytes.toLong(result.get(0).getValue()));
-  }
-
   public void testIncrementColumnValue_ICVDuringFlush()
     throws IOException {
     init(this.getName());
 
-    long value = 1L;
-    long amount = 3L;
+    long oldValue = 1L;
+    long newValue = 3L;
     this.store.add(new KeyValue(row, family, qf1,
         System.currentTimeMillis(),
-        Bytes.toBytes(value)));
+        Bytes.toBytes(oldValue)));
 
     // snapshot the store.
     this.store.snapshot();
 
-    // incrment during the snapshot...
+    // add other things:
+    this.store.add(new KeyValue(row, family, qf2,
+        System.currentTimeMillis(),
+        Bytes.toBytes(oldValue)));
+
+    // update during the snapshot.
+    long ret = this.store.updateColumnValue(row, family, qf1, newValue);
 
-    Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount);
+    // memstore should have grown by some amount.
+    assertTrue(ret > 0);
 
     // then flush.
     this.store.flushCache(id++);
     assertEquals(1, this.store.getStorefiles().size());
-    assertEquals(0, this.store.memstore.kvset.size());
+    // from the one we inserted up there, and a new one
+    assertEquals(2, this.store.memstore.kvset.size());
 
+    // how many key/values for this row are there?
     Get get = new Get(row);
     get.addColumn(family, qf1);
     get.setMaxVersions(); // all versions.
@@ -398,12 +275,15 @@
     cols.add(qf1);
 
     this.store.get(get, cols, results);
-    // only one, because Store.ICV doesnt add to memcache.
-    assertEquals(1, results.size());
+    assertEquals(2, results.size());
+
+    long ts1 = results.get(0).getTimestamp();
+    long ts2 = results.get(1).getTimestamp();
+
+    assertTrue(ts1 > ts2);
+
+    assertEquals(newValue, Bytes.toLong(results.get(0).getValue()));
+    assertEquals(oldValue, Bytes.toLong(results.get(1).getValue()));
 
-    // but the timestamps should be different...
-    long icvTs = vas.kv.getTimestamp();
-    long storeTs = results.get(0).getTimestamp();
-    assertTrue(icvTs != storeTs);
   }
 }
\ No newline at end of file



Mime
View raw message