cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ifesdj...@apache.org
Subject [3/6] cassandra git commit: Make concat work with iterators that have different subsets of columns
Date Wed, 12 Jul 2017 13:53:35 GMT
Make concat work with iterators that have different subsets of columns

Patch by Alex Petrov; reviewed by Sylvain Lebresne for CASSANDRA-13482.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7251c955
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7251c955
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7251c955

Branch: refs/heads/trunk
Commit: 7251c9559805d83423ca5ddbe4f955ce668c3d9a
Parents: 2400d07
Author: Alex Petrov <oleksandr.petrov@gmail.com>
Authored: Thu Jun 8 16:59:24 2017 +0200
Committer: Alex Petrov <oleksandr.petrov@gmail.com>
Committed: Wed Jul 12 15:44:06 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/SinglePartitionReadCommand.java          |  30 ++-
 .../db/rows/UnfilteredRowIterators.java         |   3 +-
 .../cassandra/db/transform/BaseIterator.java    |  13 +-
 .../cassandra/db/transform/BasePartitions.java  |   2 +-
 .../apache/cassandra/db/transform/BaseRows.java |   4 +-
 .../apache/cassandra/db/transform/MoreRows.java |   6 +
 .../db/transform/StoppingTransformation.java    |  31 ++--
 .../cassandra/db/transform/Transformation.java  |  24 +++
 .../cassandra/db/transform/UnfilteredRows.java  |  13 ++
 .../apache/cassandra/db/RowCacheCQLTest.java    |  70 +++++++
 .../db/rows/UnfilteredRowIteratorsTest.java     | 185 +++++++++++++++++++
 12 files changed, 362 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ce2324d..bf36769 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Make concat work with iterators that have different subsets of columns (CASSANDRA-13482)
  * Set test.runners based on cores and memory size (CASSANDRA-13078)
  * Allow different NUMACTL_ARGS to be passed in (CASSANDRA-13557)
  * Allow native function calls in CQLSSTableWriter (CASSANDRA-12606)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 72b4465..b4211bb 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -429,13 +429,39 @@ public class SinglePartitionReadCommand extends ReadCommand
 
             try
             {
-                int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();
+                final int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();
+
                 @SuppressWarnings("resource") // we close on exception or upon closing the
result of this method
                 UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(),
nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
                 try
                 {
+                    // Use a custom iterator instead of DataLimits to avoid stopping the
original iterator
+                    UnfilteredRowIterator toCacheIterator = new WrappingUnfilteredRowIterator(iter)
+                    {
+                        private int rowsCounted = 0;
+
+                        @Override
+                        public boolean hasNext()
+                        {
+                            return rowsCounted < rowsToCache && super.hasNext();
+                        }
+
+                        @Override
+                        public Unfiltered next()
+                        {
+                            Unfiltered unfiltered = super.next();
+                            if (unfiltered.isRow())
+                            {
+                                Row row = (Row) unfiltered;
+                                if (row.hasLiveData(nowInSec()))
+                                    rowsCounted++;
+                            }
+                            return unfiltered;
+                        }
+                    };
+
                     // We want to cache only rowsToCache rows
-                    CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter,
nowInSec()), nowInSec());
+                    CachedPartition toCache = CachedBTreePartition.create(toCacheIterator,
nowInSec());
                     if (sentinelSuccess && !toCache.isEmpty())
                     {
                         Tracing.trace("Caching {} rows", toCache.rowCount());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 43653a9..5c27363 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -194,7 +194,6 @@ public abstract class UnfilteredRowIterators
             && iter1.partitionKey().equals(iter2.partitionKey())
             && iter1.partitionLevelDeletion().equals(iter2.partitionLevelDeletion())
             && iter1.isReverseOrder() == iter2.isReverseOrder()
-            && iter1.columns().equals(iter2.columns())
             && iter1.staticRow().equals(iter2.staticRow());
 
         class Extend implements MoreRows<UnfilteredRowIterator>
@@ -209,7 +208,7 @@ public abstract class UnfilteredRowIterators
             }
         }
 
