cassandra-commits mailing list archives

From alek...@apache.org
Subject [1/4] cassandra git commit: Implement short read protection on partition boundaries
Date Sat, 30 Sep 2017 09:57:21 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.11 be2117492 -> e819fec89


Implement short read protection on partition boundaries

patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-13595


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/68a67469
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/68a67469
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/68a67469

Branch: refs/heads/cassandra-3.11
Commit: 68a67469f8b25534d086b29b8fe0fa4ec3f9d1ec
Parents: 1efdf33
Author: Aleksey Yeschenko <aleksey@yeschenko.com>
Authored: Thu Sep 21 14:29:05 2017 +0100
Committer: Aleksey Yeschenko <aleksey@yeschenko.com>
Committed: Sat Sep 30 10:33:30 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/PartitionRangeReadCommand.java |  14 ++
 .../org/apache/cassandra/db/ReadCommand.java    |   2 +
 .../db/SinglePartitionReadCommand.java          |   5 +
 .../apache/cassandra/db/filter/DataLimits.java  |   5 +-
 .../apache/cassandra/service/DataResolver.java  | 216 ++++++++++++++-----
 6 files changed, 180 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
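
For readers coming to this patch cold: a "short read" happens when a replica truncates its response at the query limit, but reconciliation against other replicas' tombstones then drops some of those rows, leaving the merged result below the limit. The toy sketch below (plain Java with hypothetical names, not Cassandra code) illustrates the basic failure mode; this patch extends the existing protection so that it also covers rows lost across partition boundaries.

import java.util.*;

// Toy illustration of a short read; all names are hypothetical.
public class ShortReadDemo
{
    public static void main(String[] args)
    {
        int limit = 2;

        // Replica A honours the limit and returns its first two live rows,
        // even though it may hold more (row3, row4, ...) past the cut-off.
        List<String> replicaA = Arrays.asList("row1", "row2");

        // Replica B saw a DELETE for row1 that never reached A.
        Set<String> tombstonesOnB = Collections.singleton("row1");

        // Post-reconciliation result: rows from A minus rows shadowed by B.
        List<String> merged = new ArrayList<>();
        for (String row : replicaA)
            if (!tombstonesOnB.contains(row))
                merged.add(row);

        // merged == [row2]: one row short of the limit. Short read
        // protection detects this and re-queries A for more rows.
        System.out.printf("requested %d rows, reconciled %d -> short read%n", limit, merged.size());
    }
}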


http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1c53aa5..c5e54d4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Implement short read protection on partition boundaries (CASSANDRA-13595)
  * Fix ISE thrown by UPI.Serializer.hasNext() for some SELECT queries (CASSANDRA-13911)
  * Filter header only commit logs before recovery (CASSANDRA-13918)
  * AssertionError prepending to a list (CASSANDRA-13149)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 9e557e0..84e3c7d 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.BaseRowIterator;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -400,6 +401,19 @@ public class PartitionRangeReadCommand extends ReadCommand
         return DataRange.serializer.serializedSize(dataRange(), version, metadata());
     }
 
+    /*
+     * We are currently using PartitionRangeReadCommand for most index queries, even if they are explicitly restricted
+     * to a single partition key. Return true if that is the case.
+     *
+     * See CASSANDRA-11617 and CASSANDRA-11872 for details.
+     */
+    public boolean isLimitedToOnePartition()
+    {
+        return dataRange.keyRange instanceof Bounds
+            && dataRange.startKey().kind() == PartitionPosition.Kind.ROW_KEY
+            && dataRange.startKey().equals(dataRange.stopKey());
+    }
+
     private static class Deserializer extends SelectionDeserializer
     {
         public ReadCommand deserialize(DataInputPlus in,

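To make the new predicate concrete: an indexed query restricted to one partition key arrives as a range command whose key range collapses to an inclusive [k, k] interval over a concrete row key. Below is a standalone sketch of the same three-part check, using simplified stand-ins rather than Cassandra's dht.Bounds/PartitionPosition API:

// Toy model of isLimitedToOnePartition(); Kind and KeyBounds are
// hypothetical stand-ins for PartitionPosition.Kind and the dht bounds types.
public class OnePartitionCheckDemo
{
    enum Kind { MIN_BOUND, ROW_KEY, MAX_BOUND }

    static final class KeyBounds
    {
        final boolean inclusiveBothEnds; // true for dht.Bounds-style ranges
        final Kind startKind;
        final String startKey, stopKey;

        KeyBounds(boolean inclusiveBothEnds, Kind startKind, String startKey, String stopKey)
        {
            this.inclusiveBothEnds = inclusiveBothEnds;
            this.startKind = startKind;
            this.startKey = startKey;
            this.stopKey = stopKey;
        }

        // Mirrors the patch: keyRange instanceof Bounds (inclusive on both
        // ends), start is a concrete row key, and start equals stop.
        boolean isLimitedToOnePartition()
        {
            return inclusiveBothEnds
                && startKind == Kind.ROW_KEY
                && startKey.equals(stopKey);
        }
    }

    public static void main(String[] args)
    {
        // An indexed query restricted to partition key "k" arrives as a
        // range command whose bounds collapse to [k, k].
        System.out.println(new KeyBounds(true, Kind.ROW_KEY, "k", "k").isLimitedToOnePartition()); // true
        // A genuine range scan over [a, z] must stay extendable.
        System.out.println(new KeyBounds(true, Kind.ROW_KEY, "a", "z").isLimitedToOnePartition()); // false
    }
}
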
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 160b104..2d399d8 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -170,6 +170,8 @@ public abstract class ReadCommand implements ReadQuery
     protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException;
     protected abstract long selectionSerializedSize(int version);
 
+    public abstract boolean isLimitedToOnePartition();
+
     /**
      * The metadata for the table queried.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 7a66eca..4b10530 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -1096,6 +1096,11 @@ public class SinglePartitionReadCommand extends ReadCommand
              + ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), version);
     }
 
+    public boolean isLimitedToOnePartition()
+    {
+        return true;
+    }
+
     /**
      * Groups multiple single partition read commands.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 6b74293..4c57a76 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -312,10 +312,7 @@ public abstract class DataLimits
 
         public DataLimits forShortReadRetry(int toFetch)
         {
-            // When we do a short read retry, we're only ever querying the single partition on which we have a short read. So
-            // we use toFetch as the row limit and use no perPartitionLimit (it would be equivalent in practice to use toFetch
-            // for both argument or just for perPartitionLimit with no limit on rowLimit).
-            return new CQLLimits(toFetch, NO_LIMIT, isDistinct);
+            return new CQLLimits(toFetch, perPartitionLimit, isDistinct);
         }
 
         public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)

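Restating the change above as a minimal sketch (a hypothetical stand-in class, not Cassandra's DataLimits API): the old retry dropped the per-partition limit, which was safe only while every retry targeted the single short-read partition; now that retries can also cross partition boundaries, the per-partition limit has to survive.

// Hypothetical stand-in for DataLimits.CQLLimits, showing only how the
// short-read-retry limits are built before and after this patch.
public class ShortReadRetryLimitsDemo
{
    static final int NO_LIMIT = Integer.MAX_VALUE;

    final int rowLimit;
    final int perPartitionLimit;

    ShortReadRetryLimitsDemo(int rowLimit, int perPartitionLimit)
    {
        this.rowLimit = rowLimit;
        this.perPartitionLimit = perPartitionLimit;
    }

    // Before: per-partition limit discarded on retry - fine only when the
    // retry is guaranteed to touch a single partition.
    ShortReadRetryLimitsDemo forShortReadRetryOld(int toFetch)
    {
        return new ShortReadRetryLimitsDemo(toFetch, NO_LIMIT);
    }

    // After: per-partition limit preserved, so a retry that spans several
    // partitions still respects PER PARTITION LIMIT semantics.
    ShortReadRetryLimitsDemo forShortReadRetryNew(int toFetch)
    {
        return new ShortReadRetryLimitsDemo(toFetch, perPartitionLimit);
    }

    public static void main(String[] args)
    {
        ShortReadRetryLimitsDemo limits = new ShortReadRetryLimitsDemo(100, 10);
        System.out.println(limits.forShortReadRetryOld(25).perPartitionLimit); // 2147483647 (dropped)
        System.out.println(limits.forShortReadRetryNew(25).perPartitionLimit); // 10 (preserved)
    }
}
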
http://git-wip-us.apache.org/repos/asf/cassandra/blob/68a67469/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 7d8ffc5..5fb34c6 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -34,6 +34,9 @@ import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.ExcludingBounds;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.tracing.Tracing;
@@ -91,7 +94,7 @@ public class DataResolver extends ResponseResolver
          * have more rows than the client requested. To make sure that we still conform to the original limit,
          * we apply a top-level post-reconciliation counter to the merged partition iterator.
          *
-         * Short read protection logic (ShortReadRowProtection.moreContents()) relies on this counter to be applied
+         * Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter to be applied
          * to the current partition to work. For this reason we have to apply the counter transformation before
          * empty partition discard logic kicks in - for it will eagerly consume the iterator.
          *
@@ -121,26 +124,13 @@ public class DataResolver extends ResponseResolver
         if (results.size() == 1)
             return results.get(0);
 
-        // So-called "short reads" stems from nodes returning only a subset of the results they have for a partition due to the limit,
-        // but that subset not being enough post-reconciliation. So if we don't have limit, don't bother.
+        /*
+         * So-called short reads stem from nodes returning only a subset of the results they have due to the limit,
+         * but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother.
+         */
         if (!command.limits().isUnlimited())
-        {
             for (int i = 0; i < results.size(); i++)
-            {
-                DataLimits.Counter singleResultCounter =
-                    command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition(), enforceStrictLiveness).onlyCount();
-
-                ShortReadResponseProtection protection =
-                    new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter);
-
-                /*
-                 * The order of transformations is important here. See ShortReadResponseProtection.applyToPartition()
-                 * comments for details. We want singleResultCounter.applyToPartition() to be called after SRRP applies
-                 * its transformations, so that this order is preserved when calling applyToRows() too.
-                 */
-                results.set(i, Transformation.apply(Transformation.apply(results.get(i), protection), singleResultCounter));
-            }
-        }
+                results.set(i, extendWithShortReadProtection(results.get(i), sources[i], mergedResultCounter));
 
         return UnfilteredPartitionIterators.merge(results, command.nowInSec(), new RepairMergeListener(sources));
     }
@@ -476,14 +466,60 @@ public class DataResolver extends ResponseResolver
         }
     }
 
-    private class ShortReadResponseProtection extends Transformation<UnfilteredRowIterator>
+    private UnfilteredPartitionIterator extendWithShortReadProtection(UnfilteredPartitionIterator partitions,
+                                                                      InetAddress source,
+                                                                      DataLimits.Counter mergedResultCounter)
+    {
+        DataLimits.Counter singleResultCounter =
+            command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition(), enforceStrictLiveness).onlyCount();
+
+        ShortReadPartitionsProtection protection =
+            new ShortReadPartitionsProtection(source, singleResultCounter, mergedResultCounter);
+
+        /*
+         * The order of extension and transformations is important here. Extending with more partitions has to happen
+         * first due to the way BaseIterator.hasMoreContents() works: only transformations applied after extension will
+         * be called on the first partition of the extended iterator.
+         *
+         * Additionally, we want singleResultCounter to be applied after SRPP, so that its applyToPartition() method will
+         * be called last, after the extension done by the SRPP.applyToPartition() call. That way we preserve the same order
+         * when it comes to calling SRPP.moreContents() and applyToRow() callbacks.
+         *
+         * See ShortReadPartitionsProtection.applyToPartition() for more details.
+         */
+
+        // extend with moreContents() only if it's a range read command with no partition key specified
+        if (!command.isLimitedToOnePartition())
+            partitions = MorePartitions.extend(partitions, protection);     // register SRPP.moreContents()
+
+        partitions = Transformation.apply(partitions, protection);          // register SRPP.applyToPartition()
+        partitions = Transformation.apply(partitions, singleResultCounter); // register the per-source counter
+
+        return partitions;
+    }
+
+    /*
+     * We have a potential short read if the result from a given node contains the requested number of rows
+     * (i.e. it has stopped returning results due to the limit), but some of them haven't
+     * made it into the final post-reconciliation result due to other nodes' row, range, and/or partition tombstones.
+     *
+     * If that is the case, then that node may have more rows that we should fetch, as otherwise we could
+     * ultimately return fewer rows than required. Also, those additional rows may contain tombstones
+     * which we also need to fetch as they may shadow rows or partitions from other replicas' results, which we would
+     * otherwise return incorrectly.
+     */
+    private class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator>
     {
         private final InetAddress source;
 
         private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
         private final DataLimits.Counter mergedResultCounter; // merged end-result counter
 
-        private ShortReadResponseProtection(InetAddress source, DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
+        private DecoratedKey lastPartitionKey; // key of the last observed partition
+
+        private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
+
+        private ShortReadPartitionsProtection(InetAddress source, DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
         {
             this.source = source;
             this.singleResultCounter = singleResultCounter;
@@ -493,29 +529,100 @@ public class DataResolver extends ResponseResolver
         @Override
         public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
         {
-            ShortReadRowProtection protection = new ShortReadRowProtection(partition.metadata(), partition.partitionKey());
+            partitionsFetched = true;
+
+            lastPartitionKey = partition.partitionKey();
 
             /*
-             * Extend for moreContents() then apply protection to track lastClustering.
+             * Extend for moreContents() then apply protection to track lastClustering by applyToRow().
              *
              * If we don't apply the transformation *after* extending the partition with MoreRows,
              * applyToRow() method of protection will not be called on the first row of the new extension iterator.
              */
+            ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.metadata(), partition.partitionKey());
             return Transformation.apply(MoreRows.extend(partition, protection), protection);
         }
 
-        private class ShortReadRowProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
+        /*
+         * We only get here once all the rows and partitions in this iterator have been iterated over, and so
+         * if the node had returned the requested number of rows but we still get here, then some results were
+         * skipped during reconciliation.
+         */
+        public UnfilteredPartitionIterator moreContents()
+        {
+            // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit
+            assert !mergedResultCounter.isDone();
+
+            // we do not apply short read protection when we have no limits at all
+            assert !command.limits().isUnlimited();
+
+            /*
+             * If this is a single partition read command or an (indexed) partition range read command with
+             * a partition key specified, then we can't and shouldn't try to fetch more partitions.
+             */
+            assert !command.isLimitedToOnePartition();
+
+            /*
+             * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more.
+             *
+             * Can only take the shortcut if there is no per partition limit set. Otherwise it's possible to hit false
+             * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911).
+             */
+            if (!singleResultCounter.isDone() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
+                return null;
+
+            /*
+             * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator.
+             * There is no point in asking the replica for more rows - it has no more in the requested range.
+             */
+            if (!partitionsFetched)
+                return null;
+            partitionsFetched = false;
+
+            /*
+             * We are going to fetch one partition at a time for thrift and potentially more for CQL.
+             * The row limit will either be set to the per partition limit - if the command has no total row limit set, or
+             * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions,
+             * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
+             */
+            int toQuery = command.limits().count() != DataLimits.NO_LIMIT
+                        ? command.limits().count() - mergedResultCounter.counted()
+                        : command.limits().perPartitionCount();
+
+            ColumnFamilyStore.metricsFor(command.metadata().cfId).shortReadProtectionRequests.mark();
+            Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
+
+            PartitionRangeReadCommand cmd = makeFetchAdditionalPartitionReadCommand(toQuery);
+            return executeReadCommand(cmd);
+        }
+
+        private PartitionRangeReadCommand makeFetchAdditionalPartitionReadCommand(int toQuery)
+        {
+            PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command;
+
+            DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery);
+
+            AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange();
+            AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight()
+                                                        ? new Range<>(lastPartitionKey, bounds.right)
+                                                        : new ExcludingBounds<>(lastPartitionKey, bounds.right);
+            DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
+
+            return cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
+        }
+
+        private class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
         {
             private final CFMetaData metadata;
             private final DecoratedKey partitionKey;
 
-            private Clustering lastClustering;
+            private Clustering lastClustering; // clustering of the last observed row
 
             private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows
             private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command)
             private int lastQueried = 0; // # extra rows requested from the replica last time
 
-            private ShortReadRowProtection(CFMetaData metadata, DecoratedKey partitionKey)
+            private ShortReadRowsProtection(CFMetaData metadata, DecoratedKey partitionKey)
             {
                 this.metadata = metadata;
                 this.partitionKey = partitionKey;
@@ -529,18 +636,9 @@ public class DataResolver extends ResponseResolver
             }
 
             /*
-             * We have a potential short read if the result from a given node contains the requested number of rows
-             * for that partition (i.e. it has stopped returning results due to the limit), but some of them haven't
-             * made it into the final post-reconciliation result due to other nodes' tombstones.
-             *
-             * If that is the case, then that node may have more rows that we should fetch, as otherwise we could
-             * ultimately return fewer rows than required. Also, those additional rows may contain tombstones which
-             * which we also need to fetch as they may shadow rows from other replicas' results, which we would
-             * otherwise return incorrectly.
-             *
-             * Also note that we only get here once all the rows for this partition have been iterated over, and so
-             * if the node had returned the requested number of rows but we still get here, then some results were
-             * skipped during reconciliation.
+             * We only get here once all the rows in this iterator have been iterated over, and so if the node
+             * had returned the requested number of rows but we still get here, then some results were skipped
+             * during reconciliation.
              */
             public UnfilteredRowIterator moreContents()
             {
@@ -622,18 +720,17 @@ public class DataResolver extends ResponseResolver
                 * Note: it's ok to retrieve more rows that necessary since singleResultCounter is not stopping and only
                 * counts.
                 *
-                 * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits,
-                 * but no fewer than 8 rows (an arbitrary round lower bound), to ensure that we won't fetch row by row
-                 * for SELECT DISTINCT queries (that set per partition limit to 1).
+                 * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits.
                  *
                  * See CASSANDRA-13794 for more details.
                  */
-                lastQueried = Math.max(Math.min(command.limits().count(), command.limits().perPartitionCount()), 8);
+                lastQueried = Math.min(command.limits().count(), command.limits().perPartitionCount());
 
                 ColumnFamilyStore.metricsFor(metadata.cfId).shortReadProtectionRequests.mark();
                Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source);
 
-                return executeReadCommand(makeFetchAdditionalRowsReadCommand(lastQueried));
+                SinglePartitionReadCommand cmd = makeFetchAdditionalRowsReadCommand(lastQueried);
+                return UnfilteredPartitionIterators.getOnlyElement(executeReadCommand(cmd), cmd);
             }
 
             private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
@@ -649,24 +746,25 @@ public class DataResolver extends ResponseResolver
                                                          command.rowFilter(),
                                                          command.limits().forShortReadRetry(toQuery),
                                                          partitionKey,
-                                                         filter);
+                                                         filter,
+                                                         command.indexMetadata());
             }
+        }
 
-            private UnfilteredRowIterator executeReadCommand(SinglePartitionReadCommand cmd)
-            {
-                DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1);
-                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source));
-
-                if (StorageProxy.canDoLocalRequest(source))
-                    StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
-                else
-                    MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);
-
-                // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
-                handler.awaitResults();
-                assert resolver.responses.size() == 1;
-                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), cmd);
-            }
+        private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd)
+        {
+            DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1);
+            ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source));
+
+            if (StorageProxy.canDoLocalRequest(source))
+                StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
+            else
+                MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);
+
+            // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
+            handler.awaitResults();
+            assert resolver.responses.size() == 1;
+            return resolver.responses.get(0).payload.makeIterator(command);
         }
     }
 }

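A note on the row limit chosen in ShortReadPartitionsProtection.moreContents() above: if the command has a total row limit, the retry asks for exactly the rows still missing from the merged result; otherwise it falls back to the per-partition limit and lets the rows-level protection top up each new partition. A toy restatement (hypothetical names, not the Cassandra API):

// Toy restatement of the toQuery computation in SRPP.moreContents().
public class RetryRowLimitDemo
{
    static final int NO_LIMIT = Integer.MAX_VALUE;

    // With a total row limit, request only what the merged result is still
    // missing; without one, fall back to the per-partition limit and let
    // the rows-level protection fetch any remainder within each partition.
    static int toQuery(int rowLimit, int perPartitionLimit, int mergedCounted)
    {
        return rowLimit != NO_LIMIT
             ? rowLimit - mergedCounted
             : perPartitionLimit;
    }

    public static void main(String[] args)
    {
        System.out.println(toQuery(100, NO_LIMIT, 90)); // 10: rows still missing
        System.out.println(toQuery(NO_LIMIT, 5, 42));   // 5: per-partition limit
    }
}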

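And the bounds arithmetic in makeFetchAdditionalPartitionReadCommand() above, as a toy sketch: the retry range opens just past the last partition key already consumed and keeps the original right bound along with its inclusiveness. Range and ExcludingBounds here are simplified stand-ins for Cassandra's dht types:

// Toy model of the retry-bounds construction; KeyBounds, Range and
// ExcludingBounds are hypothetical stand-ins, not the Cassandra API.
public class RetryBoundsDemo
{
    static abstract class KeyBounds
    {
        final String left, right;
        KeyBounds(String left, String right) { this.left = left; this.right = right; }
        abstract boolean inclusiveRight();
        public String toString() { return "(" + left + ", " + right + (inclusiveRight() ? "]" : ")"); }
    }

    // (left, right] - like dht.Range
    static final class Range extends KeyBounds
    {
        Range(String left, String right) { super(left, right); }
        boolean inclusiveRight() { return true; }
    }

    // (left, right) - like dht.ExcludingBounds
    static final class ExcludingBounds extends KeyBounds
    {
        ExcludingBounds(String left, String right) { super(left, right); }
        boolean inclusiveRight() { return false; }
    }

    // Everything strictly after lastPartitionKey, up to the original right
    // bound, preserving the original bound's right-inclusiveness.
    static KeyBounds retryBounds(KeyBounds original, String lastPartitionKey)
    {
        return original.inclusiveRight()
             ? new Range(lastPartitionKey, original.right)
             : new ExcludingBounds(lastPartitionKey, original.right);
    }

    public static void main(String[] args)
    {
        System.out.println(retryBounds(new Range("a", "z"), "k"));           // (k, z]
        System.out.println(retryBounds(new ExcludingBounds("a", "z"), "k")); // (k, z)
    }
}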