cassandra-commits mailing list archives

From pa...@apache.org
Subject cassandra git commit: Throttle base partitions during MV repair streaming to prevent OOM
Date Thu, 28 Sep 2017 14:41:33 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 974d8fc09 -> 8ef71f3f2


Throttle base partitions during MV repair streaming to prevent OOM

Patch by Zhao Yang; Reviewed by Paulo Motta for CASSANDRA-13299
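
The gist of the change: instead of materializing each streamed base partition
as one huge mutation on the receiving node, the write path now applies it in
bounded chunks. As a rough standalone sketch of the batching idea (hypothetical
names, not the patch's actual API; the real iterator below is subtler because a
chunk must not end inside an open range tombstone):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    // Hypothetical illustration only: consume a large stream in fixed-size
    // batches so that at most `batchSize` elements are resident at once.
    final class BatchingIterator<T> implements Iterator<List<T>>
    {
        private final Iterator<T> source;
        private final int batchSize;

        BatchingIterator(Iterator<T> source, int batchSize)
        {
            this.source = source;
            this.batchSize = batchSize;
        }

        public boolean hasNext()
        {
            return source.hasNext();
        }

        public List<T> next()
        {
            List<T> batch = new ArrayList<>(batchSize);
            while (source.hasNext() && batch.size() < batchSize)
                batch.add(source.next());
            return batch; // caller applies this batch before requesting the next
        }
    }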


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8ef71f3f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8ef71f3f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8ef71f3f

Branch: refs/heads/trunk
Commit: 8ef71f3f29fb040cce18ba158ff5f289b388c30b
Parents: 974d8fc
Author: Zhao Yang <zhaoyangsingapore@gmail.com>
Authored: Fri Aug 11 13:04:28 2017 +0800
Committer: Paulo Motta <paulo@apache.org>
Committed: Thu Sep 28 05:45:25 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/rows/ThrottledUnfilteredIterator.java    | 251 ++++++++
 .../cassandra/streaming/StreamReceiveTask.java  |  36 +-
 .../cassandra/utils/AbstractIterator.java       |   7 +-
 .../rows/ThrottledUnfilteredIteratorTest.java   | 613 +++++++++++++++++++
 5 files changed, 890 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef71f3f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 97e6b03..081ed72 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299)
  * Use compaction threshold for STCS in L0 (CASSANDRA-13861)
  * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703)
  * Add extra information to SASI timeout exception (CASSANDRA-13677)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef71f3f/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java b/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java
new file mode 100644
index 0000000..dd33b1e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/ThrottledUnfilteredIterator.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A utility class to split the given {@link UnfilteredRowIterator} into smaller chunks each
+ * having at most {@link #throttle} + 1 unfiltereds.
+ *
+ * Only the first output contains partition level info: {@link UnfilteredRowIterator#partitionLevelDeletion}
+ * and {@link UnfilteredRowIterator#staticRow}.
+ *
+ * Besides splitting, this iterator will also ensure each chunk does not finish with an open tombstone marker,
+ * by closing any opened tombstone markers and re-opening on the next chunk.
+ *
+ * The lifecycle of each output {@link UnfilteredRowIterator} only lasts until the next call to {@link #next()}.
+ *
+ * A subsequent {@link #next} call will exhaust the previously returned iterator before computing the next,
+ * effectively skipping unfiltereds up to the throttle size.
+ *
+ * Closing this iterator will close the underlying iterator.
+ *
+ */
+public class ThrottledUnfilteredIterator extends AbstractIterator<UnfilteredRowIterator> implements CloseableIterator<UnfilteredRowIterator>
+{
+    private final UnfilteredRowIterator origin;
+    private final int throttle;
+
+    // internal mutable state
+    private UnfilteredRowIterator throttledItr;
+
+    // extra unfiltereds from previous iteration
+    private Iterator<Unfiltered> overflowed = Collections.emptyIterator();
+
+    @VisibleForTesting
+    ThrottledUnfilteredIterator(UnfilteredRowIterator origin, int throttle)
+    {
+        assert origin != null;
+        assert throttle > 1 : "Throttle size must be higher than 1 to properly support open and close tombstone boundaries.";
+        this.origin = origin;
+        this.throttle = throttle;
+        this.throttledItr = null;
+    }
+
+    @Override
+    protected UnfilteredRowIterator computeNext()
+    {
+        // exhaust previous throttled iterator
+        while (throttledItr != null && throttledItr.hasNext())
+            throttledItr.next();
+
+        if (!origin.hasNext())
+            return endOfData();
+
+        throttledItr = new WrappingUnfilteredRowIterator(origin)
+        {
+            private int count = 0;
+            private boolean isFirst = throttledItr == null;
+
+            // current batch's openMarker. if it's generated in previous batch,
+            // it must be consumed as first element of current batch
+            private RangeTombstoneMarker openMarker;
+
+            // current batch's closeMarker.
+            // it must be consumed as last element of current batch
+            private RangeTombstoneMarker closeMarker = null;
+
+            @Override
+            public boolean hasNext()
+            {
+                return (withinLimit() && wrapped.hasNext()) || closeMarker != null;
+            }
+
+            @Override
+            public Unfiltered next()
+            {
+                if (closeMarker != null)
+                {
+                    assert count == throttle;
+                    Unfiltered toReturn = closeMarker;
+                    closeMarker = null;
+                    return toReturn;
+                }
+
+                Unfiltered next;
+                assert withinLimit();
+                // in the beginning of the batch, there might be remaining unfiltereds from previous iteration
+                if (overflowed.hasNext())
+                    next = overflowed.next();
+                else
+                    next = wrapped.next();
+                recordNext(next);
+                return next;
+            }
+
+            private void recordNext(Unfiltered unfiltered)
+            {
+                count++;
+                if (unfiltered.isRangeTombstoneMarker())
+                    updateMarker((RangeTombstoneMarker) unfiltered);
+                // when we reach the throttle with a remaining openMarker, we need to create the corresponding closeMarker.
+                if (count == throttle && openMarker != null)
+                {
+                    assert wrapped.hasNext();
+                    closeOpenMarker(wrapped.next());
+                }
+            }
+
+            private boolean withinLimit()
+            {
+                return count < throttle;
+            }
+
+            private void updateMarker(RangeTombstoneMarker marker)
+            {
+                openMarker = marker.isOpen(isReverseOrder()) ? marker : null;
+            }
+
+            /**
+             * There are 3 cases for next:
+             * 1. if it's a boundary marker, we split it into a closeMarker for the current batch and an openMarker for the next batch;
+             * 2. if it's a bound marker, it must be a closeMarker;
+             * 3. if it's a Row, create a corresponding closeMarker for the current batch, and an openMarker for the next batch including the current Row.
+             */
+            private void closeOpenMarker(Unfiltered next)
+            {
+                assert openMarker != null;
+
+                if (next.isRangeTombstoneMarker())
+                {
+                    RangeTombstoneMarker marker = (RangeTombstoneMarker) next;
+                    // if it's boundary, create closeMarker for current batch and openMarker for next batch
+                    if (marker.isBoundary())
+                    {
+                        RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker) marker;
+                        closeMarker = boundary.createCorrespondingCloseMarker(isReverseOrder());
+                        overflowed = Collections.singleton((Unfiltered)boundary.createCorrespondingOpenMarker(isReverseOrder())).iterator();
+                    }
+                    else
+                    {
+                        // if it's bound, it must be closeMarker.
+                        assert marker.isClose(isReverseOrder());
+                        updateMarker(marker);
+                        closeMarker = marker;
+                    }
+                }
+                else
+                {
+                    // it's Row, need to create closeMarker for current batch and openMarker for next batch
+                    DeletionTime openDeletion = openMarker.openDeletionTime(isReverseOrder());
+                    ByteBuffer[] buffers = next.clustering().getRawValues();
+                    closeMarker = RangeTombstoneBoundMarker.exclusiveClose(isReverseOrder(), buffers, openDeletion);
+
+                    // for next batch
+                    overflowed = Arrays.asList(RangeTombstoneBoundMarker.inclusiveOpen(isReverseOrder(),
+                                                                                       buffers,
+                                                                                       openDeletion), next).iterator();
+                }
+            }
+
+            @Override
+            public DeletionTime partitionLevelDeletion()
+            {
+                return isFirst ? wrapped.partitionLevelDeletion() : DeletionTime.LIVE;
+            }
+
+            @Override
+            public Row staticRow()
+            {
+                return isFirst ? wrapped.staticRow() : Rows.EMPTY_STATIC_ROW;
+            }
+
+            @Override
+            public void close()
+            {
+                // no op
+            }
+        };
+        return throttledItr;
+    }
+
+    public void close()
+    {
+        if (origin != null)
+            origin.close();
+    }
+
+    /**
+     * Splits a {@link UnfilteredPartitionIterator} into {@link UnfilteredRowIterator} batches with size no higher
+     * than <b>maxBatchSize</b>
+     */
+    public static CloseableIterator<UnfilteredRowIterator> throttle(UnfilteredPartitionIterator partitionIterator, int maxBatchSize)
+    {
+        return new AbstractIterator<UnfilteredRowIterator>()
+        {
+            ThrottledUnfilteredIterator current = null;
+
+            protected UnfilteredRowIterator computeNext()
+            {
+                if (current != null && !current.hasNext())
+                {
+                    current.close();
+                    current = null;
+                }
+
+                if (current == null && partitionIterator.hasNext())
+                {
+                    current = new ThrottledUnfilteredIterator(partitionIterator.next(), maxBatchSize);
+                    assert current.hasNext() : "UnfilteredPartitionIterator should not contain empty partitions";
+                }
+
+                if (current != null && current.hasNext())
+                    return current.next();
+
+                return endOfData();
+            }
+
+            public void close()
+            {
+                if (current != null)
+                    current.close();
+            }
+        };
+    }
+}
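
A minimal usage sketch of the API added above, assuming `partitions` is an
UnfilteredPartitionIterator (e.g. from an sstable scanner) and `process` is a
hypothetical consumer; each emitted chunk is only valid until the next call on
the outer iterator:

    // Split every partition into chunks of at most 100 unfiltereds (plus a
    // generated close marker when a chunk would otherwise end inside a range
    // tombstone). Only a partition's first chunk carries the partition-level
    // deletion and the static row.
    try (CloseableIterator<UnfilteredRowIterator> throttled =
             ThrottledUnfilteredIterator.throttle(partitions, 100))
    {
        while (throttled.hasNext())
        {
            try (UnfilteredRowIterator chunk = throttled.next())
            {
                process(chunk); // hypothetical consumer
            }
        }
    }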

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef71f3f/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 6aa70ad..988bc9e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -28,7 +28,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -37,6 +36,7 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.ThrottledUnfilteredIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.dht.Bounds;
@@ -45,6 +45,7 @@ import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.Refs;
@@ -58,6 +59,8 @@ public class StreamReceiveTask extends StreamTask
 
     private static final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("StreamReceiveTask"));
 
+    private static final int MAX_ROWS_PER_BATCH = Integer.getInteger("cassandra.repair.mutation_repair_rows_per_batch", 100);
+
     // number of files to receive
     private final int totalFiles;
     // total size of files to receive
@@ -174,29 +177,28 @@ public class StreamReceiveTask extends StreamTask
             return cfs.metadata().params.cdc;
         }
 
-        Mutation createMutation(ColumnFamilyStore cfs, UnfilteredRowIterator rowIterator)
-        {
-            return new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata())));
-        }
-
         private void sendThroughWritePath(ColumnFamilyStore cfs, Collection<SSTableReader> readers) {
             boolean hasCdc = hasCDC(cfs);
+            ColumnFilter filter = ColumnFilter.all(cfs.metadata());
             for (SSTableReader reader : readers)
             {
                 Keyspace ks = Keyspace.open(reader.getKeyspaceName());
-                try (ISSTableScanner scanner = reader.getScanner())
+                // When doing mutation-based repair we split each partition into smaller batches
+                // ({@link Stream MAX_ROWS_PER_BATCH}) to avoid OOMing and generating heap pressure
+                try (ISSTableScanner scanner = reader.getScanner();
+                     CloseableIterator<UnfilteredRowIterator> throttledPartitions = ThrottledUnfilteredIterator.throttle(scanner, MAX_ROWS_PER_BATCH))
                 {
-                    while (scanner.hasNext())
+                    while (throttledPartitions.hasNext())
                     {
-                        try (UnfilteredRowIterator rowIterator = scanner.next())
-                        {
-                            // MV *can* be applied unsafe if there's no CDC on the CFS as we flush
-                            // before transaction is done.
-                            //
-                            // If the CFS has CDC, however, these updates need to be written to the CommitLog
-                            // so they get archived into the cdc_raw folder
-                            ks.apply(createMutation(cfs, rowIterator), hasCdc, true, false);
-                        }
+                        // MV *can* be applied unsafe if there's no CDC on the CFS as we flush
+                        // before transaction is done.
+                        //
+                        // If the CFS has CDC, however, these updates need to be written to the CommitLog
+                        // so they get archived into the cdc_raw folder
+                        ks.apply(new Mutation(PartitionUpdate.fromIterator(throttledPartitions.next(), filter)),
+                                 hasCdc,
+                                 true,
+                                 false);
                     }
                 }
             }
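
The chunk size above is resolved once, at class load, from a system property
with a default of 100 rows; presumably it can be tuned with a JVM flag such as
-Dcassandra.repair.mutation_repair_rows_per_batch=500. A sketch of the lookup
in isolation:

    // Integer.getInteger returns the system property's value if set and
    // parseable, otherwise the supplied default (100 here).
    int rowsPerBatch = Integer.getInteger("cassandra.repair.mutation_repair_rows_per_batch", 100);
    System.out.println("mutation repair rows per batch: " + rowsPerBatch);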

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef71f3f/src/java/org/apache/cassandra/utils/AbstractIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AbstractIterator.java b/src/java/org/apache/cassandra/utils/AbstractIterator.java
index dd3d73c..7dd32b8 100644
--- a/src/java/org/apache/cassandra/utils/AbstractIterator.java
+++ b/src/java/org/apache/cassandra/utils/AbstractIterator.java
@@ -23,7 +23,7 @@ import java.util.NoSuchElementException;
 
 import com.google.common.collect.PeekingIterator;
 
-public abstract class AbstractIterator<V> implements Iterator<V>, PeekingIterator<V>
+public abstract class AbstractIterator<V> implements Iterator<V>, PeekingIterator<V>, CloseableIterator<V>
 {
 
     private static enum State { MUST_FETCH, HAS_NEXT, DONE, FAILED }
@@ -80,4 +80,9 @@ public abstract class AbstractIterator<V> implements Iterator<V>, PeekingIterato
     {
         throw new UnsupportedOperationException();
     }
+
+    public void close()
+    {
+        //no-op
+    }
 }
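
With AbstractIterator now implementing CloseableIterator through a default
no-op close(), subclasses (such as the anonymous iterators returned by
ThrottledUnfilteredIterator.throttle) can be used directly in
try-with-resources and may override close() to release resources. A small
sketch under that assumption:

    import org.apache.cassandra.utils.AbstractIterator;
    import org.apache.cassandra.utils.CloseableIterator;

    public class CloseableIteratorExample
    {
        public static void main(String[] args)
        {
            // close() is inherited as a no-op unless overridden
            try (CloseableIterator<String> it = new AbstractIterator<String>()
            {
                private int n = 0;

                protected String computeNext()
                {
                    return n < 3 ? "item-" + n++ : endOfData();
                }
            })
            {
                while (it.hasNext())
                    System.out.println(it.next());
            }
        }
    }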

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8ef71f3f/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java b/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java
new file mode 100644
index 0000000..2d2cce0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/ThrottledUnfilteredIteratorTest.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.rows;
+
+import static org.apache.cassandra.SchemaLoader.standardCFMD;
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import com.google.common.collect.Iterators;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.UpdateBuilder;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.db.AbstractReadCommandBuilder;
+import org.apache.cassandra.db.BufferDecoratedKey;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
+import org.apache.cassandra.db.partitions.Partition;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ThrottledUnfilteredIteratorTest extends CQLTester
+{
+    private static final String KSNAME = "ThrottledUnfilteredIteratorTest";
+    private static final String CFNAME = "StandardInteger1";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KSNAME,
+                                    KeyspaceParams.simple(1),
+                                    standardCFMD(KSNAME, CFNAME, 1, UTF8Type.instance, Int32Type.instance, Int32Type.instance));
+    }
+
+    static final TableMetadata metadata;
+    static final ColumnMetadata v1Metadata;
+    static final ColumnMetadata v2Metadata;
+
+    static
+    {
+        metadata = TableMetadata.builder("", "")
+                                .addPartitionKeyColumn("pk", Int32Type.instance)
+                                .addClusteringColumn("ck1", Int32Type.instance)
+                                .addClusteringColumn("ck2", Int32Type.instance)
+                                .addRegularColumn("v1", Int32Type.instance)
+                                .addRegularColumn("v2", Int32Type.instance)
+                                .build();
+        v1Metadata = metadata.regularAndStaticColumns().columns(false).getSimple(0);
+        v2Metadata = metadata.regularAndStaticColumns().columns(false).getSimple(1);
+    }
+
+    @Test
+    public void complexThrottleWithTombstoneTest() throws Throwable
+    {
+        // create cell tombstone, range tombstone, partition deletion
+        createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, v1 int, v2 int, PRIMARY KEY (pk, ck1, ck2))");
+
+        for (int ck1 = 1; ck1 <= 150; ck1++)
+            for (int ck2 = 1; ck2 <= 150; ck2++)
+            {
+                int timestamp = ck1, v1 = ck1, v2 = ck2;
+                execute("INSERT INTO %s(pk,ck1,ck2,v1,v2) VALUES(1,?,?,?,?) using timestamp "
+                        + timestamp, ck1, ck2, v1, v2);
+            }
+
+        for (int ck1 = 1; ck1 <= 100; ck1++)
+            for (int ck2 = 1; ck2 <= 100; ck2++)
+            {
+                if (ck1 % 2 == 0 || ck1 % 3 == 0) // range tombstone
+                    execute("DELETE FROM %s USING TIMESTAMP 170 WHERE pk=1 AND ck1=?", ck1);
+                else if (ck1 == ck2) // row tombstone
+                    execute("DELETE FROM %s USING TIMESTAMP 180 WHERE pk=1 AND ck1=? AND ck2=?", ck1, ck2);
+                else if (ck1 == ck2 - 1) // cell tombstone
+                    execute("DELETE v2 FROM %s USING TIMESTAMP 190 WHERE pk=1 AND ck1=? AND ck2=?", ck1, ck2);
+            }
+
+        // range deletion
+        execute("DELETE FROM %s USING TIMESTAMP 150 WHERE pk=1 AND ck1 > 100 AND ck1 < 120");
+        execute("DELETE FROM %s USING TIMESTAMP 150 WHERE pk=1 AND ck1 = 50 AND ck2 < 120");
+        // partition deletion
+        execute("DELETE FROM %s USING TIMESTAMP 160 WHERE pk=1");
+
+        // flush and generate 1 sstable
+        ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable());
+        cfs.forceBlockingFlush();
+        cfs.disableAutoCompaction();
+        cfs.forceMajorCompaction();
+
+        assertEquals(1, cfs.getLiveSSTables().size());
+        SSTableReader reader = cfs.getLiveSSTables().iterator().next();
+
+        try (ISSTableScanner scanner = reader.getScanner())
+        {
+            try (UnfilteredRowIterator rowIterator = scanner.next())
+            {
+                // only 1 partition data
+                assertFalse(scanner.hasNext());
+                List<Unfiltered> expectedUnfiltereds = new ArrayList<>();
+                rowIterator.forEachRemaining(expectedUnfiltereds::add);
+
+                // test different throttle
+                for (Integer throttle : Arrays.asList(2, 3, 4, 5, 11, 41, 99, 1000, 10001))
+                {
+                    try (ISSTableScanner scannerForThrottle = reader.getScanner())
+                    {
+                        assertTrue(scannerForThrottle.hasNext());
+                        try (UnfilteredRowIterator rowIteratorForThrottle = scannerForThrottle.next())
+                        {
+                            assertFalse(scannerForThrottle.hasNext());
+                            verifyThrottleIterator(expectedUnfiltereds,
+                                                   rowIteratorForThrottle,
+                                                   new ThrottledUnfilteredIterator(rowIteratorForThrottle, throttle),
+                                                   throttle);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    private void verifyThrottleIterator(List<Unfiltered> expectedUnfiltereds,
+                                        UnfilteredRowIterator rowIteratorForThrottle,
+                                        ThrottledUnfilteredIterator throttledIterator,
+                                        int throttle)
+    {
+        List<Unfiltered> output = new ArrayList<>();
+
+        boolean isRevered = rowIteratorForThrottle.isReverseOrder();
+        boolean isFirst = true;
+
+        while (throttledIterator.hasNext())
+        {
+            UnfilteredRowIterator splittedIterator = throttledIterator.next();
+            assertMetadata(rowIteratorForThrottle, splittedIterator, isFirst);
+
+            List<Unfiltered> splittedUnfiltereds = new ArrayList<>();
+
+            splittedIterator.forEachRemaining(splittedUnfiltereds::add);
+
+            int remain = expectedUnfiltereds.size() - output.size();
+            int expectedSize = remain >= throttle ? throttle : remain;
+            if (splittedUnfiltereds.size() != expectedSize)
+            {
+                assertEquals(expectedSize + 1, splittedUnfiltereds.size());
+                // the extra unfiltered must be a close bound marker
+                Unfiltered last = splittedUnfiltereds.get(expectedSize);
+                assertTrue(last.isRangeTombstoneMarker());
+                RangeTombstoneMarker marker = (RangeTombstoneMarker) last;
+                assertFalse(marker.isBoundary());
+                assertTrue(marker.isClose(isRevered));
+            }
+            output.addAll(splittedUnfiltereds);
+            if (isFirst)
+                isFirst = false;
+        }
+        int index = 0;
+        RangeTombstoneMarker openMarker = null;
+        for (int i = 0; i < expectedUnfiltereds.size(); i++)
+        {
+            Unfiltered expected = expectedUnfiltereds.get(i);
+            Unfiltered data = output.get(i);
+
+            // verify that all tombstone are paired
+            if (data.isRangeTombstoneMarker())
+            {
+                RangeTombstoneMarker marker = (RangeTombstoneMarker) data;
+                if (marker.isClose(isRevered))
+                {
+                    assertNotNull(openMarker);
+                    openMarker = null;
+                }
+                if (marker.isOpen(isRevered))
+                {
+                    assertNull(openMarker);
+                    openMarker = marker;
+                }
+            }
+            if (expected.equals(data))
+            {
+                index++;
+            }
+            else // because of the created closeMarker and openMarker
+            {
+                assertNotNull(openMarker);
+                DeletionTime openDeletionTime = openMarker.openDeletionTime(isRevered);
+                // only boundary or row will create extra closeMarker and openMarker
+                if (expected.isRangeTombstoneMarker())
+                {
+                    RangeTombstoneMarker marker = (RangeTombstoneMarker) expected;
+                    assertTrue(marker.isBoundary());
+                    RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker) marker;
+                    assertEquals(boundary.createCorrespondingCloseMarker(isRevered), data);
+                    assertEquals(boundary.createCorrespondingOpenMarker(isRevered), output.get(index + 1));
+                    assertEquals(openDeletionTime, boundary.endDeletionTime());
+
+                    openMarker = boundary.createCorrespondingOpenMarker(isRevered);
+                }
+                else
+                {
+                    ByteBuffer[] byteBuffers = expected.clustering().getRawValues();
+                    RangeTombstoneBoundMarker closeMarker = RangeTombstoneBoundMarker.exclusiveClose(isRevered,
+                                                                                                     byteBuffers,
+                                                                                                     openDeletionTime);
+
+                    RangeTombstoneBoundMarker nextOpenMarker = RangeTombstoneBoundMarker.inclusiveOpen(isRevered,
+                                                                                                       byteBuffers,
+                                                                                                       openDeletionTime);
+                    assertEquals(closeMarker, data);
+                    assertEquals(nextOpenMarker, output.get(index + 1));
+
+                    openMarker = nextOpenMarker;
+                }
+                index += 2;
+            }
+        }
+        assertNull(openMarker);
+        assertEquals(output.size(), index);
+    }
+
+    @Test
+    public void simpleThrottleTest()
+    {
+        simpleThrottleTest(false);
+    }
+
+    @Test
+    public void skipTest()
+    {
+        simpleThrottleTest(true);
+    }
+
+    public void simpleThrottleTest(boolean skipOdd)
+    {
+        // all live rows with partition deletion
+        ThrottledUnfilteredIterator throttledIterator;
+        UnfilteredRowIterator origin;
+
+        List<Row> rows = new ArrayList<>();
+        int rowCount = 1111;
+
+        for (int i = 0; i < rowCount; i++)
+            rows.add(createRow(i, createCell(v1Metadata, i), createCell(v2Metadata, i)));
+
+        // testing different throttle limit
+        for (int throttle = 2; throttle < 1200; throttle += 21)
+        {
+            origin = rows(metadata.regularAndStaticColumns(),
+                          1,
+                          new DeletionTime(0, 100),
+                          Rows.EMPTY_STATIC_ROW,
+                          rows.toArray(new Row[0]));
+            throttledIterator = new ThrottledUnfilteredIterator(origin, throttle);
+
+            int splittedCount = (int) Math.ceil(rowCount*1.0/throttle);
+            for (int i = 1; i <= splittedCount; i++)
+            {
+                UnfilteredRowIterator splitted = throttledIterator.next();
+                assertMetadata(origin, splitted, i == 1);
+                // no op
+                splitted.close();
+
+                int start = (i - 1) * throttle;
+                int end = i == splittedCount ? rowCount : i * throttle;
+                if (skipOdd && (i % 2) == 0)
+                {
+                    assertRows(splitted, rows.subList(start, end).toArray(new Row[0]));
+                }
+            }
+            assertTrue(!throttledIterator.hasNext());
+        }
+    }
+
+    @Test
+    public void throttledPartitionIteratorTest()
+    {
+        // all live rows with partition deletion
+        CloseableIterator<UnfilteredRowIterator> throttledIterator;
+        UnfilteredPartitionIterator origin;
+
+        SortedMap<Integer, List<Row>> partitions = new TreeMap<>();
+        int partitionCount = 13;
+        int baseRowsPerPartition = 1111;
+
+        for (int i = 1; i <= partitionCount; i++)
+        {
+            ArrayList<Row> rows = new ArrayList<>();
+            for (int j = 0; j < (baseRowsPerPartition + i); j++)
+                rows.add(createRow(i, createCell(v1Metadata, j), createCell(v2Metadata, j)));
+            partitions.put(i, rows);
+        }
+
+        // testing different throttle limit
+        for (int throttle = 2; throttle < 1200; throttle += 21)
+        {
+            origin = partitions(metadata.regularAndStaticColumns(),
+                                new DeletionTime(0, 100),
+                                Rows.EMPTY_STATIC_ROW,
+                                partitions);
+            throttledIterator = ThrottledUnfilteredIterator.throttle(origin, throttle);
+
+            int currentPartition = 0;
+            int rowsInPartition = 0;
+            int expectedSplitCount = 0;
+            int currentSplit = 1;
+            while (throttledIterator.hasNext())
+            {
+                UnfilteredRowIterator splitted = throttledIterator.next();
+                if (currentSplit > expectedSplitCount)
+                {
+                    currentPartition++;
+                    rowsInPartition = partitions.get(currentPartition).size();
+                    expectedSplitCount = (int) Math.ceil(rowsInPartition * 1.0 / throttle);
+                    currentSplit = 1;
+                }
+                UnfilteredRowIterator current = rows(metadata.regularAndStaticColumns(),
+                                                     currentPartition,
+                                                     new DeletionTime(0, 100),
+                                                     Rows.EMPTY_STATIC_ROW,
+                                                     partitions.get(currentPartition).toArray(new Row[0]));
+                assertMetadata(current, splitted, currentSplit == 1);
+                // no op
+                splitted.close();
+
+                int start = (currentSplit - 1) * throttle;
+                int end = currentSplit == expectedSplitCount ? rowsInPartition : currentSplit * throttle;
+                assertRows(splitted, partitions.get(currentPartition).subList(start, end).toArray(new Row[0]));
+                currentSplit++;
+            }
+        }
+
+
+        origin = partitions(metadata.regularAndStaticColumns(),
+                            new DeletionTime(0, 100),
+                            Rows.EMPTY_STATIC_ROW,
+                            partitions);
+        try
+        {
+            try (CloseableIterator<UnfilteredRowIterator> throttled = ThrottledUnfilteredIterator.throttle(origin, 10))
+            {
+                int i = 0;
+                while (throttled.hasNext())
+                {
+                    assertEquals(dk(1), throttled.next().partitionKey());
+                    if (i++ == 10)
+                    {
+                        throw new RuntimeException("Dummy exception");
+                    }
+                }
+                fail("Should not reach here");
+            }
+        }
+        catch (RuntimeException rte)
+        {
+            int iteratedPartitions = 2;
+            while (iteratedPartitions <= partitionCount)
+            {
+                // check that original iterator was not closed
+                assertTrue(origin.hasNext());
+                // check it's possible to fetch second partition from original iterator
+                assertEquals(dk(iteratedPartitions++), origin.next().partitionKey());
+            }
+        }
+
+    }
+
+    private void assertMetadata(UnfilteredRowIterator origin, UnfilteredRowIterator splitted, boolean isFirst)
+    {
+        assertEquals(splitted.columns(), origin.columns());
+        assertEquals(splitted.partitionKey(), origin.partitionKey());
+        assertEquals(splitted.isReverseOrder(), origin.isReverseOrder());
+        assertEquals(splitted.metadata(), origin.metadata());
+        assertEquals(splitted.stats(), origin.stats());
+
+        if (isFirst)
+        {
+            assertEquals(origin.partitionLevelDeletion(), splitted.partitionLevelDeletion());
+            assertEquals(origin.staticRow(), splitted.staticRow());
+        }
+        else
+        {
+            assertEquals(DeletionTime.LIVE, splitted.partitionLevelDeletion());
+            assertEquals(Rows.EMPTY_STATIC_ROW, splitted.staticRow());
+        }
+    }
+
+    public static void assertRows(UnfilteredRowIterator iterator, Row... rows)
+    {
+        Iterator<Row> rowsIterator = Arrays.asList(rows).iterator();
+
+        while (iterator.hasNext() && rowsIterator.hasNext())
+            assertEquals(iterator.next(), rowsIterator.next());
+
+        assertTrue(iterator.hasNext() == rowsIterator.hasNext());
+    }
+
+    private static DecoratedKey dk(int pk)
+    {
+        return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(pk), ByteBufferUtil.bytes(pk));
+    }
+
+    private static UnfilteredRowIterator rows(RegularAndStaticColumns columns,
+                                              int pk,
+                                              DeletionTime partitionDeletion,
+                                              Row staticRow,
+                                              Unfiltered... rows)
+    {
+        Iterator<Unfiltered> rowsIterator = Arrays.asList(rows).iterator();
+        return new AbstractUnfilteredRowIterator(metadata, dk(pk), partitionDeletion, columns, staticRow, false, EncodingStats.NO_STATS) {
+            protected Unfiltered computeNext()
+            {
+                return rowsIterator.hasNext() ? rowsIterator.next() : endOfData();
+            }
+        };
+    }
+
+    private static UnfilteredPartitionIterator partitions(RegularAndStaticColumns columns,
+                                                          DeletionTime partitionDeletion,
+                                                          Row staticRow,
+                                                          SortedMap<Integer, List<Row>> partitions)
+    {
+        Iterator<Map.Entry<Integer, List<Row>>> partitionIt = partitions.entrySet().iterator();
+        return new AbstractUnfilteredPartitionIterator() {
+            public boolean hasNext()
+            {
+                return partitionIt.hasNext();
+            }
+
+            public UnfilteredRowIterator next()
+            {
+                Map.Entry<Integer, List<Row>> next = partitionIt.next();
+                Iterator<Row> rowsIterator = next.getValue().iterator();
+                return new AbstractUnfilteredRowIterator(metadata, dk(next.getKey()), partitionDeletion, columns, staticRow, false, EncodingStats.NO_STATS) {
+                    protected Unfiltered computeNext()
+                    {
+                        return rowsIterator.hasNext() ? rowsIterator.next() : endOfData();
+                    }
+                };
+            }
+
+            public TableMetadata metadata()
+            {
+                return metadata;
+            }
+        };
+    }
+
+
+    private static Row createRow(int ck, Cell... columns)
+    {
+        return createRow(ck, ck, columns);
+    }
+
+    private static Row createRow(int ck1, int ck2, Cell... columns)
+    {
+        BTreeRow.Builder builder = new BTreeRow.Builder(true);
+        builder.newRow(Util.clustering(metadata.comparator, ck1, ck2));
+        for (Cell cell : columns)
+            builder.addCell(cell);
+        return builder.build();
+    }
+
+    private static Cell createCell(ColumnMetadata metadata, int v)
+    {
+        return createCell(metadata, v, 100L, BufferCell.NO_DELETION_TIME);
+    }
+
+    private static Cell createCell(ColumnMetadata metadata, int v, long timestamp, int localDeletionTime)
+    {
+        return new BufferCell(metadata,
+                              timestamp,
+                              BufferCell.NO_TTL,
+                              localDeletionTime,
+                              ByteBufferUtil.bytes(v),
+                              null);
+    }
+
+    @Test
+    public void testThrottledIteratorWithRangeDeletions() throws Exception
+    {
+        Keyspace keyspace = Keyspace.open(KSNAME);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CFNAME);
+
+        // Inserting data
+        String key = "k1";
+
+        UpdateBuilder builder;
+
+        builder = UpdateBuilder.create(cfs.metadata(), key).withTimestamp(0);
+        for (int i = 0; i < 40; i += 2)
+            builder.newRow(i).add("val", i);
+        builder.applyUnsafe();
+
+        new RowUpdateBuilder(cfs.metadata(), 1, key).addRangeTombstone(10, 22).build().applyUnsafe();
+
+        cfs.forceBlockingFlush();
+
+        builder = UpdateBuilder.create(cfs.metadata(), key).withTimestamp(2);
+        for (int i = 1; i < 40; i += 2)
+            builder.newRow(i).add("val", i);
+        builder.applyUnsafe();
+
+        new RowUpdateBuilder(cfs.metadata(), 3, key).addRangeTombstone(19, 27).build().applyUnsafe();
+        // We don't flush to test with both a range tombstone in memtable and in sstable
+
+        // Queries by name
+        int[] live = new int[]{ 4, 9, 11, 17, 28 };
+        int[] dead = new int[]{ 12, 19, 21, 24, 27 };
+
+        AbstractReadCommandBuilder.PartitionRangeBuilder cmdBuilder = Util.cmd(cfs);
+
+        ReadCommand cmd = cmdBuilder.build();
+
+        for (int batchSize = 2; batchSize <= 40; batchSize++)
+        {
+            List<UnfilteredRowIterator> unfilteredRowIterators = new LinkedList<>();
+
+            try (ReadExecutionController executionController = cmd.executionController();
+                 UnfilteredPartitionIterator iterator = cmd.executeLocally(executionController))
+            {
+                assertTrue(iterator.hasNext());
+                Iterator<UnfilteredRowIterator> throttled = ThrottledUnfilteredIterator.throttle(iterator, batchSize);
+                while (throttled.hasNext())
+                {
+                    UnfilteredRowIterator next = throttled.next();
+                    ImmutableBTreePartition materializedPartition = ImmutableBTreePartition.create(next);
+                    int unfilteredCount = Iterators.size(materializedPartition.unfilteredIterator());
+
+                    System.out.println("batchsize " + batchSize + " unfilteredCount " + unfilteredCount + " materializedPartition " + materializedPartition);
+
+                    if (throttled.hasNext())
+                    {
+                        if (unfilteredCount != batchSize)
+                        {
+                            // when there is an extra unfiltered, it must be a close bound marker
+                            assertEquals(batchSize + 1, unfilteredCount);
+                            Unfiltered last = Iterators.getLast(materializedPartition.unfilteredIterator());
+                            assertTrue(last.isRangeTombstoneMarker());
+                            RangeTombstoneMarker marker = (RangeTombstoneMarker) last;
+                            assertFalse(marker.isBoundary());
+                            assertTrue(marker.isClose(false));
+                        }
+                    }
+                    else
+                    {
+                        //only last batch can be smaller than batchSize
+                        assertTrue(unfilteredCount <= batchSize + 1);
+                    }
+                    unfilteredRowIterators.add(materializedPartition.unfilteredIterator());
+                }
+                assertFalse(iterator.hasNext());
+            }
+
+            // Verify throttled data after merge
+            Partition partition = ImmutableBTreePartition.create(UnfilteredRowIterators.merge(unfilteredRowIterators, FBUtilities.nowInSeconds()));
+
+            int nowInSec = FBUtilities.nowInSeconds();
+
+            for (int i : live)
+                assertTrue("Row " + i + " should be live", partition.getRow(Clustering.make(ByteBufferUtil.bytes((i)))).hasLiveData(nowInSec, cfs.metadata().enforceStrictLiveness()));
+            for (int i : dead)
+                assertFalse("Row " + i + " shouldn't be live", partition.getRow(Clustering.make(ByteBufferUtil.bytes((i)))).hasLiveData(nowInSec, cfs.metadata().enforceStrictLiveness()));
+        }
+    }
+}