-        return MoreRows.extend(iter1, new Extend());
+        return MoreRows.extend(iter1, new Extend(), iter1.columns().mergeTo(iter2.columns()));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/transform/BaseIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/BaseIterator.java b/src/java/org/apache/cassandra/db/transform/BaseIterator.java
index dd928eb..d00e406 100644
--- a/src/java/org/apache/cassandra/db/transform/BaseIterator.java
+++ b/src/java/org/apache/cassandra/db/transform/BaseIterator.java
@@ -33,7 +33,15 @@ abstract class BaseIterator<V, I extends CloseableIterator<? extends
V>, O exten
 {
     I input;
     V next;
-    Stop stop; // applies at the end of the current next()
+
+    // We require two stop signals for correctness, since the `stop` reference of the base
iterator can "leak"
+    // into the transformations stack. Using a single `stop` signal may result in an inconsistent
state,
+    // since a stopping transformation would stop only the child iterator.
+
+    // Signals that the base iterator has been signalled to stop. Applies at the end of the
current next().
+    Stop stop;
+    // Signals that the current child iterator has been signalled to stop.
+    Stop stopChild;
 
     static class Stop
     {
@@ -49,12 +57,14 @@ abstract class BaseIterator<V, I extends CloseableIterator<? extends
V>, O exten
         this.input = copyFrom.input;
         this.next = copyFrom.next;
         this.stop = copyFrom.stop;
+        this.stopChild = copyFrom.stopChild;
     }
 
     BaseIterator(I input)
     {
         this.input = input;
         this.stop = new Stop();
+        this.stopChild = this.stop;
     }
 
     /**
@@ -122,6 +132,7 @@ abstract class BaseIterator<V, I extends CloseableIterator<? extends
V>, O exten
                 BaseIterator abstr = (BaseIterator) newContents;
                 prefix = abstr;
                 input = (I) abstr.input;
+                stopChild = abstr.stop;
                 next = apply((V) abstr.next, holder.length); // must apply all remaining
functions to the next, if any
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/transform/BasePartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/BasePartitions.java b/src/java/org/apache/cassandra/db/transform/BasePartitions.java
index 026a39d..2f76452 100644
--- a/src/java/org/apache/cassandra/db/transform/BasePartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/BasePartitions.java
@@ -102,7 +102,7 @@ implements BasePartitionIterator<R>
                     }
                 }
 
-                if (stop.isSignalled || !hasMoreContents())
+                if (stop.isSignalled || stopChild.isSignalled || !hasMoreContents())
                     return false;
             }
             return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/transform/BaseRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/BaseRows.java b/src/java/org/apache/cassandra/db/transform/BaseRows.java
index 0586840..e6ce1da 100644
--- a/src/java/org/apache/cassandra/db/transform/BaseRows.java
+++ b/src/java/org/apache/cassandra/db/transform/BaseRows.java
@@ -126,7 +126,7 @@ implements BaseRowIterator<R>
             Transformation[] fs = stack;
             int len = length;
 
-            while (!stop.isSignalled && input.hasNext())
+            while (!stop.isSignalled && !stopChild.isSignalled && input.hasNext())
             {
                 Unfiltered next = input.next();
 
@@ -152,7 +152,7 @@ implements BaseRowIterator<R>
                 }
             }
 
-            if (stop.isSignalled || !hasMoreContents())
+            if (stop.isSignalled || stopChild.isSignalled || !hasMoreContents())
                 return false;
         }
         return true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/transform/MoreRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/MoreRows.java b/src/java/org/apache/cassandra/db/transform/MoreRows.java
index 786e215..118739b 100644
--- a/src/java/org/apache/cassandra/db/transform/MoreRows.java
+++ b/src/java/org/apache/cassandra/db/transform/MoreRows.java
@@ -20,6 +20,7 @@
  */
 package org.apache.cassandra.db.transform;
 
+import org.apache.cassandra.db.PartitionColumns;
 import org.apache.cassandra.db.rows.BaseRowIterator;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -47,6 +48,11 @@ public interface MoreRows<I extends BaseRowIterator<?>> extends
MoreContents<I>
         return add(mutable(iterator), more);
     }
 
