From: ankitsinghal
To: dev@tephra.incubator.apache.org
Reply-To: dev@tephra.incubator.apache.org
Subject: [GitHub] incubator-tephra pull request #67: TEPHRA-272 Add HBase 2.0 compatibility mo...
Content-Type: text/plain
Message-Id: <20180111072205.0F922DFB31@git1-us-west.apache.org>
Date: Thu, 11 Jan 2018 07:22:05 +0000 (UTC)

Github user ankitsinghal commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/67#discussion_r160879714

--- Diff: tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java --- @@ -0,0 +1,677 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.hbase.coprocessor; + +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import it.unimi.dsi.fastutil.longs.LongArrayList; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MockRegionServerServices; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; +import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.MemStoreLAB; +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TxConstants; +import org.apache.tephra.coprocessor.TransactionStateCache; +import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; +import org.apache.tephra.manager.InvalidTxList; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.HDFSTransactionStateStorage; +import org.apache.tephra.persist.TransactionSnapshot; +import org.apache.tephra.persist.TransactionVisibilityState; +import org.apache.tephra.snapshot.DefaultSnapshotCodec; +import org.apache.tephra.snapshot.SnapshotCodecProvider; +import org.apache.tephra.util.TxUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests filtering of invalid transaction data by the {@link TransactionProcessor} coprocessor. + */ +public class TransactionProcessorTest { + private static final Logger LOG = LoggerFactory.getLogger(TransactionProcessorTest.class); + protected static ChunkCreator chunkCreator; + // 8 versions, 1 hour apart, latest is current ts. + private static final long[] V; + + static { + long now = System.currentTimeMillis(); + V = new long[9]; + for (int i = 0; i < V.length; i++) { + V[i] = (now - TimeUnit.HOURS.toMillis(8 - i)) * TxConstants.MAX_TX_PER_MS; + } + } + + @ClassRule + public static TemporaryFolder tmpFolder = new TemporaryFolder(); + private static MiniDFSCluster dfsCluster; + private static Configuration conf; + private static LongArrayList invalidSet = new LongArrayList(new long[]{V[3], V[5], V[7]}); + private static TransactionVisibilityState txVisibilityState; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + Configuration hConf = new Configuration(); + String rootDir = tmpFolder.newFolder().getAbsolutePath(); + hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, rootDir); + hConf.set(HConstants.HBASE_DIR, rootDir + "/hbase"); + + dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build(); + dfsCluster.waitActive(); + conf = HBaseConfiguration.create(dfsCluster.getFileSystem().getConf()); + + conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER); + conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES); + String localTestDir = tmpFolder.newFolder().getAbsolutePath(); + conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, localTestDir); + conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName()); + + // write an initial transaction snapshot + InvalidTxList invalidTxList = new InvalidTxList(); + invalidTxList.addAll(invalidSet); + TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom( + System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList, + // this will set visibility upper bound to V[6] + Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx( + V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))), + new HashMap(), new TreeMap()); + txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(), + txSnapshot.getWritePointer(), txSnapshot.getInvalid(), + txSnapshot.getInProgress()); + HDFSTransactionStateStorage tmpStorage = + new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector()); + tmpStorage.startAndWait(); + tmpStorage.writeSnapshot(txSnapshot); + tmpStorage.stopAndWait(); + long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() + .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); + chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, + globalMemStoreLimit, 0.2f, 
MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); + assertTrue(chunkCreator != null); + } + + @AfterClass + public static void shutdownAfterClass() throws Exception { + dfsCluster.shutdown(); + } + + @Test + public void testDataJanitorRegionScanner() throws Exception { + String tableName = "TestRegionScanner"; + byte[] familyBytes = Bytes.toBytes("f"); + byte[] columnBytes = Bytes.toBytes("c"); + HRegion region = createRegion(tableName, familyBytes, TimeUnit.HOURS.toMillis(3)); + try { + region.initialize(); + TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); + LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache)); + + for (int i = 1; i <= 8; i++) { + for (int k = 1; k <= i; k++) { + Put p = new Put(Bytes.toBytes(i)); + p.addColumn(familyBytes, columnBytes, V[k], Bytes.toBytes(V[k])); + region.put(p); + } + } + + List results = Lists.newArrayList(); + + // force a flush to clear the data + // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set + + LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString()); + FlushResultImpl flushResult = region.flushcache(true, false, new FlushLifeCycleTracker() { }); + Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded()); + + // now a normal scan should only return the valid rows + // do not use a filter here to test that cleanup works on flush + Scan scan = new Scan(); + scan.setMaxVersions(10); + RegionScanner regionScanner = region.getScanner(scan); + + // first returned value should be "4" with version "4" + results.clear(); + assertTrue(regionScanner.next(results)); + assertKeyValueMatches(results, 4, new long[]{V[4]}); + + results.clear(); + assertTrue(regionScanner.next(results)); + assertKeyValueMatches(results, 5, new long[] {V[4]}); + + results.clear(); + assertTrue(regionScanner.next(results)); + assertKeyValueMatches(results, 6, new long[]{V[6], V[4]}); + + results.clear(); + assertTrue(regionScanner.next(results)); + assertKeyValueMatches(results, 7, new long[]{V[6], V[4]}); + + results.clear(); + assertFalse(regionScanner.next(results)); + assertKeyValueMatches(results, 8, new long[] {V[8], V[6], V[4]}); + } finally { + //region.close(); + } + } + + @Test + public void testDeleteFiltering() throws Exception { + String tableName = "TestDeleteFiltering"; + byte[] familyBytes = Bytes.toBytes("f"); + byte[] columnBytes = Bytes.toBytes("c"); + HRegion region = createRegion(tableName, familyBytes, 0); + try { + region.initialize(); + TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get(); + LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache)); + + byte[] row = Bytes.toBytes(1); + for (int i = 4; i < V.length; i++) { + Put p = new Put(row); + p.addColumn(familyBytes, columnBytes, V[i], Bytes.toBytes(V[i])); + region.put(p); + } + + // delete from the third entry back + // take that cell's timestamp + 1 to simulate a delete in a new tx + long deleteTs = V[5] + 1; + Delete d = new Delete(row, deleteTs); + LOG.info("Issuing delete at timestamp " + deleteTs); + // row deletes are not yet supported (TransactionAwareHTable normally handles this) + d.addColumns(familyBytes, columnBytes); + region.delete(d); + + List results = Lists.newArrayList(); + + // force a flush to clear the data + // during flush, we should drop the deleted version, but not the others + LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString()); + 
region.flushcache(true, false, new FlushLifeCycleTracker() { }); + + // now a normal scan should return row with versions at: V[8], V[6]. + // V[7] is invalid and V[5] and prior are deleted. + Scan scan = new Scan(); + scan.setMaxVersions(10); + RegionScanner regionScanner = region.getScanner(scan); + // should be only one row + assertFalse(regionScanner.next(results)); + assertKeyValueMatches(results, 1, + new long[]{V[8], V[6], deleteTs}, + new byte[][]{Bytes.toBytes(V[8]), Bytes.toBytes(V[6]), new byte[0]}); + } finally { + //region.close(); + } + } + + @Test + public void testDeleteMarkerCleanup() throws Exception { + String tableName = "TestDeleteMarkerCleanup"; + byte[] familyBytes = Bytes.toBytes("f"); + HRegion region = createRegion(tableName, familyBytes, 0); + try { + region.initialize(); + + // all puts use a timestamp before the tx snapshot's visibility upper bound, making them eligible for removal + long writeTs = txVisibilityState.getVisibilityUpperBound() - 10; + // deletes are performed after the writes, but still before the visibility upper bound + long deleteTs = writeTs + 1; + // write separate columns to confirm that delete markers survive across flushes + byte[] row = Bytes.toBytes(100); + Put p = new Put(row); + + LOG.info("Writing columns at timestamp " + writeTs); + for (int i = 0; i < 5; i++) { + byte[] iBytes = Bytes.toBytes(i); + p.addColumn(familyBytes, iBytes, writeTs, iBytes); + } + region.put(p); + // read all back + Scan scan = new Scan(row); + RegionScanner regionScanner = region.getScanner(scan); + List results = Lists.newArrayList(); + assertFalse(regionScanner.next(results)); + + for (int i = 0; i < 5; i++) { + Cell cell = results.get(i); + assertArrayEquals(row, CellUtil.cloneRow(cell)); + byte[] idxBytes = Bytes.toBytes(i); + assertArrayEquals(idxBytes, CellUtil.cloneQualifier(cell)); + assertArrayEquals(idxBytes, CellUtil.cloneValue(cell)); + } + + // force a flush to clear the memstore + LOG.info("Before delete, flushing region " + region.getRegionInfo().getRegionNameAsString()); + region.flushcache(false, false, new FlushLifeCycleTracker() { }); + // delete the odd entries + for (int i = 0; i < 5; i++) { + if (i % 2 == 1) { + // deletes are performed as puts with empty values + Put deletePut = new Put(row); + deletePut.addColumn(familyBytes, Bytes.toBytes(i), deleteTs, new byte[0]); + region.put(deletePut); + } + } + + // read all back + scan = new Scan(row); + scan.readVersions(1); + scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState), + new TreeMap(), false, ScanType.USER_SCAN)); + regionScanner = region.getScanner(scan); + results = Lists.newArrayList(); + assertFalse(regionScanner.next(results)); + assertEquals(3, results.size()); + // only even columns should exist + for (int i = 0; i < 3; i++) { + Cell cell = results.get(i); + LOG.info("Got cell " + cell); + assertArrayEquals(row, CellUtil.cloneRow(cell)); + byte[] idxBytes = Bytes.toBytes(i * 2); + assertArrayEquals(idxBytes, CellUtil.cloneQualifier(cell)); + assertArrayEquals(idxBytes, CellUtil.cloneValue(cell)); + } + + // force another flush on the delete markers + // during flush, we should retain the delete markers, since they can only safely be dropped by a major compaction + LOG.info("After delete, flushing region " + region.getRegionInfo().getRegionNameAsString()); + FlushResultImpl flushResultImpl = region.flushcache(true, false, new FlushLifeCycleTracker() { }); + assertTrue(flushResultImpl + .getResult() == 
FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED + || flushResultImpl.getResult() == FlushResult.Result.FLUSHED_COMPACTION_NEEDED); + scan = new Scan(row); + scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState), + new TreeMap(), false, ScanType.USER_SCAN)); + scan.readVersions(1); + regionScanner = region.getScanner(scan); + results = Lists.newArrayList(); + assertFalse(regionScanner.next(results)); + assertEquals(3, results.size()); + // only even columns should exist + for (int i = 0; i < 3; i++) { + Cell cell = results.get(i); + assertArrayEquals(row, CellUtil.cloneRow(cell)); + byte[] idxBytes = Bytes.toBytes(i * 2); + assertArrayEquals(idxBytes, CellUtil.cloneQualifier(cell)); + assertArrayEquals(idxBytes, CellUtil.cloneValue(cell)); + } + + // force a major compaction + LOG.info("Forcing major compaction of region " + region.getRegionInfo().getRegionNameAsString()); + region.compact(true); + + // perform a raw scan (no filter) to confirm that the delete markers are now gone + scan = new Scan(row); + System.out.println("scan started"); + regionScanner = region.getScanner(scan); + results = Lists.newArrayList(); + assertFalse(regionScanner.next(results)); + assertEquals(3, results.size()); + // only even columns should exist + for (int i = 0; i < 3; i++) { + Cell cell = results.get(i); + assertArrayEquals(row, CellUtil.cloneRow(cell)); + byte[] idxBytes = Bytes.toBytes(i * 2); + assertArrayEquals(idxBytes, CellUtil.cloneQualifier(cell)); + assertArrayEquals(idxBytes, CellUtil.cloneValue(cell)); + } + } finally { +// region.close(); + } + } + + /** + * Test that we correctly preserve the timestamp set for column family delete markers. This is not + * directly required for the TransactionAwareHTable usage, but is the right thing to do and ensures + * that we make it easy to interoperate with other systems. 
+ */ + @Test + public void testFamilyDeleteTimestamp() throws Exception { + String tableName = "TestFamilyDeleteTimestamp"; + byte[] family1Bytes = Bytes.toBytes("f1"); + byte[] columnBytes = Bytes.toBytes("c"); + byte[] rowBytes = Bytes.toBytes("row"); + byte[] valBytes = Bytes.toBytes("val"); + HRegion region = createRegion(tableName, family1Bytes, 0); + try { + region.initialize(); + + long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS; + Put p = new Put(rowBytes); + p.addColumn(family1Bytes, columnBytes, now - 10, valBytes); + region.put(p); + + // issue a family delete with an explicit timestamp + Delete delete = new Delete(rowBytes, now); + delete.addFamily(family1Bytes, now - 5); + region.delete(delete); + + // test that the delete marker preserved the timestamp + Scan scan = new Scan(); + scan.setMaxVersions(); + RegionScanner scanner = region.getScanner(scan); + List results = Lists.newArrayList(); + scanner.next(results); + assertEquals(2, results.size()); + // delete marker should appear first + Cell cell = results.get(0); + assertArrayEquals(new byte[0], CellUtil.cloneQualifier(cell)); + assertArrayEquals(new byte[0], CellUtil.cloneValue(cell)); + assertEquals(now - 5, cell.getTimestamp()); + // since this is an unfiltered scan against the region, the original put should be next + cell = results.get(1); + assertArrayEquals(valBytes, CellUtil.cloneValue(cell)); + assertEquals(now - 10, cell.getTimestamp()); + scanner.close(); + + + // with a filtered scan the original put should disappear + scan = new Scan(); + scan.setMaxVersions(); + scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState), + new TreeMap(), false, ScanType.USER_SCAN)); + scanner = region.getScanner(scan); + results = Lists.newArrayList(); + scanner.next(results); + assertEquals(0, results.size()); + scanner.close(); + } finally { + region.close(); + } + } + + @Test + public void testPreExistingData() throws Exception { + String tableName = "TestPreExistingData"; + byte[] familyBytes = Bytes.toBytes("f"); + long ttlMillis = TimeUnit.DAYS.toMillis(14); + HRegion region = createRegion(tableName, familyBytes, ttlMillis); + try { + region.initialize(); + + // timestamps for pre-existing, non-transactional data + long now = txVisibilityState.getVisibilityUpperBound() / TxConstants.MAX_TX_PER_MS; + long older = now - ttlMillis / 2; + long newer = now - ttlMillis / 3; + // timestamps for transactional data + long nowTx = txVisibilityState.getVisibilityUpperBound(); + long olderTx = nowTx - (ttlMillis / 2) * TxConstants.MAX_TX_PER_MS; + long newerTx = nowTx - (ttlMillis / 3) * TxConstants.MAX_TX_PER_MS; + + Map ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + ttls.put(familyBytes, ttlMillis); + + List cells = new ArrayList<>(); + cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v11"))); + cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v12"))); + cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v21"))); + cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v22"))); + cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c1"), olderTx, Bytes.toBytes("v31"))); + cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c2"), newerTx, Bytes.toBytes("v32"))); + + // Write non-transactional and transactional data + for (Cell c : 
cells) { + region.put(new Put(CellUtil.cloneRow(c)).addColumn(CellUtil.cloneFamily(c), CellUtil.cloneQualifier(c), + c.getTimestamp(), CellUtil.cloneValue(c))); + } + + Scan rawScan = new Scan(); + rawScan.setMaxVersions(); + + Transaction dummyTransaction = TxUtils.createDummyTransaction(txVisibilityState); + Scan txScan = new Scan(); + txScan.setMaxVersions(); + txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true), + TxUtils.getMaxVisibleTimestamp(dummyTransaction)); + txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN)); + + // read all back with raw scanner + scanAndAssert(region, cells, rawScan); + + // read all back with transaction filter + scanAndAssert(region, cells, txScan); + + // force a flush to clear the memstore + region.flushcache(true, false, new FlushLifeCycleTracker() { }); + scanAndAssert(region, cells, txScan); + + // force a major compaction to remove any expired cells + region.compact(true); + scanAndAssert(region, cells, txScan); + + // Reduce TTL, this should make cells with timestamps older and olderTx expire + long newTtl = ttlMillis / 2 - 1; + region = updateTtl(region, familyBytes, newTtl); + ttls.put(familyBytes, newTtl); + txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true), + TxUtils.getMaxVisibleTimestamp(dummyTransaction)); + txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN)); + + // Raw scan should still give all cells + scanAndAssert(region, cells, rawScan); + // However, tx scan should not return expired cells + scanAndAssert(region, select(cells, 1, 3, 5), txScan); + + region.flushcache(true, false, new FlushLifeCycleTracker() { }); + scanAndAssert(region, cells, rawScan); + + // force a major compaction to remove any expired cells + region.compact(true); + // This time raw scan too should not return expired cells, as they would be dropped during major compaction + scanAndAssert(region, select(cells, 1, 3, 5), rawScan); + + // Reduce TTL again to 1 ms, this should expire all cells + newTtl = 1; + region = updateTtl(region, familyBytes, newTtl); + ttls.put(familyBytes, newTtl); + txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true), + TxUtils.getMaxVisibleTimestamp(dummyTransaction)); + txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN)); + + // force a major compaction to remove expired cells + region.compact(true); + // This time raw scan should not return any cells, as all cells have expired. + scanAndAssert(region, Collections.emptyList(), rawScan); + } finally { + region.close(); + } + } + + private List select(List cells, int... 
indexes) { + List newCells = new ArrayList<>(); + for (int i : indexes) { + newCells.add(cells.get(i)); + } + return newCells; + } + + @SuppressWarnings("StatementWithEmptyBody") + private void scanAndAssert(HRegion region, List expected, Scan scan) throws Exception { + try (RegionScanner regionScanner = region.getScanner(scan)) { + List results = Lists.newArrayList(); + while (regionScanner.next(results)) { } + assertEquals(expected, results); + } + } + + private HRegion updateTtl(HRegion region, byte[] family, long ttl) throws Exception { + region.close(); + TableDescriptorBuilder tableBuilder = + TableDescriptorBuilder.newBuilder(region.getTableDescriptor()); + ColumnFamilyDescriptorBuilder cfd = + ColumnFamilyDescriptorBuilder + .newBuilder(tableBuilder.build().getColumnFamily(family)); + if (ttl > 0) { + cfd.setValue(Bytes.toBytes(TxConstants.PROPERTY_TTL), + Bytes.toBytes(String.valueOf(ttl))); + } + cfd.setMaxVersions(10); + tableBuilder.removeColumnFamily(family);

--- End diff --

Actually, when we alter an existing column family, we need to remove it and then add it back; otherwise the builder will complain that the column family already exists.

---
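
[Editor's note] For readers of the list, a minimal sketch of the remove-then-re-add pattern described in the comment above. It is not part of the PR; it only restates the approach used in updateTtl(), assuming the HBase 2.0 builder API already imported in the test (TableDescriptorBuilder, ColumnFamilyDescriptorBuilder, Bytes, TxConstants) plus org.apache.hadoop.hbase.client.TableDescriptor. The helper name withUpdatedTtl is illustrative only.

    // Hypothetical helper: return a TableDescriptor in which the given family's
    // Tephra TTL property has been changed.
    private static TableDescriptor withUpdatedTtl(TableDescriptor current, byte[] family, long ttl) {
      TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(current);
      ColumnFamilyDescriptorBuilder cfd =
          ColumnFamilyDescriptorBuilder.newBuilder(current.getColumnFamily(family));
      cfd.setValue(Bytes.toBytes(TxConstants.PROPERTY_TTL), Bytes.toBytes(String.valueOf(ttl)));
      // addColumnFamily() rejects a family that is already present in the builder,
      // so the existing descriptor has to be removed before the altered one is added.
      tableBuilder.removeColumnFamily(family);
      tableBuilder.addColumnFamily(cfd.build());
      return tableBuilder.build();
    }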