tephra-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From poornachandra <...@git.apache.org>
Subject [GitHub] incubator-tephra pull request #67: TEPHRA-272 Add HBase 2.0 compatibility mo...
Date Tue, 09 Jan 2018 13:39:50 GMT
Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/67#discussion_r160406873
  
    --- Diff: tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---
    @@ -0,0 +1,677 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *      http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tephra.hbase.coprocessor;
    +
    +import com.google.common.collect.ImmutableSortedMap;
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +
    +import it.unimi.dsi.fastutil.longs.LongArrayList;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.CellUtil;
    +import org.apache.hadoop.hbase.ChoreService;
    +import org.apache.hadoop.hbase.HBaseConfiguration;
    +import org.apache.hadoop.hbase.HColumnDescriptor;
    +import org.apache.hadoop.hbase.HConstants;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.HTableDescriptor;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.MockRegionServerServices;
    +import org.apache.hadoop.hbase.ServerName;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    +import org.apache.hadoop.hbase.client.Delete;
    +import org.apache.hadoop.hbase.client.Put;
    +import org.apache.hadoop.hbase.client.Scan;
    +import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
    +import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
    +import org.apache.hadoop.hbase.regionserver.ChunkCreator;
    +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
    +import org.apache.hadoop.hbase.regionserver.HRegion;
    +import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
    +import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
    +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
    +import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
    +import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
    +import org.apache.hadoop.hbase.regionserver.RegionScanner;
    +import org.apache.hadoop.hbase.regionserver.ScanType;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.hadoop.hbase.util.FSUtils;
    +import org.apache.hadoop.hbase.wal.WAL;
    +import org.apache.hadoop.hbase.wal.WALFactory;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.apache.tephra.Transaction;
    +import org.apache.tephra.TransactionManager;
    +import org.apache.tephra.TxConstants;
    +import org.apache.tephra.coprocessor.TransactionStateCache;
    +import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
    +import org.apache.tephra.manager.InvalidTxList;
    +import org.apache.tephra.metrics.TxMetricsCollector;
    +import org.apache.tephra.persist.HDFSTransactionStateStorage;
    +import org.apache.tephra.persist.TransactionSnapshot;
    +import org.apache.tephra.persist.TransactionVisibilityState;
    +import org.apache.tephra.snapshot.DefaultSnapshotCodec;
    +import org.apache.tephra.snapshot.SnapshotCodecProvider;
    +import org.apache.tephra.util.TxUtils;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.lang.management.ManagementFactory;
    +import java.net.InetAddress;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TreeMap;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertTrue;
    +
    +/**
    + * Tests filtering of invalid transaction data by the {@link TransactionProcessor} coprocessor.
    + */
    +public class TransactionProcessorTest {
    +  private static final Logger LOG = LoggerFactory.getLogger(TransactionProcessorTest.class);
    +  protected static ChunkCreator chunkCreator;
    +  // 8 versions, 1 hour apart, latest is current ts.
    +  private static final long[] V;
    +
    +  static {
    +    long now = System.currentTimeMillis();
    +    V = new long[9];
    +    for (int i = 0; i < V.length; i++) {
    +      V[i] = (now - TimeUnit.HOURS.toMillis(8 - i)) * TxConstants.MAX_TX_PER_MS;
    +    }
    +  }
    +
    +  @ClassRule
    +  public static TemporaryFolder tmpFolder = new TemporaryFolder();
    +  private static MiniDFSCluster dfsCluster;
    +  private static Configuration conf;
    +  private static LongArrayList invalidSet = new LongArrayList(new long[]{V[3], V[5], V[7]});
    +  private static TransactionVisibilityState txVisibilityState;
    +
    +  @BeforeClass
    +  public static void setupBeforeClass() throws Exception {
    +    Configuration hConf = new Configuration();
    +    String rootDir = tmpFolder.newFolder().getAbsolutePath();
    +    hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, rootDir);
    +    hConf.set(HConstants.HBASE_DIR, rootDir + "/hbase");
    +
    +    dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
    +    dfsCluster.waitActive();
    +    conf = HBaseConfiguration.create(dfsCluster.getFileSystem().getConf());
    +
    +    conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
    +    conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
    +    String localTestDir = tmpFolder.newFolder().getAbsolutePath();
    +    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, localTestDir);
    +    conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
    +
    +    // write an initial transaction snapshot
    +    InvalidTxList invalidTxList = new InvalidTxList();
    +    invalidTxList.addAll(invalidSet);
    +    TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
    +        System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
    +        // this will set visibility upper bound to V[6]
    +        Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
    +          V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
    +        new HashMap<Long, TransactionManager.ChangeSet>(), new TreeMap<Long, TransactionManager.ChangeSet>());
    +    txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
    +                                                txSnapshot.getWritePointer(), txSnapshot.getInvalid(),
    +                                                txSnapshot.getInProgress());
    +    HDFSTransactionStateStorage tmpStorage =
    +      new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
    +    tmpStorage.startAndWait();
    +    tmpStorage.writeSnapshot(txSnapshot);
    +    tmpStorage.stopAndWait();
    +    long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
    +            .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
    +    chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
    +            globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
    +    assertTrue(chunkCreator != null);
    +  }
    +
    +  @AfterClass
    +  public static void shutdownAfterClass() throws Exception {
    +    dfsCluster.shutdown();
    +  }
    +
    +  @Test
    +  public void testDataJanitorRegionScanner() throws Exception {
    +    String tableName = "TestRegionScanner";
    +    byte[] familyBytes = Bytes.toBytes("f");
    +    byte[] columnBytes = Bytes.toBytes("c");
    +    HRegion region = createRegion(tableName, familyBytes, TimeUnit.HOURS.toMillis(3));
    +    try {
    +      region.initialize();
    +      TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
    +      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
    +
    +      for (int i = 1; i <= 8; i++) {
    +        for (int k = 1; k <= i; k++) {
    +          Put p = new Put(Bytes.toBytes(i));
    +          p.addColumn(familyBytes, columnBytes, V[k], Bytes.toBytes(V[k]));
    +          region.put(p);
    +        }
    +      }
    +
    +      List<Cell> results = Lists.newArrayList();
    +
    +      // force a flush to clear the data
    +      // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
    +
    +      LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString());
    +      FlushResultImpl flushResult = region.flushcache(true, false, new FlushLifeCycleTracker() { });
    +      Assert.assertTrue("Unexpected flush result: " + flushResult, flushResult.isFlushSucceeded());
    +
    +      // now a normal scan should only return the valid rows
    +      // do not use a filter here to test that cleanup works on flush
    +      Scan scan = new Scan();
    +      scan.setMaxVersions(10);
    +      RegionScanner regionScanner = region.getScanner(scan);
    +
    +      // first returned value should be "4" with version "4"
    +      results.clear();
    +      assertTrue(regionScanner.next(results));
    +      assertKeyValueMatches(results, 4, new long[]{V[4]});
    +
    +      results.clear();
    +      assertTrue(regionScanner.next(results));
    +      assertKeyValueMatches(results, 5, new long[] {V[4]});
    +
    +      results.clear();
    +      assertTrue(regionScanner.next(results));
    +      assertKeyValueMatches(results, 6, new long[]{V[6], V[4]});
    +
    +      results.clear();
    +      assertTrue(regionScanner.next(results));
    +      assertKeyValueMatches(results, 7, new long[]{V[6], V[4]});
    +
    +      results.clear();
    +      assertFalse(regionScanner.next(results));
    +      assertKeyValueMatches(results, 8, new long[] {V[8], V[6], V[4]});
    +    } finally {
    +      //region.close();
    +    }
    +  }
    +
    +  @Test
    +  public void testDeleteFiltering() throws Exception {
    +    String tableName = "TestDeleteFiltering";
    +    byte[] familyBytes = Bytes.toBytes("f");
    +    byte[] columnBytes = Bytes.toBytes("c");
    +    HRegion region = createRegion(tableName, familyBytes, 0);
    +    try {
    +      region.initialize();
    +      TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
    +      LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
    +
    +      byte[] row = Bytes.toBytes(1);
    +      for (int i = 4; i < V.length; i++) {
    +        Put p = new Put(row);
    +        p.addColumn(familyBytes, columnBytes, V[i], Bytes.toBytes(V[i]));
    +        region.put(p);
    +      }
    +
    +      // delete from the third entry back
    +      // take that cell's timestamp + 1 to simulate a delete in a new tx
    +      long deleteTs = V[5] + 1;
    +      Delete d = new Delete(row, deleteTs);
    +      LOG.info("Issuing delete at timestamp " + deleteTs);
    +      // row deletes are not yet supported (TransactionAwareHTable normally handles this)
    +      d.addColumns(familyBytes, columnBytes);
    +      region.delete(d);
    +
    +      List<Cell> results = Lists.newArrayList();
    +
    +      // force a flush to clear the data
    +      // during flush, we should drop the deleted version, but not the others
    +      LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString());
    +      region.flushcache(true, false, new FlushLifeCycleTracker() { });
    +
    +      // now a normal scan should return row with versions at: V[8], V[6].
    +      // V[7] is invalid and V[5] and prior are deleted.
    +      Scan scan = new Scan();
    +      scan.setMaxVersions(10);
    +      RegionScanner regionScanner = region.getScanner(scan);
    +      // should be only one row
    +      assertFalse(regionScanner.next(results));
    +      assertKeyValueMatches(results, 1,
    +          new long[]{V[8], V[6], deleteTs},
    +          new byte[][]{Bytes.toBytes(V[8]), Bytes.toBytes(V[6]), new byte[0]});
    +    } finally {
    +      //region.close();
    +    }
    +  }
    +
    +  @Test
    +  public void testDeleteMarkerCleanup() throws Exception {
    +    String tableName = "TestDeleteMarkerCleanup";
    +    byte[] familyBytes = Bytes.toBytes("f");
    +    HRegion region = createRegion(tableName, familyBytes, 0);
    +    try {
    +      region.initialize();
    +
    +      // all puts use a timestamp before the tx snapshot's visibility upper bound, making them eligible for removal
    +      long writeTs = txVisibilityState.getVisibilityUpperBound() - 10;
    +      // deletes are performed after the writes, but still before the visibility upper bound
    +      long deleteTs = writeTs + 1;
    +      // write separate columns to confirm that delete markers survive across flushes
    +      byte[] row = Bytes.toBytes(100);
    +      Put p = new Put(row);
    +
    +      LOG.info("Writing columns at timestamp " + writeTs);
    +      for (int i = 0; i < 5; i++) {
    +        byte[] iBytes = Bytes.toBytes(i);
    +        p.addColumn(familyBytes, iBytes, writeTs, iBytes);
    +      }
    +      region.put(p);
    +      // read all back
    +      Scan scan = new Scan(row);
    +      RegionScanner regionScanner = region.getScanner(scan);
    +      List<Cell> results = Lists.newArrayList();
    +      assertFalse(regionScanner.next(results));
    +      
    +      for (int i = 0; i < 5; i++) {
    +        Cell cell = results.get(i);
    +        assertArrayEquals(row, CellUtil.cloneRow(cell));
    +        byte[] idxBytes = Bytes.toBytes(i);
    +        assertArrayEquals(idxBytes, CellUtil.cloneQualifier(cell));
    +        assertArrayEquals(idxBytes, CellUtil.cloneValue(cell));
    +      }
    +
    +      // force a flush to clear the memstore
    +      LOG.info("Before delete, flushing region " + region.getRegionInfo().getRegionNameAsString());
    +      region.flushcache(false, false, new FlushLifeCycleTracker() { });
    +      // delete the odd entries
    +      for (int i = 0; i < 5; i++) {
    +        if (i % 2 == 1) {
    +          // deletes are performed as puts with empty values
    +          Put deletePut = new Put(row);
    +          deletePut.addColumn(familyBytes, Bytes.toBytes(i), deleteTs, new byte[0]);
    +          region.put(deletePut);
    +        }
    +      }
    +      
    +      // read all back
    +      scan = new Scan(row);
    +      scan.readVersions(1);
    +      scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
    +                                                            new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
    +      regionScanner = region.getScanner(scan);
    +      results = Lists.newArrayList();
    +      assertFalse(regionScanner.next(results));
    +      assertEquals(3, results.size());
    +      // only even columns should exist
    +      for (int i = 0; i < 3; i++) {
    +        Cell cell = results.get(i);
    +        LOG.info("Got cell " + cell);
    +        assertArrayEquals(row, CellUtil.cloneRow(cell));
    +        byte[] idxBytes = Bytes.toBytes(i * 2);
    +        assertArrayEquals(idxBytes, CellUtil.cloneQualifier(cell));
    +        assertArrayEquals(idxBytes, CellUtil.cloneValue(cell));
    +      }
    +
    +      // force another flush on the delete markers
    +      // during flush, we should retain the delete markers, since they can only safely be dropped by a major compaction
    +      LOG.info("After delete, flushing region " + region.getRegionInfo().getRegionNameAsString());
    +      FlushResultImpl flushResultImpl = region.flushcache(true, false, new FlushLifeCycleTracker() { });
    +      assertTrue(flushResultImpl
    +                    .getResult() == FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED
    +                    || flushResultImpl.getResult() == FlushResult.Result.FLUSHED_COMPACTION_NEEDED);
    +      scan = new Scan(row);
    +      scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
    +                                                            new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
    +      scan.readVersions(1);
    +      regionScanner = region.getScanner(scan);
    +      results = Lists.newArrayList();
    +      assertFalse(regionScanner.next(results));
    +      assertEquals(3, results.size());
    +      // only even columns should exist
    +      for (int i = 0; i < 3; i++) {
    +        Cell cell = results.get(i);
    +        assertArrayEquals(row, CellUtil.cloneRow(cell));
    +        byte[] idxBytes = Bytes.toBytes(i * 2);
    +        assertArrayEquals(idxBytes, CellUtil.cloneQualifier(cell));
    +        assertArrayEquals(idxBytes, CellUtil.cloneValue(cell));
    +      }
    +
    +      // force a major compaction
    +      LOG.info("Forcing major compaction of region " + region.getRegionInfo().getRegionNameAsString());
    +      region.compact(true);
    +
    +      // perform a raw scan (no filter) to confirm that the delete markers are now gone
    +      scan = new Scan(row);
    +      System.out.println("scan started");
    +      regionScanner = region.getScanner(scan);
    +      results = Lists.newArrayList();
    +      assertFalse(regionScanner.next(results));
    +      assertEquals(3, results.size());
    +      // only even columns should exist
    +      for (int i = 0; i < 3; i++) {
    +        Cell cell = results.get(i);
    +        assertArrayEquals(row, CellUtil.cloneRow(cell));
    +        byte[] idxBytes = Bytes.toBytes(i * 2);
    +        assertArrayEquals(idxBytes, CellUtil.cloneQualifier(cell));
    +        assertArrayEquals(idxBytes, CellUtil.cloneValue(cell));
    +      }
    +    } finally {
    +//      region.close();
    +    }
    +  }
    +
    +  /**
    +   * Test that we correctly preserve the timestamp set for column family delete markers.  This is not
    +   * directly required for the TransactionAwareHTable usage, but is the right thing to do and ensures
    +   * that we make it easy to interoperate with other systems.
    +   */
    +  @Test
    +  public void testFamilyDeleteTimestamp() throws Exception {
    +    String tableName = "TestFamilyDeleteTimestamp";
    +    byte[] family1Bytes = Bytes.toBytes("f1");
    +    byte[] columnBytes = Bytes.toBytes("c");
    +    byte[] rowBytes = Bytes.toBytes("row");
    +    byte[] valBytes = Bytes.toBytes("val");
    +    HRegion region = createRegion(tableName, family1Bytes, 0);
    +    try {
    +      region.initialize();
    +
    +      long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
    +      Put p = new Put(rowBytes);
    +      p.addColumn(family1Bytes, columnBytes, now - 10, valBytes);
    +      region.put(p);
    +
    +      // issue a family delete with an explicit timestamp
    +      Delete delete = new Delete(rowBytes, now);
    +      delete.addFamily(family1Bytes, now - 5);
    +      region.delete(delete);
    +
    +      // test that the delete marker preserved the timestamp
    +      Scan scan = new Scan();
    +      scan.setMaxVersions();
    +      RegionScanner scanner = region.getScanner(scan);
    +      List<Cell> results = Lists.newArrayList();
    +      scanner.next(results);
    +      assertEquals(2, results.size());
    +      // delete marker should appear first
    +      Cell cell = results.get(0);
    +      assertArrayEquals(new byte[0], CellUtil.cloneQualifier(cell));
    +      assertArrayEquals(new byte[0], CellUtil.cloneValue(cell));
    +      assertEquals(now - 5, cell.getTimestamp());
    +      // since this is an unfiltered scan against the region, the original put should be next
    +      cell = results.get(1);
    +      assertArrayEquals(valBytes, CellUtil.cloneValue(cell));
    +      assertEquals(now - 10, cell.getTimestamp());
    +      scanner.close();
    +
    +
    +      // with a filtered scan the original put should disappear
    +      scan = new Scan();
    +      scan.setMaxVersions();
    +      scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
    +                                                            new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
    +      scanner = region.getScanner(scan);
    +      results = Lists.newArrayList();
    +      scanner.next(results);
    +      assertEquals(0, results.size());
    +      scanner.close();
    +    } finally {
    +      region.close();
    +    }
    +  }
    +
    +  @Test
    +  public void testPreExistingData() throws Exception {
    +    String tableName = "TestPreExistingData";
    +    byte[] familyBytes = Bytes.toBytes("f");
    +    long ttlMillis = TimeUnit.DAYS.toMillis(14);
    +    HRegion region = createRegion(tableName, familyBytes, ttlMillis);
    +    try {
    +      region.initialize();
    +
    +      // timestamps for pre-existing, non-transactional data
    +      long now = txVisibilityState.getVisibilityUpperBound() / TxConstants.MAX_TX_PER_MS;
    +      long older = now - ttlMillis / 2;
    +      long newer = now - ttlMillis / 3;
    +      // timestamps for transactional data
    +      long nowTx = txVisibilityState.getVisibilityUpperBound();
    +      long olderTx = nowTx - (ttlMillis / 2) * TxConstants.MAX_TX_PER_MS;
    +      long newerTx = nowTx - (ttlMillis / 3) * TxConstants.MAX_TX_PER_MS;
    +
    +      Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
    +      ttls.put(familyBytes, ttlMillis);
    +
    +      List<Cell> cells = new ArrayList<>();
    +      cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v11")));
    +      cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v12")));
    +      cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v21")));
    +      cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v22")));
    +      cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c1"), olderTx, Bytes.toBytes("v31")));
    +      cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c2"), newerTx, Bytes.toBytes("v32")));
    +
    +      // Write non-transactional and transactional data
    +      for (Cell c : cells) {
    +        region.put(new Put(CellUtil.cloneRow(c)).addColumn(CellUtil.cloneFamily(c), CellUtil.cloneQualifier(c),
    +                          c.getTimestamp(), CellUtil.cloneValue(c)));
    +      }
    +
    +      Scan rawScan = new Scan();
    +      rawScan.setMaxVersions();
    +
    +      Transaction dummyTransaction = TxUtils.createDummyTransaction(txVisibilityState);
    +      Scan txScan = new Scan();
    +      txScan.setMaxVersions();
    +      txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
    +                          TxUtils.getMaxVisibleTimestamp(dummyTransaction));
    +      txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
    +
    +      // read all back with raw scanner
    +      scanAndAssert(region, cells, rawScan);
    +
    +      // read all back with transaction filter
    +      scanAndAssert(region, cells, txScan);
    +
    +      // force a flush to clear the memstore
    +      region.flushcache(true, false, new FlushLifeCycleTracker() { });
    +      scanAndAssert(region, cells, txScan);
    +
    +      // force a major compaction to remove any expired cells
    +      region.compact(true);
    +      scanAndAssert(region, cells, txScan);
    +
    +      // Reduce TTL, this should make cells with timestamps older and olderTx expire
    +      long newTtl = ttlMillis / 2 - 1;
    +      region = updateTtl(region, familyBytes, newTtl);
    +      ttls.put(familyBytes, newTtl);
    +      txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
    +                          TxUtils.getMaxVisibleTimestamp(dummyTransaction));
    +      txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
    +
    +      // Raw scan should still give all cells
    +      scanAndAssert(region, cells, rawScan);
    +      // However, tx scan should not return expired cells
    +      scanAndAssert(region, select(cells, 1, 3, 5), txScan);
    +
    +      region.flushcache(true, false, new FlushLifeCycleTracker() { });
    +      scanAndAssert(region, cells, rawScan);
    +
    +      // force a major compaction to remove any expired cells
    +      region.compact(true);
    +      // This time raw scan too should not return expired cells, as they would be dropped during major compaction
    +      scanAndAssert(region, select(cells, 1, 3, 5), rawScan);
    +
    +      // Reduce TTL again to 1 ms, this should expire all cells
    +      newTtl = 1;
    +      region = updateTtl(region, familyBytes, newTtl);
    +      ttls.put(familyBytes, newTtl);
    +      txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
    +                          TxUtils.getMaxVisibleTimestamp(dummyTransaction));
    +      txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
    +
    +      // force a major compaction to remove expired cells
    +      region.compact(true);
    +      // This time raw scan should not return any cells, as all cells have expired.
    +      scanAndAssert(region, Collections.<Cell>emptyList(), rawScan);
    +    } finally {
    +      region.close();
    +    }
    +  }
    +
    +  private List<Cell> select(List<Cell> cells, int... indexes) {
    +    List<Cell> newCells = new ArrayList<>();
    +    for (int i : indexes) {
    +      newCells.add(cells.get(i));
    +    }
    +    return newCells;
    +  }
    +
    +  @SuppressWarnings("StatementWithEmptyBody")
    +  private void scanAndAssert(HRegion region, List<Cell> expected, Scan scan) throws Exception {
    +    try (RegionScanner regionScanner = region.getScanner(scan)) {
    +      List<Cell> results = Lists.newArrayList();
    +      while (regionScanner.next(results)) { }
    +      assertEquals(expected, results);
    +    }
    +  }
    +
    +    private HRegion updateTtl(HRegion region, byte[] family, long ttl) throws Exception {
    --- End diff --
    
    This method needs to use 2 spaces for indentation.


---

Mime
View raw message