+    public static UnfilteredRowIterator extend(UnfilteredRowIterator iterator, MoreRows<?
super UnfilteredRowIterator> more, PartitionColumns columns)
+    {
+        return add(Transformation.wrapIterator(iterator, columns), more);
+    }
+
     public static RowIterator extend(RowIterator iterator, MoreRows<? super RowIterator>
more)
     {
         return add(mutable(iterator), more);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java b/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java
index 534091e..79563e9 100644
--- a/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java
+++ b/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java
@@ -26,8 +26,8 @@ import org.apache.cassandra.db.rows.BaseRowIterator;
 // A Transformation that can stop an iterator earlier than its natural exhaustion
 public abstract class StoppingTransformation<I extends BaseRowIterator<?>> extends
Transformation<I>
 {
-    private BaseIterator.Stop stop;
-    private BaseIterator.Stop stopInPartition;
+    private BaseIterator rows;
+    private BaseIterator partitions;
 
     /**
      * If invoked by a subclass, any partitions iterator this transformation has been applied
to will terminate
@@ -36,8 +36,12 @@ public abstract class StoppingTransformation<I extends BaseRowIterator<?>>
exten
     @DontInline
     protected void stop()
     {
-        if (stop != null)
-            stop.isSignalled = true;
+        if (partitions != null)
+        {
+            partitions.stop.isSignalled = true;
+            partitions.stopChild.isSignalled = true;
+        }
+
         stopInPartition();
     }
 
@@ -48,33 +52,36 @@ public abstract class StoppingTransformation<I extends BaseRowIterator<?>>
exten
     @DontInline
     protected void stopInPartition()
     {
-        if (stopInPartition != null)
-            stopInPartition.isSignalled = true;
+        if (rows != null)
+        {
+            rows.stop.isSignalled = true;
+            rows.stopChild.isSignalled = true;
+        }
     }
 
     @Override
     protected void attachTo(BasePartitions partitions)
     {
-        assert this.stop == null;
-        this.stop = partitions.stop;
+        assert this.partitions == null;
+        this.partitions = partitions;
     }
 
     @Override
     protected void attachTo(BaseRows rows)
     {
-        assert this.stopInPartition == null;
-        this.stopInPartition = rows.stop;
+        assert this.rows == null;
+        this.rows = rows;
     }
 
     @Override
     protected void onClose()
     {
-        stop = null;
+        partitions = null;
     }
 
     @Override
     protected void onPartitionClose()
     {
-        stopInPartition = null;
+        rows = null;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/transform/Transformation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Transformation.java b/src/java/org/apache/cassandra/db/transform/Transformation.java
index 6a31ece..06dd057 100644
--- a/src/java/org/apache/cassandra/db/transform/Transformation.java
+++ b/src/java/org/apache/cassandra/db/transform/Transformation.java
@@ -21,6 +21,7 @@
 package org.apache.cassandra.db.transform;
 
 import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.PartitionColumns;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.*;
@@ -151,6 +152,29 @@ public abstract class Transformation<I extends BaseRowIterator<?>>
                : new FilteredRows(iterator);
     }
 
+    /**
+     * This method is similar to `mutable`, but it suppresses the optimisation of
avoiding creating an additional
+     * wrapping iterator object (which both creates an extra object and grows the call stack
during the iteration), so it
+     * should be used with caution.
+     *
+     * It is useful in cases when the input has to be checked for more contents rather than
directly checking if it
+     * is stopped. For example, when concatenating two iterators (pseudocode):
+     *
+     *    iter1 = [row(1), row(2), row(3)]
+     *    iter2 = [row(4), row(5), row(6)]
+     *
+     *    UnfilteredRowIterators.concat(DataLimits.cqlLimits(1).filter(iter1), DataLimits.cqlLimits(1).filter(iter2))
+     *
+     * Which should yield two rows: [row(1), row(4)].
+     *
+     * Using stacked transformations instead of wrapping would result in returning a single
row, since the first
+     * iterator will signal that the iterator is stopped.
+     */
+    static UnfilteredRows wrapIterator(UnfilteredRowIterator iterator, PartitionColumns columns)
+    {
+        return new UnfilteredRows(iterator, columns);
+    }
+
     static <E extends BaseIterator> E add(E to, Transformation add)
     {
         to.add(add);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
index f000fcf..c631f2e 100644
--- a/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
+++ b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
@@ -21,20 +21,33 @@
 package org.apache.cassandra.db.transform;
 
 import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.PartitionColumns;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 
 final class UnfilteredRows extends BaseRows<Unfiltered, UnfilteredRowIterator> implements
UnfilteredRowIterator
 {
+    private PartitionColumns columns;
     private DeletionTime partitionLevelDeletion;
 
     public UnfilteredRows(UnfilteredRowIterator input)
     {
+        this(input, input.columns());
+    }
+
+    public UnfilteredRows(UnfilteredRowIterator input, PartitionColumns columns)
+    {
         super(input);
+        this.columns = columns;
         partitionLevelDeletion = input.partitionLevelDeletion();
     }
 
+    public PartitionColumns columns()
+    {
+        return columns;
+    }
+
     @Override
     void add(Transformation add)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/test/unit/org/apache/cassandra/db/RowCacheCQLTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheCQLTest.java b/test/unit/org/apache/cassandra/db/RowCacheCQLTest.java
index a3c0e25..a8f7e3d 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheCQLTest.java
@@ -37,4 +37,74 @@ public class RowCacheCQLTest extends CQLTester
         assertEquals(1, res.size());
         assertEmpty(execute("SELECT * FROM %s WHERE p1 = ? and c1 > ?", 123L, 1000));
     }
+
+    /**
+     * Test for CASSANDRA-13482
+     */
+    @Test
+    public void testPartialCache() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck1 int, v1 int, v2 int, primary key (pk, ck1))"
+
+                    "WITH CACHING = { 'keys': 'ALL', 'rows_per_partition': '1' }");
+        assertEmpty(execute("select * from %s where pk = 10000"));
+
+        execute("DELETE FROM %s WHERE pk = 1 AND ck1 = 0");
+        execute("DELETE FROM %s WHERE pk = 1 AND ck1 = 1");
+        execute("DELETE FROM %s WHERE pk = 1 AND ck1 = 2");
+        execute("INSERT INTO %s (pk, ck1, v1, v2) VALUES (1, 1, 1, 1)");
+        execute("INSERT INTO %s (pk, ck1, v1, v2) VALUES (1, 2, 2, 2)");
+        execute("INSERT INTO %s (pk, ck1, v1, v2) VALUES (1, 3, 3, 3)");
+        execute("DELETE FROM %s WHERE pk = 1 AND ck1 = 2");
+        execute("DELETE FROM %s WHERE pk = 1 AND ck1 = 3");
+        execute("INSERT INTO %s (pk, ck1, v1, v2) VALUES (1, 4, 4, 4)");
+        execute("INSERT INTO %s (pk, ck1, v1, v2) VALUES (1, 5, 5, 5)");
+
+        assertRows(execute("select * from %s where pk = 1"),
+                   row(1, 1, 1, 1),
+                   row(1, 4, 4, 4),
+                   row(1, 5, 5, 5));
+        assertRows(execute("select * from %s where pk = 1 LIMIT 1"),
+                   row(1, 1, 1, 1));
+
+        assertRows(execute("select * from %s where pk = 1 and ck1 >=2"),
+                   row(1, 4, 4, 4),
+                   row(1, 5, 5, 5));
+        assertRows(execute("select * from %s where pk = 1 and ck1 >=2 LIMIT 1"),
+                   row(1, 4, 4, 4));
+
+        assertRows(execute("select * from %s where pk = 1 and ck1 >=2"),
+                   row(1, 4, 4, 4),
+                   row(1, 5, 5, 5));
+        assertRows(execute("select * from %s where pk = 1 and ck1 >=2 LIMIT 1"),
+                   row(1, 4, 4, 4));
+    }
+
+    @Test
+    public void testPartialCacheWithStatic() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck1 int, s int static, v1 int, primary key
(pk, ck1))" +
+                    "WITH CACHING = { 'keys': 'ALL', 'rows_per_partition': '1' }");
+        assertEmpty(execute("select * from %s where pk = 10000"));
+
+        execute("INSERT INTO %s (pk, s) VALUES (1, 1)");
+        execute("INSERT INTO %s (pk, ck1, v1) VALUES (1, 2, 2)");
+        execute("INSERT INTO %s (pk, ck1, v1) VALUES (1, 3, 3)");
+
+        execute("DELETE FROM %s WHERE pk = 2 AND ck1 = 0");
+        execute("DELETE FROM %s WHERE pk = 2 AND ck1 = 1");
+        execute("DELETE FROM %s WHERE pk = 3 AND ck1 = 2");
+        execute("INSERT INTO %s (pk, s) VALUES (2, 2)");
+        execute("INSERT INTO %s (pk, ck1, v1) VALUES (2, 1, 1)");
+        execute("INSERT INTO %s (pk, ck1, v1) VALUES (2, 2, 2)");
+        execute("INSERT INTO %s (pk, ck1, v1) VALUES (2, 3, 3)");
+
+        assertRows(execute("select * from %s WHERE pk = 1"),
+                   row(1, 2, 1, 2),
+                   row(1, 3, 1, 3));
+
+        assertRows(execute("select * from %s WHERE pk = 2"),
+                   row(2, 1, 2, 1),
+                   row(2, 2, 2, 2),
+                   row(2, 3, 2, 3));
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7251c955/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsTest.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsTest.java
new file mode 100644
index 0000000..43b9549
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.rows;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.BufferDecoratedKey;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class UnfilteredRowIteratorsTest
+{
+    static final CFMetaData metadata;
+    static final ColumnDefinition v1Metadata;
+    static final ColumnDefinition v2Metadata;
+
+    static
+    {
+        metadata = CFMetaData.Builder.create("", "")
+                             .addPartitionKey("pk", Int32Type.instance)
+                                     .addClusteringColumn("ck", Int32Type.instance)
+                             .addRegularColumn("v1", Int32Type.instance)
+                             .addRegularColumn("v2", Int32Type.instance)
+                             .build();
+        v1Metadata = metadata.partitionColumns().columns(false).getSimple(0);
+        v2Metadata = metadata.partitionColumns().columns(false).getSimple(1);
+    }
+
+
+    @Test
+    public void concatTest()
+    {
+        UnfilteredRowIterator iter1, iter2, iter3, concat;
+        // simple concatenation
+        iter1 = rows(metadata.partitionColumns(), 1,
+                     row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)),
+                     row(2, cell(v1Metadata, 2), cell(v2Metadata, 2)));
+        iter2 = rows(metadata.partitionColumns(), 1,
+                     row(3, cell(v1Metadata, 3), cell(v2Metadata, 3)),
+                     row(4, cell(v1Metadata, 4), cell(v2Metadata, 4)));
+        concat = UnfilteredRowIterators.concat(iter1, iter2);
+        Assert.assertEquals(concat.columns(), metadata.partitionColumns());
+        assertRows(concat,
+                   row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)),
+                   row(2, cell(v1Metadata, 2), cell(v2Metadata, 2)),
+                   row(3, cell(v1Metadata, 3), cell(v2Metadata, 3)),
+                   row(4, cell(v1Metadata, 4), cell(v2Metadata, 4)));
+
+        // concat with RHS empty iterator
+        iter1 = rows(metadata.partitionColumns(), 1,
+                     row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)),
+                     row(2, cell(v1Metadata, 2), cell(v2Metadata, 2)));
+        Assert.assertEquals(concat.columns(), metadata.partitionColumns());
+        assertRows(UnfilteredRowIterators.concat(iter1, EmptyIterators.unfilteredRow(metadata,
dk(1), false, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE)),
+                   row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)),
+                   row(2, cell(v1Metadata, 2), cell(v2Metadata, 2)));
+
+        // concat with LHS empty iterator
+        iter1 = rows(metadata.partitionColumns(), 1,
+                     row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)),
+                     row(2, cell(v1Metadata, 2), cell(v2Metadata, 2)));
+        Assert.assertEquals(concat.columns(), metadata.partitionColumns());
+        assertRows(UnfilteredRowIterators.concat(EmptyIterators.unfilteredRow(metadata, dk(1),
false, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE), iter1),
+                   row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)),
+                   row(2, cell(v1Metadata, 2), cell(v2Metadata, 2)));
+
+        // concat with different columns
+        iter1 = rows(metadata.partitionColumns().without(v1Metadata), 1,
+                     row(1, cell(v2Metadata, 1)), row(2, cell(v2Metadata, 2)));
+        iter2 = rows(metadata.partitionColumns().without(v2Metadata), 1,
+                     row(3, cell(v1Metadata, 3)), row(4, cell(v1Metadata, 4)));
+        concat = UnfilteredRowIterators.concat(iter1, iter2);
+        Assert.assertEquals(concat.columns(), PartitionColumns.of(v1Metadata).mergeTo(PartitionColumns.of(v2Metadata)));
+        assertRows(concat,
+                   row(1, cell(v2Metadata, 1)), row(2, cell(v2Metadata, 2)),
+                   row(3, cell(v1Metadata, 3)), row(4, cell(v1Metadata, 4)));
+
+        // concat with CQL limits
+        iter1 = rows(metadata.partitionColumns(), 1,
+                     row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)),
+                     row(2, cell(v1Metadata, 2), cell(v2Metadata, 2)));
+        iter2 = rows(metadata.partitionColumns(), 1,
+                     row(3, cell(v1Metadata, 3), cell(v2Metadata, 3)),
+                     row(4, cell(v1Metadata, 4), cell(v2Metadata, 4)));
+        concat = UnfilteredRowIterators.concat(DataLimits.cqlLimits(1).filter(iter1, FBUtilities.nowInSeconds()),
+                                               DataLimits.cqlLimits(1).filter(iter2, FBUtilities.nowInSeconds()));
+        Assert.assertEquals(concat.columns(), metadata.partitionColumns());
+        assertRows(concat,
+                   row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)),
+                   row(3, cell(v1Metadata, 3), cell(v2Metadata, 3)));
+
+        // concat concatenated iterators
+        iter1 = rows(metadata.partitionColumns(), 1,
+                     row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)),
+                     row(2, cell(v1Metadata, 2), cell(v2Metadata, 2)));
+        iter2 = rows(metadata.partitionColumns(), 1,
+                     row(3, cell(v1Metadata, 3), cell(v2Metadata, 3)),
+                     row(4, cell(v1Metadata, 4), cell(v2Metadata, 4)));
+
+        concat = UnfilteredRowIterators.concat(DataLimits.cqlLimits(1).filter(iter1, FBUtilities.nowInSeconds()),
+                                               DataLimits.cqlLimits(1).filter(iter2, FBUtilities.nowInSeconds()));
+
+        iter3 = rows(metadata.partitionColumns(), 1,
+                     row(4, cell(v1Metadata, 4), cell(v2Metadata, 4)),
+                     row(5, cell(v1Metadata, 5), cell(v2Metadata, 5)));
+        concat = UnfilteredRowIterators.concat(concat, DataLimits.cqlLimits(1).filter(iter3,
FBUtilities.nowInSeconds()));
+
+        Assert.assertEquals(concat.columns(), metadata.partitionColumns());
+        assertRows(concat,
+                   row(1, cell(v1Metadata, 1), cell(v2Metadata, 1)),
+                   row(3, cell(v1Metadata, 3), cell(v2Metadata, 3)),
+                   row(4, cell(v1Metadata, 4), cell(v2Metadata, 4)));
+    }
+
+    public static void assertRows(UnfilteredRowIterator iterator, Row... rows)
+    {
+        Iterator<Row> rowsIterator = Arrays.asList(rows).iterator();
+
+        while (iterator.hasNext() && rowsIterator.hasNext())
+            Assert.assertEquals(iterator.next(), rowsIterator.next());
+
+        Assert.assertTrue(iterator.hasNext() == rowsIterator.hasNext());
+    }
+
+    public static DecoratedKey dk(int pk)
+    {
+        return new BufferDecoratedKey(new Murmur3Partitioner.LongToken(pk), ByteBufferUtil.bytes(pk));
+    }
+
+    public static UnfilteredRowIterator rows(PartitionColumns columns, int pk, Row... rows)
+    {
+        Iterator<Row> rowsIterator = Arrays.asList(rows).iterator();
+        return new AbstractUnfilteredRowIterator(metadata, dk(pk), DeletionTime.LIVE, columns,
Rows.EMPTY_STATIC_ROW, false, EncodingStats.NO_STATS) {
+            protected Unfiltered computeNext()
+            {
+                return rowsIterator.hasNext() ? rowsIterator.next() : endOfData();
+            }
+        };
+    }
+
+    public Row row(int ck, Cell... columns)
+    {
+        BTreeRow.Builder builder = new BTreeRow.Builder(true);
+        builder.newRow(Util.clustering(metadata.comparator, ck));
+        for (Cell cell : columns)
+            builder.addCell(cell);
+        return builder.build();
+    }
+
+    public Cell cell(ColumnDefinition metadata, int v)
+    {
+        return new BufferCell(metadata,
+                              1L, BufferCell.NO_TTL, BufferCell.NO_DELETION_TIME, ByteBufferUtil.bytes(v),
null);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message