cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [1/2] cassandra git commit: Garbage-collecting compaction operation and schema option.
Date Mon, 01 Aug 2016 12:29:19 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 293908060 -> d40ac784d


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/test/long/org/apache/cassandra/cql3/GcCompactionBench.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/GcCompactionBench.java b/test/long/org/apache/cassandra/cql3/GcCompactionBench.java
new file mode 100644
index 0000000..ca39b55
--- /dev/null
+++ b/test/long/org/apache/cassandra/cql3/GcCompactionBench.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+
+import com.google.common.collect.Iterables;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.config.Config.CommitLogSync;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class GcCompactionBench extends CQLTester
+{
+    private static final String SIZE_TIERED_STRATEGY = "SizeTieredCompactionStrategy', 'min_sstable_size' : '0";
+    private static final String LEVELED_STRATEGY = "LeveledCompactionStrategy', 'sstable_size_in_mb' : '16";
+
+    private static final int DEL_SECTIONS = 1000;
+    private static final int FLUSH_FREQ = 10000;
+    private static final int RANGE_FREQUENCY_INV = 16;
+    static final int COUNT = 90000;
+    static final int ITERS = 9;
+
+    static final int KEY_RANGE = 10;
+    static final int CLUSTERING_RANGE = 210000;
+
+    static final int EXTRA_SIZE = 1025;
+
+    // The name of this method is important!
+    // CommitLog settings must be applied before CQLTester sets up; by using the same name as its @BeforeClass method we
+    // are effectively overriding it.
+    @BeforeClass
+    public static void setUpClass()     // overrides CQLTester.setUpClass()
+    {
+        DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
+        DatabaseDescriptor.setCommitLogSyncPeriod(100);
+        CQLTester.setUpClass();
+    }
+
+    String hashQuery;
+
+    @Before
+    public void before() throws Throwable
+    {
+        createTable("CREATE TABLE %s(" +
+                    "  key int," +
+                    "  column int," +
+                    "  data int," +
+                    "  extra text," +
+                    "  PRIMARY KEY(key, column)" +
+                    ")"
+                   );
+
+        String hashIFunc = parseFunctionName(createFunction(KEYSPACE, "int, int",
+                " CREATE FUNCTION %s (state int, val int)" +
+                " CALLED ON NULL INPUT" +
+                " RETURNS int" +
+                " LANGUAGE java" +
+                " AS 'return val != null ? state * 17 + val : state;'")).name;
+        String hashTFunc = parseFunctionName(createFunction(KEYSPACE, "int, text",
+                " CREATE FUNCTION %s (state int, val text)" +
+                " CALLED ON NULL INPUT" +
+                " RETURNS int" +
+                " LANGUAGE java" +
+                " AS 'return val != null ? state * 17 + val.hashCode() : state;'")).name;
+
+        String hashInt = createAggregate(KEYSPACE, "int",
+                " CREATE AGGREGATE %s (int)" +
+                " SFUNC " + hashIFunc +
+                " STYPE int" +
+                " INITCOND 1");
+        String hashText = createAggregate(KEYSPACE, "text",
+                " CREATE AGGREGATE %s (text)" +
+                " SFUNC " + hashTFunc +
+                " STYPE int" +
+                " INITCOND 1");
+
+        hashQuery = String.format("SELECT count(column), %s(key), %s(column), %s(data), %s(extra), avg(key), avg(column), avg(data) FROM %%s",
+                                  hashInt, hashInt, hashInt, hashText);
+    }
+    AtomicLong id = new AtomicLong();
+    long compactionTimeNanos = 0;
+
+    void pushData(Random rand, int count) throws Throwable
+    {
+        for (int i = 0; i < count; ++i)
+        {
+            long ii = id.incrementAndGet();
+            if (ii % 1000 == 0)
+                System.out.print('.');
+            int key = rand.nextInt(KEY_RANGE);
+            int column = rand.nextInt(CLUSTERING_RANGE);
+            execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", key, column, (int) ii, genExtra(rand));
+            maybeCompact(ii);
+        }
+    }
+
+    private String genExtra(Random rand)
+    {
+        StringBuilder builder = new StringBuilder(EXTRA_SIZE);
+        for (int i = 0; i < EXTRA_SIZE; ++i)
+            builder.append((char) ('a' + rand.nextInt('z' - 'a' + 1)));
+        return builder.toString();
+    }
+
+    void deleteData(Random rand, int count) throws Throwable
+    {
+        for (int i = 0; i < count; ++i)
+        {
+            int key;
+            UntypedResultSet res;
+            long ii = id.incrementAndGet();
+            if (ii % 1000 == 0)
+                System.out.print('-');
+            if (rand.nextInt(RANGE_FREQUENCY_INV) != 1)
+            {
+                do
+                {
+                    key = rand.nextInt(KEY_RANGE);
+                    long cid = rand.nextInt(DEL_SECTIONS);
+                    int cstart = (int) (cid * CLUSTERING_RANGE / DEL_SECTIONS);
+                    int cend = (int) ((cid + 1) * CLUSTERING_RANGE / DEL_SECTIONS);
+                    res = execute("SELECT column FROM %s WHERE key = ? AND column >= ? AND column < ? LIMIT 1", key, cstart, cend);
+                } while (res.size() == 0);
+                UntypedResultSet.Row r = Iterables.get(res, rand.nextInt(res.size()));
+                int clustering = r.getInt("column");
+                execute("DELETE FROM %s WHERE key = ? AND column = ?", key, clustering);
+            }
+            else
+            {
+                key = rand.nextInt(KEY_RANGE);
+                long cid = rand.nextInt(DEL_SECTIONS);
+                int cstart = (int) (cid * CLUSTERING_RANGE / DEL_SECTIONS);
+                int cend = (int) ((cid + 1) * CLUSTERING_RANGE / DEL_SECTIONS);
+                res = execute("DELETE FROM %s WHERE key = ? AND column >= ? AND column < ?", key, cstart, cend);
+            }
+            maybeCompact(ii);
+        }
+    }
+
+    private void maybeCompact(long ii)
+    {
+        if (ii % FLUSH_FREQ == 0)
+        {
+            System.out.print("F");
+            flush();
+            if (ii % (FLUSH_FREQ * 10) == 0)
+            {
+                System.out.println("C");
+                long startTime = System.nanoTime();
+                getCurrentColumnFamilyStore().enableAutoCompaction(true);
+                long endTime = System.nanoTime();
+                compactionTimeNanos += endTime - startTime;
+                getCurrentColumnFamilyStore().disableAutoCompaction();
+            }
+        }
+    }
+
+    public void testGcCompaction(TombstoneOption tombstoneOption, TombstoneOption backgroundTombstoneOption, String compactionClass) throws Throwable
+    {
+        id.set(0);
+        compactionTimeNanos = 0;
+        alterTable("ALTER TABLE %s WITH compaction = { 'class' :  '" + compactionClass + "', 'provide_overlapping_tombstones' : '" + backgroundTombstoneOption + "'  };");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        cfs.disableAutoCompaction();
+
+        long onStartTime = System.currentTimeMillis();
+        ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+        List<Future<?>> tasks = new ArrayList<>();
+        for (int ti = 0; ti < 1; ++ti)
+        {
+            Random rand = new Random(ti);
+            tasks.add(es.submit(() -> 
+            {
+                for (int i = 0; i < ITERS; ++i)
+                    try
+                    {
+                        pushData(rand, COUNT);
+                        deleteData(rand, COUNT / 3);
+                    }
+                    catch (Throwable e)
+                    {
+                        throw new AssertionError(e);
+                    }
+            }));
+        }
+        for (Future<?> task : tasks)
+            task.get();
+
+        flush();
+        long onEndTime = System.currentTimeMillis();
+        int startRowCount = countRows(cfs);
+        int startTombCount = countTombstoneMarkers(cfs);
+        int startRowDeletions = countRowDeletions(cfs);
+        int startTableCount = cfs.getLiveSSTables().size();
+        long startSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables());
+        System.out.println();
+
+        String hashesBefore = getHashes();
+
+        long startTime = System.currentTimeMillis();
+        CompactionManager.instance.performGarbageCollection(cfs, tombstoneOption, 0);
+        long endTime = System.currentTimeMillis();
+
+        int endRowCount = countRows(cfs);
+        int endTombCount = countTombstoneMarkers(cfs);
+        int endRowDeletions = countRowDeletions(cfs);
+        int endTableCount = cfs.getLiveSSTables().size();
+        long endSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables());
+
+        System.out.println(cfs.getCompactionParametersJson());
+        System.out.println(String.format("%s compactions completed in %.3fs",
+                tombstoneOption.toString(), (endTime - startTime) * 1e-3));
+        System.out.println(String.format("Operations completed in %.3fs, out of which %.3f for ongoing " + backgroundTombstoneOption + " background compactions",
+                (onEndTime - onStartTime) * 1e-3, compactionTimeNanos * 1e-9));
+        System.out.println(String.format("At start: %12d tables %12d bytes %12d rows %12d deleted rows %12d tombstone markers",
+                startTableCount, startSize, startRowCount, startRowDeletions, startTombCount));
+        System.out.println(String.format("At end:   %12d tables %12d bytes %12d rows %12d deleted rows %12d tombstone markers",
+                endTableCount, endSize, endRowCount, endRowDeletions, endTombCount));
+
+        String hashesAfter = getHashes();
+        Assert.assertEquals(hashesBefore, hashesAfter);
+    }
+
+    private String getHashes() throws Throwable
+    {
+        long startTime = System.currentTimeMillis();
+        String hashes = Arrays.toString(getRows(execute(hashQuery))[0]);
+        long endTime = System.currentTimeMillis();
+        System.out.println(String.format("Hashes: %s, retrieved in %.3fs", hashes, (endTime - startTime) * 1e-3));
+        return hashes;
+    }
+
+    @Test
+    public void testCellAtEnd() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.CELL, TombstoneOption.NONE, LEVELED_STRATEGY);
+    }
+
+    @Test
+    public void testRowAtEnd() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.CELL, TombstoneOption.NONE, LEVELED_STRATEGY);
+    }
+
+    @Test
+    public void testCellThroughout() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.CELL, TombstoneOption.CELL, LEVELED_STRATEGY);
+    }
+
+    @Test
+    public void testRowThroughout() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.ROW, TombstoneOption.ROW, LEVELED_STRATEGY);
+    }
+
+    @Test
+    public void testCopyCompaction() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.NONE, TombstoneOption.NONE, LEVELED_STRATEGY);
+    }
+
+    @Test
+    public void testCellAtEndSizeTiered() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.CELL, TombstoneOption.NONE, SIZE_TIERED_STRATEGY);
+    }
+
+    @Test
+    public void testRowAtEndSizeTiered() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.ROW, TombstoneOption.NONE, SIZE_TIERED_STRATEGY);
+    }
+
+    @Test
+    public void testCellThroughoutSizeTiered() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.CELL, TombstoneOption.CELL, SIZE_TIERED_STRATEGY);
+    }
+
+    @Test
+    public void testRowThroughoutSizeTiered() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.ROW, TombstoneOption.ROW, SIZE_TIERED_STRATEGY);
+    }
+
+    @Test
+    public void testCopyCompactionSizeTiered() throws Throwable
+    {
+        testGcCompaction(TombstoneOption.NONE, TombstoneOption.NONE, SIZE_TIERED_STRATEGY);
+    }
+
+    int countTombstoneMarkers(ColumnFamilyStore cfs)
+    {
+        return count(cfs, x -> x.isRangeTombstoneMarker());
+    }
+
+    int countRowDeletions(ColumnFamilyStore cfs)
+    {
+        return count(cfs, x -> x.isRow() && !((Row) x).deletion().isLive());
+    }
+
+    int countRows(ColumnFamilyStore cfs)
+    {
+        int nowInSec = FBUtilities.nowInSeconds();
+        return count(cfs, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec));
+    }
+
+    private int count(ColumnFamilyStore cfs, Predicate<Unfiltered> predicate)
+    {
+        int count = 0;
+        for (SSTableReader reader : cfs.getLiveSSTables())
+            count += count(reader, predicate);
+        return count;
+    }
+
+    int count(SSTableReader reader, Predicate<Unfiltered> predicate)
+    {
+        int instances = 0;
+        try (ISSTableScanner partitions = reader.getScanner())
+        {
+            while (partitions.hasNext())
+            {
+                try (UnfilteredRowIterator iter = partitions.next())
+                {
+                    while (iter.hasNext())
+                    {
+                        Unfiltered atom = iter.next();
+                        if (predicate.test(atom))
+                            ++instances;
+                    }
+                }
+            }
+        }
+        return instances;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
new file mode 100644
index 0000000..6fed033
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/GcCompactionTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Function;
+
+import com.google.common.collect.Iterables;
+import org.junit.Test;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class GcCompactionTest extends CQLTester
+{
+    static final int KEY_COUNT = 10;
+    static final int CLUSTERING_COUNT = 20;
+
+    @Test
+    public void testGcCompactionPartitions() throws Throwable
+    {
+        runCompactionTest("CREATE TABLE %s(" +
+                          "  key int," +
+                          "  column int," +
+                          "  data int," +
+                          "  extra text," +
+                          "  PRIMARY KEY((key, column), data)" +
+                          ") WITH compaction = { 'class' :  'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'row'  };"
+                          );
+
+    }
+
+    @Test
+    public void testGcCompactionRows() throws Throwable
+    {
+        runCompactionTest("CREATE TABLE %s(" +
+                          "  key int," +
+                          "  column int," +
+                          "  data int," +
+                          "  extra text," +
+                          "  PRIMARY KEY(key, column)" +
+                          ") WITH compaction = { 'class' :  'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'row'  };"
+                          );
+
+    }
+
+    @Test
+    public void testGcCompactionRanges() throws Throwable
+    {
+
+        runCompactionTest("CREATE TABLE %s(" +
+                          "  key int," +
+                          "  column int," +
+                          "  col2 int," +
+                          "  data int," +
+                          "  extra text," +
+                          "  PRIMARY KEY(key, column, data)" +
+                          ") WITH compaction = { 'class' :  'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'row'  };"
+                          );
+    }
+
+    private void runCompactionTest(String tableDef) throws Throwable
+    {
+        createTable(tableDef);
+
+        for (int i = 0; i < KEY_COUNT; ++i)
+            for (int j = 0; j < CLUSTERING_COUNT; ++j)
+                execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i+j, "" + i + ":" + j);
+
+        Set<SSTableReader> readers = new HashSet<>();
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        flush();
+        assertEquals(1, cfs.getLiveSSTables().size());
+        SSTableReader table0 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table0));
+        int rowCount = countRows(table0);
+
+        deleteWithSomeInserts(3, 5, 10);
+        flush();
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table1 = getNewTable(readers);
+        assertTrue(countRows(table1) > 0);
+        assertTrue(countTombstoneMarkers(table1) > 0);
+
+        deleteWithSomeInserts(5, 6, 0);
+        flush();
+        assertEquals(3, cfs.getLiveSSTables().size());
+        SSTableReader table2 = getNewTable(readers);
+        assertEquals(0, countRows(table2));
+        assertTrue(countTombstoneMarkers(table2) > 0);
+
+        CompactionManager.instance.forceUserDefinedCompaction(table0.getFilename());
+
+        assertEquals(3, cfs.getLiveSSTables().size());
+        SSTableReader table3 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table3));
+        assertTrue(rowCount > countRows(table3));
+    }
+
+    @Test
+    public void testGcCompactionCells() throws Throwable
+    {
+        createTable("CREATE TABLE %s(" +
+                          "  key int," +
+                          "  column int," +
+                          "  data int," +
+                          "  extra text," +
+                          "  PRIMARY KEY(key)" +
+                          ") WITH compaction = { 'class' :  'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'cell'  };"
+                          );
+
+        for (int i = 0; i < KEY_COUNT; ++i)
+            for (int j = 0; j < CLUSTERING_COUNT; ++j)
+                execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i+j, "" + i + ":" + j);
+
+        Set<SSTableReader> readers = new HashSet<>();
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        flush();
+        assertEquals(1, cfs.getLiveSSTables().size());
+        SSTableReader table0 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table0));
+        int cellCount = countCells(table0);
+
+        deleteWithSomeInserts(3, 0, 2);
+        flush();
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table1 = getNewTable(readers);
+        assertTrue(countCells(table1) > 0);
+        assertEquals(0, countTombstoneMarkers(table0));
+
+        CompactionManager.instance.forceUserDefinedCompaction(table0.getFilename());
+
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table3 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table3));
+        assertTrue(cellCount > countCells(table3));
+    }
+
+    @Test
+    public void testGcCompactionStatic() throws Throwable
+    {
+        createTable("CREATE TABLE %s(" +
+                          "  key int," +
+                          "  column int," +
+                          "  data int static," +
+                          "  extra text," +
+                          "  PRIMARY KEY(key, column)" +
+                          ") WITH compaction = { 'class' :  'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'cell'  };"
+                          );
+
+        for (int i = 0; i < KEY_COUNT; ++i)
+            for (int j = 0; j < CLUSTERING_COUNT; ++j)
+                execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i+j, "" + i + ":" + j);
+
+        Set<SSTableReader> readers = new HashSet<>();
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        flush();
+        assertEquals(1, cfs.getLiveSSTables().size());
+        SSTableReader table0 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table0));
+        int cellCount = countStaticCells(table0);
+        assertEquals(KEY_COUNT, cellCount);
+
+        execute("DELETE data FROM %s WHERE key = 0");   // delete static cell
+        execute("INSERT INTO %s (key, data) VALUES (1, 0)");  // overwrite static cell
+        flush();
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table1 = getNewTable(readers);
+        assertTrue(countStaticCells(table1) > 0);
+        assertEquals(0, countTombstoneMarkers(table0));
+
+        CompactionManager.instance.forceUserDefinedCompaction(table0.getFilename());
+
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table3 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table3));
+        assertEquals(cellCount - 2, countStaticCells(table3));
+    }
+
+    @Test
+    public void testGcCompactionComplexColumn() throws Throwable
+    {
+        createTable("CREATE TABLE %s(" +
+                          "  key int," +
+                          "  data map<int, int>," +
+                          "  extra text," +
+                          "  PRIMARY KEY(key)" +
+                          ") WITH compaction = { 'class' :  'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones' : 'cell'  };"
+                          );
+
+        for (int i = 0; i < KEY_COUNT; ++i)
+            for (int j = 0; j < CLUSTERING_COUNT; ++j)
+                execute("UPDATE %s SET data[?] = ? WHERE key = ?", j, i+j, i);
+
+        Set<SSTableReader> readers = new HashSet<>();
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        flush();
+        assertEquals(1, cfs.getLiveSSTables().size());
+        SSTableReader table0 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table0));
+        int cellCount = countComplexCells(table0);
+
+        deleteWithSomeInsertsComplexColumn(3, 5, 8);
+        flush();
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table1 = getNewTable(readers);
+        assertTrue(countComplexCells(table1) > 0);
+        assertEquals(0, countTombstoneMarkers(table0));
+
+        CompactionManager.instance.forceUserDefinedCompaction(table0.getFilename());
+
+        assertEquals(2, cfs.getLiveSSTables().size());
+        SSTableReader table3 = getNewTable(readers);
+        assertEquals(0, countTombstoneMarkers(table3));
+        assertEquals(cellCount - 23, countComplexCells(table3));
+    }
+
+    @Test
+    public void testLocalDeletionTime() throws Throwable
+    {
+        createTable("create table %s (k int, c1 int, primary key (k, c1)) with compaction = {'class': 'SizeTieredCompactionStrategy', 'provide_overlapping_tombstones':'row'}");
+        execute("delete from %s where k = 1");
+        Set<SSTableReader> readers = new HashSet<>(getCurrentColumnFamilyStore().getLiveSSTables());
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        SSTableReader oldSSTable = getNewTable(readers);
+        Thread.sleep(2000);
+        execute("delete from %s where k = 1");
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        SSTableReader newTable = getNewTable(readers);
+
+        CompactionManager.instance.forceUserDefinedCompaction(oldSSTable.getFilename());
+
+        // Old table now doesn't contain any data and should disappear.
+        assertEquals(Collections.singleton(newTable), getCurrentColumnFamilyStore().getLiveSSTables());
+    }
+
+    private SSTableReader getNewTable(Set<SSTableReader> readers)
+    {
+        Set<SSTableReader> newOnes = new HashSet<>(getCurrentColumnFamilyStore().getLiveSSTables());
+        newOnes.removeAll(readers);
+        assertEquals(1, newOnes.size());
+        readers.addAll(newOnes);
+        return Iterables.get(newOnes, 0);
+    }
+
+    void deleteWithSomeInserts(int key_step, int delete_step, int readd_step) throws Throwable
+    {
+        for (int i = 0; i < KEY_COUNT; i += key_step)
+        {
+            if (delete_step > 0)
+                for (int j = i % delete_step; j < CLUSTERING_COUNT; j += delete_step)
+                {
+                    execute("DELETE FROM %s WHERE key = ? AND column = ?", i, j);
+                }
+            if (readd_step > 0)
+                for (int j = i % readd_step; j < CLUSTERING_COUNT; j += readd_step)
+                {
+                    execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", i, j, i-j, "readded " + i + ":" + j);
+                }
+        }
+    }
+
+    void deleteWithSomeInsertsComplexColumn(int key_step, int delete_step, int readd_step) throws Throwable
+    {
+        for (int i = 0; i < KEY_COUNT; i += key_step)
+        {
+            if (delete_step > 0)
+                for (int j = i % delete_step; j < CLUSTERING_COUNT; j += delete_step)
+                {
+                    execute("DELETE data[?] FROM %s WHERE key = ?", j, i);
+                }
+            if (readd_step > 0)
+                for (int j = i % readd_step; j < CLUSTERING_COUNT; j += readd_step)
+                {
+                    execute("UPDATE %s SET data[?] = ? WHERE key = ?", j, -(i+j), i);
+                }
+        }
+    }
+
+    int countTombstoneMarkers(SSTableReader reader)
+    {
+        int nowInSec = FBUtilities.nowInSeconds();
+        return count(reader, x -> x.isRangeTombstoneMarker() || x.isRow() && ((Row) x).hasDeletion(nowInSec) ? 1 : 0, x -> x.partitionLevelDeletion().isLive() ? 0 : 1);
+    }
+
+    int countRows(SSTableReader reader)
+    {
+        int nowInSec = FBUtilities.nowInSeconds();
+        return count(reader, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec) ? 1 : 0, x -> 0);
+    }
+
+    int countCells(SSTableReader reader)
+    {
+        return count(reader, x -> x.isRow() ? Iterables.size((Row) x) : 0, x -> 0);
+    }
+
+    int countStaticCells(SSTableReader reader)
+    {
+        return count(reader, x -> 0, x -> Iterables.size(x.staticRow()));
+    }
+
+    int countComplexCells(SSTableReader reader)
+    {
+        return count(reader, x -> x.isRow() ? ((Row) x).stream().mapToInt(this::countComplex).sum() : 0, x -> 0);
+    }
+
+    int countComplex(ColumnData c)
+    {
+        if (!(c instanceof ComplexColumnData))
+            return 0;
+        ComplexColumnData ccd = (ComplexColumnData) c;
+        return ccd.cellsCount();
+    }
+
+    int count(SSTableReader reader, Function<Unfiltered, Integer> predicate, Function<UnfilteredRowIterator, Integer> partitionPredicate)
+    {
+        int instances = 0;
+        try (ISSTableScanner partitions = reader.getScanner())
+        {
+            while (partitions.hasNext())
+            {
+                try (UnfilteredRowIterator iter = partitions.next())
+                {
+                    instances += partitionPredicate.apply(iter);
+                    while (iter.hasNext())
+                    {
+                        Unfiltered atom = iter.next();
+                        instances += predicate.apply(atom);
+                    }
+                }
+            }
+        }
+        return instances;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
index c930b2a..ece2d1d 100644
--- a/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
+++ b/test/unit/org/apache/cassandra/cql3/selection/SelectionColumnMappingTest.java
@@ -51,7 +51,7 @@ public class SelectionColumnMappingTest extends CQLTester
     String functionName;
 
     @BeforeClass
-    public static void setUpClass()
+    public static void setUpClass()     // overrides CQLTester.setUpClass()
     {
         DatabaseDescriptor.setPartitionerUnsafe(ByteOrderedPartitioner.instance);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
new file mode 100644
index 0000000..2189e15
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionIteratorTest.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import static org.junit.Assert.*;
+
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.*;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.schema.KeyspaceParams;
+
+public class CompactionIteratorTest
+{
+
+    private static final int NOW = 1000;
+    private static final int GC_BEFORE = 100;
+    private static final String KSNAME = "CompactionIteratorTest";
+    private static final String CFNAME = "Integer1";
+
+    static final DecoratedKey kk = Util.dk("key");
+    static final CFMetaData metadata;
+    private static final int RANGE = 1000;
+    private static final int COUNT = 100;
+
+    Map<List<Unfiltered>, DeletionTime> deletionTimes = new HashMap<>();
+
+    static {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KSNAME,
+                                    KeyspaceParams.simple(1),
+                                    metadata = SchemaLoader.standardCFMD(KSNAME,
+                                                                         CFNAME,
+                                                                         1,
+                                                                         UTF8Type.instance,
+                                                                         Int32Type.instance,
+                                                                         Int32Type.instance));
+    }
+
+    // See org.apache.cassandra.db.rows.UnfilteredRowsGenerator.parse for the syntax used in these tests.
+
+    @Test
+    public void testGcCompactionSupersedeLeft()
+    {
+        testCompaction(new String[] {
+            "5<=[140] 10[150] [140]<20 22<[130] [130]<25 30[150]"
+        }, new String[] {
+            "7<[160] 15[180] [160]<30 40[120]"
+        },
+        3);
+    }
+
+    @Test
+    public void testGcCompactionSupersedeMiddle()
+    {
+        testCompaction(new String[] {
+            "5<=[140] 10[150] [140]<40 60[150]"
+        }, new String[] {
+            "7<=[160] 15[180] [160]<=30 40[120]"
+        },
+        3);
+    }
+
+    @Test
+    public void testGcCompactionSupersedeRight()
+    {
+        testCompaction(new String[] {
+            "9<=[140] 10[150] [140]<40 60[150]"
+        }, new String[] {
+            "7<[160] 15[180] [160]<30 40[120]"
+        },
+        3);
+    }
+
+    @Test
+    public void testGcCompactionSwitchInSuperseded()
+    {
+        testCompaction(new String[] {
+            "5<=[140] 10[150] [140]<20 20<=[170] [170]<=50 60[150]"
+        }, new String[] {
+            "7<[160] 15[180] [160]<30 40[120]"
+        },
+        5);
+    }
+
+    @Test
+    public void testGcCompactionBoundaries()
+    {
+        testCompaction(new String[] {
+            "5<=[120] [120]<9 9<=[140] 10[150] [140]<40 40<=[120] 60[150] [120]<90"
+        }, new String[] {
+            "7<[160] 15[180] [160]<30 40[120] 45<[140] [140]<80 88<=[130] [130]<100"
+        },
+        7);
+    }
+
+    @Test
+    public void testGcCompactionMatches()
+    {
+        testCompaction(new String[] {
+            "5<=[120] [120]<=9 9<[140] 10[150] [140]<40 40<=[120] 60[150] [120]<90 120<=[100] [100]<130"
+        }, new String[] {
+            "9<[160] 15[180] [160]<40 40[120] 45<[140] [140]<90 90<=[110] [110]<100 120<=[100] [100]<130"
+        },
+        5);
+    }
+
+    @Test
+    public void testGcCompactionRowDeletion()
+    {
+        testCompaction(new String[] {
+            "10[150] 20[160] 25[160] 30[170] 40[120] 50[120]"
+        }, new String[] {
+            "10<=[155] 20[200D180] 30[200D160] [155]<=30 40[150D130] 50[150D100]"
+        },
+        "25[160] 30[170] 50[120]");
+    }
+
+    @Test
+    public void testGcCompactionPartitionDeletion()
+    {
+        testCompaction(new String[] {
+            "10[150] 20[160] 25[160] 30[170] 40[120] 50[120]"
+        }, new String[] {
+            // Dxx| stands for partition deletion at time xx
+            "D165|10<=[155] 20[200D180] 30[200D160] [155]<=30 40[150D130] 50[150D100]"
+        },
+        "30[170]");
+    }
+
+    void testCompaction(String[] inputs, String[] tombstones, String expected)
+    {
+        testNonGcCompaction(inputs, tombstones);
+
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+        List<List<Unfiltered>> inputLists = parse(inputs, generator);
+        List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
+        List<Unfiltered> result = compact(inputLists, tombstoneLists);
+        System.out.println("GC compaction resulted in " + size(result) + " Unfiltereds");
+        generator.verifyValid(result);
+        verifyEquivalent(inputLists, result, tombstoneLists, generator);
+        List<Unfiltered> expectedResult = generator.parse(expected, NOW - 1);
+        if (!expectedResult.equals(result))
+            fail("Expected " + expected + ", got " + generator.str(result));
+    }
+
+    void testCompaction(String[] inputs, String[] tombstones, int expectedCount)
+    {
+        testNonGcCompaction(inputs, tombstones);
+
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+        List<List<Unfiltered>> inputLists = parse(inputs, generator);
+        List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
+        List<Unfiltered> result = compact(inputLists, tombstoneLists);
+        System.out.println("GC compaction resulted in " + size(result) + " Unfiltereds");
+        generator.verifyValid(result);
+        verifyEquivalent(inputLists, result, tombstoneLists, generator);
+        if (size(result) > expectedCount)
+            fail("Expected compaction with " + expectedCount + " elements, got " + size(result) + ": " + generator.str(result));
+    }
+
+    int testNonGcCompaction(String[] inputs, String[] tombstones)
+    {
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+        List<List<Unfiltered>> inputLists = parse(inputs, generator);
+        List<List<Unfiltered>> tombstoneLists = parse(tombstones, generator);
+        List<Unfiltered> result = compact(inputLists, Collections.emptyList());
+        System.out.println("Non-GC compaction resulted in " + size(result) + " Unfiltereds");
+        generator.verifyValid(result);
+        verifyEquivalent(inputLists, result, tombstoneLists, generator);
+        return size(result);
+    }
+
+    private static int size(List<Unfiltered> data)
+    {
+        return data.stream().mapToInt(x -> x instanceof RangeTombstoneBoundaryMarker ? 2 : 1).sum();
+    }
+
+    private void verifyEquivalent(List<List<Unfiltered>> sources, List<Unfiltered> result, List<List<Unfiltered>> tombstoneSources, UnfilteredRowsGenerator generator)
+    {
+        // sources + tombstoneSources must be the same as result + tombstoneSources
+        List<Unfiltered> expected = compact(Iterables.concat(sources, tombstoneSources), Collections.emptyList());
+        List<Unfiltered> actual = compact(Iterables.concat(ImmutableList.of(result), tombstoneSources), Collections.emptyList());
+        if (!expected.equals(actual))
+        {
+            System.out.println("Equivalence test failure between sources:");
+            for (List<Unfiltered> partition : sources)
+                generator.dumpList(partition);
+            System.out.println("and compacted " + generator.str(result));
+            System.out.println("with tombstone sources:");
+            for (List<Unfiltered> partition : tombstoneSources)
+                generator.dumpList(partition);
+            System.out.println("expected " + generator.str(expected));
+            System.out.println("got " + generator.str(actual));
+            fail("Failed equivalence test.");
+        }
+    }
+
+    private List<List<Unfiltered>> parse(String[] inputs, UnfilteredRowsGenerator generator)
+    {
+        return ImmutableList.copyOf(Lists.transform(Arrays.asList(inputs), x -> parse(x, generator)));
+    }
+
+    private List<Unfiltered> parse(String input, UnfilteredRowsGenerator generator)
+    {
+        Matcher m = Pattern.compile("D(\\d+)\\|").matcher(input);
+        if (m.lookingAt())
+        {
+            int del = Integer.parseInt(m.group(1));
+            input = input.substring(m.end());
+            List<Unfiltered> list = generator.parse(input, NOW - 1);
+            deletionTimes.put(list, new DeletionTime(del, del));
+            return list;
+        }
+        else
+            return generator.parse(input, NOW - 1);
+    }
+
+    private List<Unfiltered> compact(Iterable<List<Unfiltered>> sources, Iterable<List<Unfiltered>> tombstoneSources)
+    {
+        List<Iterable<UnfilteredRowIterator>> content = ImmutableList.copyOf(Iterables.transform(sources, list -> ImmutableList.of(listToIterator(list, kk))));
+        Map<DecoratedKey, Iterable<UnfilteredRowIterator>> transformedSources = new TreeMap<>();
+        transformedSources.put(kk, Iterables.transform(tombstoneSources, list -> listToIterator(list, kk)));
+        try (CompactionController controller = new Controller(Keyspace.openAndGetStore(metadata), transformedSources, GC_BEFORE);
+             CompactionIterator iter = new CompactionIterator(OperationType.COMPACTION,
+                                                              Lists.transform(content, x -> new Scanner(x)),
+                                                              controller, NOW, null))
+        {
+            List<Unfiltered> result = new ArrayList<>();
+            assertTrue(iter.hasNext());
+            try (UnfilteredRowIterator partition = iter.next())
+            {
+                Iterators.addAll(result, partition);
+            }
+            assertFalse(iter.hasNext());
+            return result;
+        }
+    }
+
+    private UnfilteredRowIterator listToIterator(List<Unfiltered> list, DecoratedKey key)
+    {
+        return UnfilteredRowsGenerator.source(list, metadata, key, deletionTimes.getOrDefault(list, DeletionTime.LIVE));
+    }
+
+    NavigableMap<DecoratedKey, List<Unfiltered>> generateContent(Random rand, UnfilteredRowsGenerator generator,
+                                                                 List<DecoratedKey> keys, int pcount, int rcount)
+    {
+        NavigableMap<DecoratedKey, List<Unfiltered>> map = new TreeMap<>();
+        for (int i = 0; i < pcount; ++i)
+        {
+            DecoratedKey key = keys.get(rand.nextInt(keys.size()));
+            map.put(key, generator.generateSource(rand, rcount, RANGE, NOW - 5, x -> NOW - 1));
+        }
+        return map;
+    }
+
+    @Test
+    public void testRandom()
+    {
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(metadata.comparator, false);
+        for (int seed = 1; seed < 100; ++seed)
+        {
+            Random rand = new Random(seed);
+            List<List<Unfiltered>> sources = new ArrayList<>();
+            for (int i = 0; i < 10; ++i)
+                sources.add(generator.generateSource(rand, COUNT, RANGE, NOW - 5, x -> NOW - 15));
+            int srcSz = sources.stream().mapToInt(CompactionIteratorTest::size).sum();
+            List<List<Unfiltered>> tombSources = new ArrayList<>();
+            for (int i = 0; i < 10; ++i)
+                sources.add(generator.generateSource(rand, COUNT, RANGE, NOW - 5, x -> NOW - 15));
+            List<Unfiltered> result = compact(sources, tombSources);
+            verifyEquivalent(sources, result, tombSources, generator);
+            assertTrue(size(result) < srcSz);
+        }
+    }
+
+    class Controller extends CompactionController
+    {
+        private final Map<DecoratedKey, Iterable<UnfilteredRowIterator>> tombstoneSources;
+
+        public Controller(ColumnFamilyStore cfs, Map<DecoratedKey, Iterable<UnfilteredRowIterator>> tombstoneSources, int gcBefore)
+        {
+            super(cfs, Collections.emptySet(), gcBefore);
+            this.tombstoneSources = tombstoneSources;
+        }
+
+        @Override
+        public Iterable<UnfilteredRowIterator> shadowSources(DecoratedKey key, boolean tombstoneOnly)
+        {
+            assert tombstoneOnly;
+            return tombstoneSources.get(key);
+        }
+    }
+
+    class Scanner extends AbstractUnfilteredPartitionIterator implements ISSTableScanner
+    {
+        Iterator<UnfilteredRowIterator> iter;
+
+        Scanner(Iterable<UnfilteredRowIterator> content)
+        {
+            iter = content.iterator();
+        }
+
+        @Override
+        public boolean isForThrift()
+        {
+            return false;
+        }
+
+        @Override
+        public CFMetaData metadata()
+        {
+            return metadata;
+        }
+
+        @Override
+        public boolean hasNext()
+        {
+            return iter.hasNext();
+        }
+
+        @Override
+        public UnfilteredRowIterator next()
+        {
+            return iter.next();
+        }
+
+        @Override
+        public long getLengthInBytes()
+        {
+            return 0;
+        }
+
+        @Override
+        public long getCurrentPosition()
+        {
+            return 0;
+        }
+
+        @Override
+        public String getBackingFiles()
+        {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
index 0eeb379..3335c02 100644
--- a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
@@ -96,9 +96,11 @@ public class UnfilteredRowIteratorsMergeTest
     @SuppressWarnings("unused")
     public void testTombstoneMerge(boolean reversed, boolean iterations)
     {
+        this.reversed = reversed;
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(comparator, reversed);
+
         for (int seed = 1; seed <= 100; ++seed)
         {
-            this.reversed = reversed;
             if (ITEMS <= 20)
                 System.out.println("\nSeed " + seed);
 
@@ -112,27 +114,29 @@ public class UnfilteredRowIteratorsMergeTest
             if (ITEMS <= 20)
                 System.out.println("Merging");
             for (int i=0; i<ITERATORS; ++i)
-                sources.add(generateSource(r, timeGenerators.get(r.nextInt(timeGenerators.size()))));
+                sources.add(generator.generateSource(r, ITEMS, RANGE, DEL_RANGE, timeGenerators.get(r.nextInt(timeGenerators.size()))));
             List<Unfiltered> merged = merge(sources, iterations);
     
             if (ITEMS <= 20)
                 System.out.println("results in");
             if (ITEMS <= 20)
-                dumpList(merged);
-            verifyEquivalent(sources, merged);
-            verifyValid(merged);
+                generator.dumpList(merged);
+            verifyEquivalent(sources, merged, generator);
+            generator.verifyValid(merged);
             if (reversed)
             {
                 Collections.reverse(merged);
-                this.reversed = false;
-                verifyValid(merged);
+                generator.verifyValid(merged, false);
             }
         }
     }
 
     private List<Unfiltered> merge(List<List<Unfiltered>> sources, boolean iterations)
     {
-        List<UnfilteredRowIterator> us = sources.stream().map(l -> new Source(l.iterator())).collect(Collectors.toList());
+        List<UnfilteredRowIterator> us = sources.
+                stream().
+                map(l -> new UnfilteredRowsGenerator.Source(l.iterator(), metadata, partitionKey, DeletionTime.LIVE, reversed)).
+                collect(Collectors.toList());
         List<Unfiltered> merged = new ArrayList<>();
         Iterators.addAll(merged, mergeIterators(us, iterations));
         return merged;
@@ -285,24 +289,24 @@ public class UnfilteredRowIteratorsMergeTest
         }
     }
 
-    void verifyEquivalent(List<List<Unfiltered>> sources, List<Unfiltered> merged)
+    void verifyEquivalent(List<List<Unfiltered>> sources, List<Unfiltered> merged, UnfilteredRowsGenerator generator)
     {
         try
         {
             for (int i=0; i<RANGE; ++i)
             {
-                Clusterable c = clusteringFor(i);
+                Clusterable c = UnfilteredRowsGenerator.clusteringFor(i);
                 DeletionTime dt = DeletionTime.LIVE;
                 for (List<Unfiltered> source : sources)
                 {
                     dt = deletionFor(c, source, dt);
                 }
-                Assert.assertEquals("Deletion time mismatch for position " + str(c), dt, deletionFor(c, merged));
+                Assert.assertEquals("Deletion time mismatch for position " + i, dt, deletionFor(c, merged));
                 if (dt == DeletionTime.LIVE)
                 {
                     Optional<Unfiltered> sourceOpt = sources.stream().map(source -> rowFor(c, source)).filter(x -> x != null).findAny();
                     Unfiltered mergedRow = rowFor(c, merged);
-                    Assert.assertEquals("Content mismatch for position " + str(c), str(sourceOpt.orElse(null)), str(mergedRow));
+                    Assert.assertEquals("Content mismatch for position " + i, clustering(sourceOpt.orElse(null)), clustering(mergedRow));
                 }
             }
         }
@@ -310,13 +314,20 @@ public class UnfilteredRowIteratorsMergeTest
         {
             System.out.println(e);
             for (List<Unfiltered> list : sources)
-                dumpList(list);
+                generator.dumpList(list);
             System.out.println("merged");
-            dumpList(merged);
+            generator.dumpList(merged);
             throw e;
         }
     }
 
+    String clustering(Clusterable curr)
+    {
+        if (curr == null)
+            return "null";
+        return Int32Type.instance.getString(curr.clustering().get(0));
+    }
+
     private Unfiltered rowFor(Clusterable pointer, List<Unfiltered> list)
     {
         int index = Collections.binarySearch(list, pointer, reversed ? comparator.reversed() : comparator);
@@ -424,21 +435,23 @@ public class UnfilteredRowIteratorsMergeTest
 
     public void testForInput(String... inputs)
     {
+        reversed = false;
+        UnfilteredRowsGenerator generator = new UnfilteredRowsGenerator(comparator, false);
+
         List<List<Unfiltered>> sources = new ArrayList<>();
         for (String input : inputs)
         {
-            List<Unfiltered> source = parse(input);
-            attachBoundaries(source);
-            dumpList(source);
-            verifyValid(source);
+            List<Unfiltered> source = generator.parse(input, DEL_RANGE);
+            generator.dumpList(source);
+            generator.verifyValid(source);
             sources.add(source);
         }
 
         List<Unfiltered> merged = merge(sources, false);
         System.out.println("Merge to:");
-        dumpList(merged);
-        verifyEquivalent(sources, merged);
-        verifyValid(merged);
+        generator.dumpList(merged);
+        verifyEquivalent(sources, merged, generator);
+        generator.verifyValid(merged);
         System.out.println();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d40ac784/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java
new file mode 100644
index 0000000..7cdccdb
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowsGenerator.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.junit.Assert;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.rows.Unfiltered.Kind;
+import org.apache.cassandra.utils.btree.BTree;
+
+public class UnfilteredRowsGenerator
+{
+    final boolean reversed;
+    final Comparator<Clusterable> comparator;
+
+    public UnfilteredRowsGenerator(Comparator<Clusterable> comparator, boolean reversed)
+    {
+        this.reversed = reversed;
+        this.comparator = comparator;
+    }
+
+    String str(Clusterable curr)
+    {
+        if (curr == null)
+            return "null";
+        String val = Int32Type.instance.getString(curr.clustering().get(0));
+        if (curr instanceof RangeTombstoneMarker)
+        {
+            RangeTombstoneMarker marker = (RangeTombstoneMarker) curr;
+            if (marker.isClose(reversed))
+                val = "[" + marker.closeDeletionTime(reversed).markedForDeleteAt() + "]" + (marker.closeIsInclusive(reversed) ? "<=" : "<") + val;
+            if (marker.isOpen(reversed))
+                val = val + (marker.openIsInclusive(reversed) ? "<=" : "<") + "[" + marker.openDeletionTime(reversed).markedForDeleteAt() + "]";
+        }
+        else if (curr instanceof Row)
+        {
+            Row row = (Row) curr;
+            String delTime = "";
+            if (!row.deletion().time().isLive())
+                delTime = "D" + row.deletion().time().markedForDeleteAt();
+            val = val + "[" + row.primaryKeyLivenessInfo().timestamp() + delTime + "]";
+        }
+        return val;
+    }
+
+    public void verifyValid(List<Unfiltered> list)
+    {
+        verifyValid(list, reversed);
+    }
+
+    void verifyValid(List<Unfiltered> list, boolean reversed)
+    {
+        int reversedAsMultiplier = reversed ? -1 : 1;
+        try {
+            RangeTombstoneMarker prev = null;
+            Unfiltered prevUnfiltered = null;
+            for (Unfiltered unfiltered : list)
+            {
+                Assert.assertTrue("Order violation prev " + str(prevUnfiltered) + " curr " + str(unfiltered),
+                                  prevUnfiltered == null || comparator.compare(prevUnfiltered, unfiltered) * reversedAsMultiplier < 0);
+                prevUnfiltered = unfiltered;
+
+                if (unfiltered.kind() == Kind.RANGE_TOMBSTONE_MARKER)
+                {
+                    RangeTombstoneMarker curr = (RangeTombstoneMarker) unfiltered;
+                    if (prev != null)
+                    {
+                        if (curr.isClose(reversed))
+                        {
+                            Assert.assertTrue(str(unfiltered) + " follows another close marker " + str(prev), prev.isOpen(reversed));
+                            Assert.assertEquals("Deletion time mismatch for open " + str(prev) + " and close " + str(unfiltered),
+                                                prev.openDeletionTime(reversed),
+                                                curr.closeDeletionTime(reversed));
+                        }
+                        else
+                            Assert.assertFalse(str(curr) + " follows another open marker " + str(prev), prev.isOpen(reversed));
+                    }
+
+                    prev = curr;
+                }
+            }
+            Assert.assertFalse("Cannot end in open marker " + str(prev), prev != null && prev.isOpen(reversed));
+
+        } catch (AssertionError e) {
+            System.out.println(e);
+            dumpList(list);
+            throw e;
+        }
+    }
+
+    public List<Unfiltered> generateSource(Random r, int items, int range, int del_range, Function<Integer, Integer> timeGenerator)
+    {
+        int[] positions = new int[items + 1];
+        for (int i=0; i<items; ++i)
+            positions[i] = r.nextInt(range);
+        positions[items] = range;
+        Arrays.sort(positions);
+
+        List<Unfiltered> content = new ArrayList<>(items);
+        int prev = -1;
+        for (int i=0; i<items; ++i)
+        {
+            int pos = positions[i];
+            int sz = positions[i + 1] - pos;
+            if (sz == 0 && pos == prev)
+                // Filter out more than two of the same position.
+                continue;
+            if (r.nextBoolean() || pos == prev)
+            {
+                int span;
+                boolean includesStart;
+                boolean includesEnd;
+                if (pos > prev)
+                {
+                    span = r.nextInt(sz + 1);
+                    includesStart = span > 0 ? r.nextBoolean() : true;
+                    includesEnd = span > 0 ? r.nextBoolean() : true;
+                }
+                else
+                {
+                    span = 1 + r.nextInt(sz);
+                    includesStart = false;
+                    includesEnd = r.nextBoolean();
+                }
+                int deltime = r.nextInt(del_range);
+                DeletionTime dt = new DeletionTime(deltime, deltime);
+                content.add(new RangeTombstoneBoundMarker(boundFor(pos, true, includesStart), dt));
+                content.add(new RangeTombstoneBoundMarker(boundFor(pos + span, false, includesEnd), dt));
+                prev = pos + span - (includesEnd ? 0 : 1);
+            }
+            else
+            {
+                content.add(emptyRowAt(pos, timeGenerator));
+                prev = pos;
+            }
+        }
+
+        attachBoundaries(content);
+        if (reversed)
+        {
+            Collections.reverse(content);
+        }
+        verifyValid(content);
+        if (items <= 20)
+            dumpList(content);
+        return content;
+    }
+
+    /**
+     * Constructs a list of unfiltereds with integer clustering according to the specification string.
+     *
+     * The string is a space-delimited sorted list that can contain:
+     *  * open tombstone markers, e.g. xx<[yy] where xx is the clustering, yy is the deletion time, and "<" stands for
+     *    non-inclusive (<= for inclusive).
+     *  * close tombstone markers, e.g. [yy]<=xx. Adjacent close and open markers (e.g. [yy]<=xx xx<[zz]) are combined
+     *    into boundary markers.
+     *  * empty rows, e.g. xx or xx[yy] or xx[yyDzz] where xx is the clustering, yy is the live time and zz is deletion
+     *    time.
+     *
+     * @param input Specification.
+     * @param default_liveness Liveness to use for rows if not explicitly specified.
+     * @return Parsed list.
+     */
+    public List<Unfiltered> parse(String input, int default_liveness)
+    {
+        String[] split = input.split(" ");
+        Pattern open = Pattern.compile("(\\d+)<(=)?\\[(\\d+)\\]");
+        Pattern close = Pattern.compile("\\[(\\d+)\\]<(=)?(\\d+)");
+        Pattern row = Pattern.compile("(\\d+)(\\[(\\d+)(?:D(\\d+))?\\])?");
+        List<Unfiltered> out = new ArrayList<>(split.length);
+        for (String s : split)
+        {
+            Matcher m = open.matcher(s);
+            if (m.matches())
+            {
+                out.add(openMarker(Integer.parseInt(m.group(1)), Integer.parseInt(m.group(3)), m.group(2) != null));
+                continue;
+            }
+            m = close.matcher(s);
+            if (m.matches())
+            {
+                out.add(closeMarker(Integer.parseInt(m.group(3)), Integer.parseInt(m.group(1)), m.group(2) != null));
+                continue;
+            }
+            m = row.matcher(s);
+            if (m.matches())
+            {
+                int live = m.group(3) != null ? Integer.parseInt(m.group(3)) : default_liveness;
+                int delTime = m.group(4) != null ? Integer.parseInt(m.group(4)) : -1;
+                out.add(emptyRowAt(Integer.parseInt(m.group(1)), live, delTime));
+                continue;
+            }
+            Assert.fail("Can't parse " + s);
+        }
+        attachBoundaries(out);
+        return out;
+    }
+
+    static Row emptyRowAt(int pos, Function<Integer, Integer> timeGenerator)
+    {
+        final Clustering clustering = clusteringFor(pos);
+        final LivenessInfo live = LivenessInfo.create(timeGenerator.apply(pos), UnfilteredRowIteratorsMergeTest.nowInSec);
+        return BTreeRow.noCellLiveRow(clustering, live);
+    }
+
+    static Row emptyRowAt(int pos, int time, int deletionTime)
+    {
+        final Clustering clustering = clusteringFor(pos);
+        final LivenessInfo live = LivenessInfo.create(time, UnfilteredRowIteratorsMergeTest.nowInSec);
+        final DeletionTime delTime = deletionTime == -1 ? DeletionTime.LIVE : new DeletionTime(deletionTime, deletionTime);
+        return BTreeRow.create(clustering, live, Row.Deletion.regular(delTime), BTree.empty());
+    }
+
+    static Clustering clusteringFor(int i)
+    {
+        return Clustering.make(Int32Type.instance.decompose(i));
+    }
+
+    static ClusteringBound boundFor(int pos, boolean start, boolean inclusive)
+    {
+        return ClusteringBound.create(ClusteringBound.boundKind(start, inclusive), new ByteBuffer[] {Int32Type.instance.decompose(pos)});
+    }
+
+    static void attachBoundaries(List<Unfiltered> content)
+    {
+        int di = 0;
+        RangeTombstoneMarker prev = null;
+        for (int si = 0; si < content.size(); ++si)
+        {
+            Unfiltered currUnfiltered = content.get(si);
+            RangeTombstoneMarker curr = currUnfiltered.kind() == Kind.RANGE_TOMBSTONE_MARKER ?
+                                        (RangeTombstoneMarker) currUnfiltered :
+                                        null;
+            if (prev != null && curr != null && prev.isClose(false) && curr.isOpen(false) && prev.clustering().invert().equals(curr.clustering()))
+            {
+                // Join. Prefer not to use merger to check its correctness.
+                ClusteringBound b = (ClusteringBound) prev.clustering();
+                ClusteringBoundary boundary = ClusteringBoundary.create(
+                        b.isInclusive() ? ClusteringBound.Kind.INCL_END_EXCL_START_BOUNDARY : ClusteringBound.Kind.EXCL_END_INCL_START_BOUNDARY,
+                        b.getRawValues());
+                prev = new RangeTombstoneBoundaryMarker(boundary, prev.closeDeletionTime(false), curr.openDeletionTime(false));
+                currUnfiltered = prev;
+                --di;
+            }
+            content.set(di++, currUnfiltered);
+            prev = curr;
+        }
+        for (int pos = content.size() - 1; pos >= di; --pos)
+            content.remove(pos);
+    }
+
+    static RangeTombstoneMarker openMarker(int pos, int delTime, boolean inclusive)
+    {
+        return marker(pos, delTime, true, inclusive);
+    }
+
+    static RangeTombstoneMarker closeMarker(int pos, int delTime, boolean inclusive)
+    {
+        return marker(pos, delTime, false, inclusive);
+    }
+
+    private static RangeTombstoneMarker marker(int pos, int delTime, boolean isStart, boolean inclusive)
+    {
+        return new RangeTombstoneBoundMarker(ClusteringBound.create(ClusteringBound.boundKind(isStart, inclusive),
+                                                                    new ByteBuffer[] {clusteringFor(pos).get(0)}),
+                                             new DeletionTime(delTime, delTime));
+    }
+
+    public static UnfilteredRowIterator source(Iterable<Unfiltered> content, CFMetaData metadata, DecoratedKey partitionKey)
+    {
+        return source(content, metadata, partitionKey, DeletionTime.LIVE);
+    }
+
+    public static UnfilteredRowIterator source(Iterable<Unfiltered> content, CFMetaData metadata, DecoratedKey partitionKey, DeletionTime delTime)
+    {
+        return new Source(content.iterator(), metadata, partitionKey, delTime, false);
+    }
+
+    static class Source extends AbstractUnfilteredRowIterator implements UnfilteredRowIterator
+    {
+        Iterator<Unfiltered> content;
+
+        protected Source(Iterator<Unfiltered> content, CFMetaData metadata, DecoratedKey partitionKey, DeletionTime partitionLevelDeletion, boolean reversed)
+        {
+            super(metadata,
+                  partitionKey,
+                  partitionLevelDeletion,
+                  metadata.partitionColumns(),
+                  Rows.EMPTY_STATIC_ROW,
+                  reversed,
+                  EncodingStats.NO_STATS);
+            this.content = content;
+        }
+
+        @Override
+        protected Unfiltered computeNext()
+        {
+            return content.hasNext() ? content.next() : endOfData();
+        }
+    }
+
+    public String str(List<Unfiltered> list)
+    {
+        StringBuilder builder = new StringBuilder();
+        for (Unfiltered u : list)
+        {
+            builder.append(str(u));
+            builder.append(' ');
+        }
+        return builder.toString();
+    }
+
+    public void dumpList(List<Unfiltered> list)
+    {
+        System.out.println(str(list));
+    }
+}
\ No newline at end of file


Mime
View raw message