Return-Path: Delivered-To: apmail-hadoop-hbase-commits-archive@minotaur.apache.org Received: (qmail 26951 invoked from network); 11 Sep 2009 21:29:52 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 11 Sep 2009 21:29:52 -0000 Received: (qmail 11606 invoked by uid 500); 11 Sep 2009 21:29:52 -0000 Delivered-To: apmail-hadoop-hbase-commits-archive@hadoop.apache.org Received: (qmail 11556 invoked by uid 500); 11 Sep 2009 21:29:52 -0000 Mailing-List: contact hbase-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hbase-dev@hadoop.apache.org Delivered-To: mailing list hbase-commits@hadoop.apache.org Received: (qmail 11547 invoked by uid 99); 11 Sep 2009 21:29:52 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Sep 2009 21:29:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Sep 2009 21:29:41 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 10E2423888D7; Fri, 11 Sep 2009 21:29:19 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r814038 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/regionserver/ src/test/org/apache/hadoop/hbase/regionserver/ Date: Fri, 11 Sep 2009 21:29:08 -0000 To: hbase-commits@hadoop.apache.org From: stack@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090911212919.10E2423888D7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: stack Date: Fri Sep 11 21:28:36 2009 New Revision: 814038 URL: http://svn.apache.org/viewvc?rev=814038&view=rev Log: HBASE-1740 ICV has a subtle race condition only visible under high load Modified: hadoop/hbase/trunk/CHANGES.txt hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java Modified: hadoop/hbase/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=814038&r1=814037&r2=814038&view=diff ============================================================================== --- hadoop/hbase/trunk/CHANGES.txt (original) +++ hadoop/hbase/trunk/CHANGES.txt Fri Sep 11 21:28:36 2009 @@ -18,6 +18,7 @@ get tossed as 'duplicates' HBASE-1794 recovered log files are not inserted into the storefile map HBASE-1824 [stargate] default timestamp should be LATEST_TIMESTAMP + HBASE-1740 ICV has a subtle race condition only visible under high load IMPROVEMENTS HBASE-1760 Cleanup TODOs in HTable Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=814038&r1=814037&r2=814038&view=diff ============================================================================== --- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original) +++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Sep 11 21:28:36 2009 @@ -2317,25 +2317,47 @@ boolean flush = false; // Lock row Integer lid = obtainRowLock(row); - long result = 0L; + long result = amount; try { Store store = stores.get(family); - // Determine what to do and perform increment on returned KV, no insertion - Store.ICVResult vas = - store.incrementColumnValue(row, family, qualifier, amount); - // Write incremented value to WAL before inserting + + // Get the old value: + Get get = new Get(row); + get.addColumn(family, qualifier); + List results = new ArrayList(); + NavigableSet qualifiers = new TreeSet(Bytes.BYTES_COMPARATOR); + qualifiers.add(qualifier); + store.get(get, qualifiers, results); + + if (!results.isEmpty()) { + byte [] oldValue = results.get(0).getValue(); + KeyValue kv = results.get(0); + byte [] buffer = kv.getBuffer(); + int valueOffset = kv.getValueOffset(); + result += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG); + } + + // bulid the KeyValue now: + KeyValue newKv = new KeyValue(row, family, + qualifier, System.currentTimeMillis(), + Bytes.toBytes(result)); + + // now log it: if (writeToWAL) { long now = System.currentTimeMillis(); List edits = new ArrayList(1); - edits.add(vas.kv); + edits.add(newKv); this.log.append(regionInfo.getRegionName(), regionInfo.getTableDesc().getName(), edits, (regionInfo.isMetaRegion() || regionInfo.isRootRegion()), now); } - // Insert to the Store - store.add(vas.kv); - result = vas.value; - long size = this.memstoreSize.addAndGet(vas.sizeAdded); + + // Now request the ICV to the store, this will set the timestamp + // appropriately depending on if there is a value in memcache or not. + // returns the + long size = store.updateColumnValue(row, family, qualifier, result); + + size = this.memstoreSize.addAndGet(size); flush = isFlushSize(size); } finally { releaseRowLock(lid); Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=814038&r1=814037&r2=814038&view=diff ============================================================================== --- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original) +++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java Fri Sep 11 21:28:36 2009 @@ -496,6 +496,43 @@ this.lock.readLock().unlock(); } } + + /** + * Gets from either the memstore or the snapshop, and returns a code + * to let you know which is which. + * + * @param matcher + * @param result + * @return 1 == memstore, 2 == snapshot, 0 == none + */ + int getWithCode(QueryMatcher matcher, List result) throws IOException { + this.lock.readLock().lock(); + try { + boolean fromMemstore = internalGet(this.kvset, matcher, result); + if (fromMemstore || matcher.isDone()) + return 1; + + matcher.update(); + boolean fromSnapshot = internalGet(this.snapshot, matcher, result); + if (fromSnapshot || matcher.isDone()) + return 2; + + return 0; + } finally { + this.lock.readLock().unlock(); + } + } + + /** + * Small utility functions for use by Store.incrementColumnValue + * _only_ under the threat of pain and everlasting race conditions. + */ + void readLockLock() { + this.lock.readLock().lock(); + } + void readLockUnlock() { + this.lock.readLock().unlock(); + } /** * Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=814038&r1=814037&r2=814038&view=diff ============================================================================== --- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original) +++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Fri Sep 11 21:28:36 2009 @@ -436,7 +436,7 @@ lock.readLock().unlock(); } } - + /** * Adds a value to the memstore * @@ -1462,37 +1462,22 @@ scanner.get(result); } - /* - * Data structure to hold incrementColumnValue result. - */ - static class ICVResult { - final long value; - final long sizeAdded; - final KeyValue kv; - - ICVResult(long value, long sizeAdded, KeyValue kv) { - this.value = value; - this.sizeAdded = sizeAdded; - this.kv = kv; - } - } - /** * Increments the value for the given row/family/qualifier * @param row * @param f * @param qualifier - * @param amount - * @return The new value. + * @param newValue the new value to set into memstore + * @return memstore size delta * @throws IOException */ - public ICVResult incrementColumnValue(byte [] row, byte [] f, - byte [] qualifier, long amount) + public long updateColumnValue(byte [] row, byte [] f, + byte [] qualifier, long newValue) throws IOException { - long value = 0; List result = new ArrayList(); KeyComparator keyComparator = this.comparator.getRawComparator(); + KeyValue kv = null; // Setting up the QueryMatcher Get get = new Get(row); NavigableSet qualifiers = @@ -1501,78 +1486,41 @@ QueryMatcher matcher = new QueryMatcher(get, f, qualifiers, this.ttl, keyComparator, 1); - boolean newTs = true; - KeyValue kv = null; - // Read from memstore first: - this.memstore.internalGet(this.memstore.kvset, - matcher, result); - if (!result.isEmpty()) { - kv = result.get(0).clone(); - newTs = false; - } else { - // try the snapshot. - this.memstore.internalGet(this.memstore.snapshot, - matcher, result); - if (!result.isEmpty()) { - kv = result.get(0).clone(); - } - } + // lock memstore snapshot for this critical section: + this.lock.readLock().lock(); + memstore.readLockLock(); + try { + int memstoreCode = this.memstore.getWithCode(matcher, result); - if (kv != null) { - // Received early-out from memstore - // Make a copy of the KV and increment it - byte [] buffer = kv.getBuffer(); - int valueOffset = kv.getValueOffset(); - value = Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG) + amount; - Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(value), 0, - Bytes.SIZEOF_LONG); - if (newTs) { - long currTs = System.currentTimeMillis(); - if (currTs == kv.getTimestamp()) { - currTs++; // just in case something wacky happens. - } - byte [] stampBytes = Bytes.toBytes(currTs); - Bytes.putBytes(buffer, kv.getTimestampOffset(), stampBytes, 0, + if (memstoreCode != 0) { + // was in memstore (or snapshot) + kv = result.get(0).clone(); + byte [] buffer = kv.getBuffer(); + int valueOffset = kv.getValueOffset(); + Bytes.putBytes(buffer, valueOffset, Bytes.toBytes(newValue), 0, Bytes.SIZEOF_LONG); + if (memstoreCode == 2) { + // from snapshot, assign new TS + long currTs = System.currentTimeMillis(); + if (currTs == kv.getTimestamp()) { + currTs++; // unlikely but catastrophic + } + Bytes.putBytes(buffer, kv.getTimestampOffset(), + Bytes.toBytes(currTs), 0, Bytes.SIZEOF_LONG); + } + } else { + kv = new KeyValue(row, f, qualifier, + System.currentTimeMillis(), + Bytes.toBytes(newValue)); } - return new ICVResult(value, 0, kv); - } - // Check if we even have storefiles - if(this.storefiles.isEmpty()) { - return createNewKeyValue(row, f, qualifier, value, amount); - } - - // Get storefiles for this store - List storefileScanners = new ArrayList(); - for (StoreFile sf : this.storefiles.descendingMap().values()) { - Reader r = sf.getReader(); - if (r == null) { - LOG.warn("StoreFile " + sf + " has a null Reader"); - continue; - } - storefileScanners.add(r.getScanner()); - } - - // StoreFileGetScan will handle reading this store's storefiles - StoreFileGetScan scanner = new StoreFileGetScan(storefileScanners, matcher); - - // Run a GET scan and put results into the specified list - scanner.get(result); - if(result.size() > 0) { - value = Bytes.toLong(result.get(0).getValue()); + return add(kv); + // end lock + } finally { + memstore.readLockUnlock(); + this.lock.readLock().unlock(); } - return createNewKeyValue(row, f, qualifier, value, amount); } - private ICVResult createNewKeyValue(byte [] row, byte [] f, - byte [] qualifier, long value, long amount) { - long newValue = value + amount; - KeyValue newKv = new KeyValue(row, f, qualifier, - System.currentTimeMillis(), - Bytes.toBytes(newValue)); - return new ICVResult(newValue, newKv.heapSize(), newKv); - } - public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + (17 * ClassSize.REFERENCE) + (5 * Bytes.SIZEOF_LONG) + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN + Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=814038&r1=814037&r2=814038&view=diff ============================================================================== --- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original) +++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java Fri Sep 11 21:28:36 2009 @@ -60,9 +60,13 @@ // Test names private final byte[] tableName = Bytes.toBytes("testtable");; private final byte[] qual1 = Bytes.toBytes("qual1"); + private final byte[] qual2 = Bytes.toBytes("qual2"); + private final byte[] qual3 = Bytes.toBytes("qual3"); private final byte[] value1 = Bytes.toBytes("value1"); private final byte[] value2 = Bytes.toBytes("value2"); private final byte [] row = Bytes.toBytes("rowA"); + private final byte [] row2 = Bytes.toBytes("rowB"); + private final byte [] row3 = Bytes.toBytes("rowC"); /** * @see org.apache.hadoop.hbase.HBaseTestCase#setUp() @@ -1246,7 +1250,6 @@ byte [] col2 = Bytes.toBytes("Pub222"); - Put put = new Put(row1); put.add(family, col1, Bytes.toBytes(10L)); region.put(put); @@ -1275,11 +1278,166 @@ List results = new ArrayList(); assertEquals(false, s.next(results)); assertEquals(0, results.size()); + } + public void testIncrementColumnValue_UpdatingInPlace() throws IOException { + initHRegion(tableName, getName(), fam1); + long value = 1L; + long amount = 3L; + + Put put = new Put(row); + put.add(fam1, qual1, Bytes.toBytes(value)); + region.put(put); + long result = region.incrementColumnValue(row, fam1, qual1, amount, true); + assertEquals(value+amount, result); + + Store store = region.getStore(fam1); + assertEquals(1, store.memstore.kvset.size()); + assertTrue(store.memstore.snapshot.isEmpty()); + + assertICV(row, fam1, qual1, value+amount); } + + public void testIncrementColumnValue_ConcurrentFlush() throws IOException { + initHRegion(tableName, getName(), fam1); + + long value = 1L; + long amount = 3L; + + Put put = new Put(row); + put.add(fam1, qual1, Bytes.toBytes(value)); + region.put(put); + + // now increment during a flush + Thread t = new Thread() { + public void run() { + try { + region.flushcache(); + } catch (IOException e) { + LOG.info("test ICV, got IOE during flushcache()"); + } + } + }; + t.start(); + long r = region.incrementColumnValue(row, fam1, qual1, amount, true); + assertEquals(value+amount, r); + + // this also asserts there is only 1 KeyValue in the set. + assertICV(row, fam1, qual1, value+amount); + } + + public void testIncrementColumnValue_UpdatingInPlace_Negative() + throws IOException { + initHRegion(tableName, getName(), fam1); + + long value = 3L; + long amount = -1L; + + Put put = new Put(row); + put.add(fam1, qual1, Bytes.toBytes(value)); + region.put(put); + + long result = region.incrementColumnValue(row, fam1, qual1, amount, true); + assertEquals(value+amount, result); + + assertICV(row, fam1, qual1, value+amount); + } + + public void testIncrementColumnValue_AddingNew() + throws IOException { + initHRegion(tableName, getName(), fam1); + + long value = 1L; + long amount = 3L; + + Put put = new Put(row); + put.add(fam1, qual1, Bytes.toBytes(value)); + put.add(fam1, qual2, Bytes.toBytes(value)); + region.put(put); + + long result = region.incrementColumnValue(row, fam1, qual3, amount, true); + assertEquals(amount, result); + + Get get = new Get(row); + get.addColumn(fam1, qual3); + Result rr = region.get(get, null); + assertEquals(1, rr.size()); + + // ensure none of the other cols were incremented. + assertICV(row, fam1, qual1, value); + assertICV(row, fam1, qual2, value); + assertICV(row, fam1, qual3, amount); + } + + public void testIncrementColumnValue_UpdatingFromSF() throws IOException { + initHRegion(tableName, getName(), fam1); + + long value = 1L; + long amount = 3L; + + Put put = new Put(row); + put.add(fam1, qual1, Bytes.toBytes(value)); + put.add(fam1, qual2, Bytes.toBytes(value)); + region.put(put); + + // flush to disk. + region.flushcache(); + + Store store = region.getStore(fam1); + assertEquals(0, store.memstore.kvset.size()); + + long r = region.incrementColumnValue(row, fam1, qual1, amount, true); + assertEquals(value+amount, r); + + assertICV(row, fam1, qual1, value+amount); + } + + public void testIncrementColumnValue_AddingNewAfterSFCheck() + throws IOException { + initHRegion(tableName, getName(), fam1); + + long value = 1L; + long amount = 3L; + + Put put = new Put(row); + put.add(fam1, qual1, Bytes.toBytes(value)); + put.add(fam1, qual2, Bytes.toBytes(value)); + region.put(put); + region.flushcache(); + + Store store = region.getStore(fam1); + assertEquals(0, store.memstore.kvset.size()); + + long r = region.incrementColumnValue(row, fam1, qual3, amount, true); + assertEquals(amount, r); + + assertICV(row, fam1, qual3, amount); + + region.flushcache(); + + // ensure that this gets to disk. + assertICV(row, fam1, qual3, amount); + } + + private void assertICV(byte [] row, + byte [] familiy, + byte[] qualifier, + long amount) throws IOException { + // run a get and see? + Get get = new Get(row); + get.addColumn(familiy, qualifier); + Result result = region.get(get, null); + assertEquals(1, result.size()); + + KeyValue kv = result.raw()[0]; + long r = Bytes.toLong(kv.getValue()); + assertEquals(amount, r); + } + + public void testScanner_Wildcard_FromMemStoreAndFiles_EnforceVersions() throws IOException { Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=814038&r1=814037&r2=814038&view=diff ============================================================================== --- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java (original) +++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestStore.java Fri Sep 11 21:28:36 2009 @@ -232,163 +232,40 @@ ////////////////////////////////////////////////////////////////////////////// // IncrementColumnValue tests ////////////////////////////////////////////////////////////////////////////// - /** - * Testing if the update in place works. When you want to update a value that - * is already in memstore, you don't delete it and put a new one, but just - * update the value in the original KeyValue - * @throws IOException + /* + * test the internal details of how ICV works, especially during a flush scenario. */ - public void testIncrementColumnValue_UpdatingInPlace() throws IOException { - init(this.getName()); - - //Put data in memstore - long value = 1L; - long amount = 3L; - this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value))); - - Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount); - assertEquals(value+amount, vas.value); - store.add(vas.kv); - Get get = new Get(row); - get.addColumn(family, qf1); - NavigableSet qualifiers = - new ConcurrentSkipListSet(Bytes.BYTES_COMPARATOR); - qualifiers.add(qf1); - List result = new ArrayList(); - this.store.get(get, qualifiers, result); - assertEquals(value + amount, Bytes.toLong(result.get(0).getValue())); - } - - /** - * Same as above but for a negative number - * @throws IOException - */ - public void testIncrementColumnValue_UpdatingInPlace_Negative() - throws IOException { - init(this.getName()); - - //Put data in memstore - long value = 3L; - long amount = -1L; - this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value))); - - Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount); - assertEquals(vas.value, value+amount); - store.add(vas.kv); - Get get = new Get(row); - get.addColumn(family, qf1); - NavigableSet qualifiers = - new ConcurrentSkipListSet(Bytes.BYTES_COMPARATOR); - qualifiers.add(qf1); - List result = new ArrayList(); - this.store.get(get, qualifiers, result); - assertEquals(value + amount, Bytes.toLong(result.get(0).getValue())); - } - - /** - * When there is no mathing key already, adding a new. - * @throws IOException - */ - public void testIncrementColumnValue_AddingNew() throws IOException { - init(this.getName()); - - //Put data in memstore - long value = 1L; - long amount = 3L; - this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value))); - this.store.add(new KeyValue(row, family, qf2, Bytes.toBytes(value))); - - Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf3, amount); - store.add(vas.kv); - Get get = new Get(row); - get.addColumn(family, qf3); - NavigableSet qualifiers = - new ConcurrentSkipListSet(Bytes.BYTES_COMPARATOR); - qualifiers.add(qf3); - List result = new ArrayList(); - this.store.get(get, qualifiers, result); - assertEquals(amount, Bytes.toLong(result.get(0).getValue())); - } - - /** - * When we have the key in a file add a new key + value to memstore with the - * updates value. - * @throws IOException - */ - public void testIncrementColumnValue_UpdatingFromSF() throws IOException { - init(this.getName()); - - //Put data in memstore - long value = 1L; - long amount = 3L; - this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value))); - this.store.add(new KeyValue(row, family, qf2, Bytes.toBytes(value))); - - flush(1); - - Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount); - store.add(vas.kv); - Get get = new Get(row); - get.addColumn(family, qf1); - NavigableSet qualifiers = - new ConcurrentSkipListSet(Bytes.BYTES_COMPARATOR); - qualifiers.add(qf1); - List result = new ArrayList(); - this.store.get(get, qualifiers, result); - assertEquals(value + amount, Bytes.toLong(result.get(0).getValue())); - } - - /** - * Same as testIncrementColumnValue_AddingNew() except that the keys are - * checked in file not in memstore - * @throws IOException - */ - public void testIncrementColumnValue_AddingNewAfterSFCheck() - throws IOException { - init(this.getName()); - - //Put data in memstore - long value = 1L; - long amount = 3L; - this.store.add(new KeyValue(row, family, qf1, Bytes.toBytes(value))); - this.store.add(new KeyValue(row, family, qf2, Bytes.toBytes(value))); - - flush(1); - - Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf3, amount); - store.add(vas.kv); - Get get = new Get(row); - get.addColumn(family, qf3); - NavigableSet qualifiers = - new ConcurrentSkipListSet(Bytes.BYTES_COMPARATOR); - qualifiers.add(qf3); - List result = new ArrayList(); - this.store.get(get, qualifiers, result); - assertEquals(amount, Bytes.toLong(result.get(0).getValue())); - } - public void testIncrementColumnValue_ICVDuringFlush() throws IOException { init(this.getName()); - long value = 1L; - long amount = 3L; + long oldValue = 1L; + long newValue = 3L; this.store.add(new KeyValue(row, family, qf1, System.currentTimeMillis(), - Bytes.toBytes(value))); + Bytes.toBytes(oldValue))); // snapshot the store. this.store.snapshot(); - // incrment during the snapshot... + // add other things: + this.store.add(new KeyValue(row, family, qf2, + System.currentTimeMillis(), + Bytes.toBytes(oldValue))); + + // update during the snapshot. + long ret = this.store.updateColumnValue(row, family, qf1, newValue); - Store.ICVResult vas = this.store.incrementColumnValue(row, family, qf1, amount); + // memstore should have grown by some amount. + assertTrue(ret > 0); // then flush. this.store.flushCache(id++); assertEquals(1, this.store.getStorefiles().size()); - assertEquals(0, this.store.memstore.kvset.size()); + // from the one we inserted up there, and a new one + assertEquals(2, this.store.memstore.kvset.size()); + // how many key/values for this row are there? Get get = new Get(row); get.addColumn(family, qf1); get.setMaxVersions(); // all versions. @@ -398,12 +275,15 @@ cols.add(qf1); this.store.get(get, cols, results); - // only one, because Store.ICV doesnt add to memcache. - assertEquals(1, results.size()); + assertEquals(2, results.size()); + + long ts1 = results.get(0).getTimestamp(); + long ts2 = results.get(1).getTimestamp(); + + assertTrue(ts1 > ts2); + + assertEquals(newValue, Bytes.toLong(results.get(0).getValue())); + assertEquals(oldValue, Bytes.toLong(results.get(1).getValue())); - // but the timestamps should be different... - long icvTs = vas.kv.getTimestamp(); - long storeTs = results.get(0).getTimestamp(); - assertTrue(icvTs != storeTs); } } \ No newline at end of file