cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [5/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11
Date Wed, 20 Sep 2017 16:48:58 GMT
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2bae4ca9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2bae4ca9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2bae4ca9

Branch: refs/heads/cassandra-3.11
Commit: 2bae4ca907ac4d2ab53c899e5cf5c9e4de631f52
Parents: c1efaf3 f93e6e3
Author: Aleksey Yeschenko <aleksey@yeschenko.com>
Authored: Wed Sep 20 17:41:07 2017 +0100
Committer: Aleksey Yeschenko <aleksey@yeschenko.com>
Committed: Wed Sep 20 17:41:07 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   5 +
 .../apache/cassandra/service/DataResolver.java  | 304 +++++++++++--------
 3 files changed, 187 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bae4ca9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 39270e5,07742ef..8d07cbc
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,15 -1,5 +1,16 @@@
 -3.0.15
 +3.11.1
 + * AbstractTokenTreeBuilder#serializedSize returns wrong value when there is a single leaf and overflow collisions (CASSANDRA-13869)
 + * Add a compaction option to TWCS to ignore sstables overlapping checks (CASSANDRA-13418)
 + * BTree.Builder memory leak (CASSANDRA-13754)
 + * Revert CASSANDRA-10368 of supporting non-pk column filtering due to correctness (CASSANDRA-13798)
 + * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938)
 + * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744)
 + * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
 + * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
 + * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
 + * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
 +Merged from 3.0:
+  * Improve short read protection performance (CASSANDRA-13794)
   * Fix sstable reader to support range-tombstone-marker for multi-slices (CASSANDRA-13787)
   * Fix short read protection for tables with no clustering columns (CASSANDRA-13880)
   * Make isBuilt volatile in PartitionUpdate (CASSANDRA-13619)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bae4ca9/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bae4ca9/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 4b0bd3c,9a98ee5..32fc015
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -43,12 -43,10 +43,12 @@@ public class DataResolver extends Respo
  {
      @VisibleForTesting
      final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new
ArrayList<>());
 +    private final long queryStartNanoTime;
  
-     public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency,
int maxResponseCount, long queryStartNanoTime)
 -    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int
maxResponseCount)
++    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int
maxResponseCount, long queryStartNanoTime)
      {
          super(keyspace, command, consistency, maxResponseCount);
 +        this.queryStartNanoTime = queryStartNanoTime;
      }
  
      public PartitionIterator getData()
@@@ -122,10 -123,23 +125,23 @@@
          if (!command.limits().isUnlimited())
          {
              for (int i = 0; i < results.size(); i++)
-                 results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i],
resultCounter, queryStartNanoTime)));
+             {
+                 DataLimits.Counter singleResultCounter =
+                     command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount();
+ 
+                 ShortReadResponseProtection protection =
 -                    new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter);
++                    new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter,
queryStartNanoTime);
+ 
+                 /*
+                  * The order of transformations is important here. See ShortReadResponseProtection.applyToPartition()
+                  * comments for details. We want singleResultCounter.applyToPartition()
to be called after SRRP applies
+                  * its transformations, so that this order is preserved when calling applyToRows()
too.
+                  */
+                 results.set(i, Transformation.apply(Transformation.apply(results.get(i),
protection), singleResultCounter));
+             }
          }
  
-         return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener);
+         return UnfilteredPartitionIterators.merge(results, command.nowInSec(), new RepairMergeListener(sources));
      }
  
      private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
@@@ -209,9 -223,9 +225,9 @@@
              // For each source, the time of the current deletion as known by the source.
              private final DeletionTime[] sourceDeletionTime = new DeletionTime[sources.length];
              // For each source, record if there is an open range to send as repair, and
from where.
 -            private final Slice.Bound[] markerToRepair = new Slice.Bound[sources.length];
 +            private final ClusteringBound[] markerToRepair = new ClusteringBound[sources.length];
  
-             public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean
isReversed)
+             private MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean
isReversed)
              {
                  this.partitionKey = partitionKey;
                  this.columns = columns;
@@@ -471,19 -473,18 +487,24 @@@
          }
      }
  
-     private class ShortReadProtection extends Transformation<UnfilteredRowIterator>
+     private class ShortReadResponseProtection extends Transformation<UnfilteredRowIterator>
      {
          private final InetAddress source;
-         private final DataLimits.Counter counter;
-         private final DataLimits.Counter postReconciliationCounter;
+ 
+         private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
+         private final DataLimits.Counter mergedResultCounter; // merged end-result counter
+ 
 -        private ShortReadResponseProtection(InetAddress source, DataLimits.Counter singleResultCounter,
DataLimits.Counter mergedResultCounter)
 +        private final long queryStartNanoTime;
 +
-         private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter,
long queryStartNanoTime)
++        private ShortReadResponseProtection(InetAddress source,
++                                            DataLimits.Counter singleResultCounter,
++                                            DataLimits.Counter mergedResultCounter,
++                                            long queryStartNanoTime)
          {
              this.source = source;
-             this.counter = command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount();
-             this.postReconciliationCounter = postReconciliationCounter;
+             this.singleResultCounter = singleResultCounter;
+             this.mergedResultCounter = mergedResultCounter;
 +            this.queryStartNanoTime = queryStartNanoTime;
          }
  
          @Override
@@@ -523,100 -525,133 +545,141 @@@
                  return row;
              }
  
-             @Override
+             /*
+              * We have a potential short read if the result from a given node contains the
requested number of rows
+              * for that partition (i.e. it has stopped returning results due to the limit),
but some of them haven't
+              * made it into the final post-reconciliation result due to other nodes' tombstones.
+              *
+              * If that is the case, then that node may have more rows that we should fetch,
as otherwise we could
+              * ultimately return fewer rows than required. Also, those additional rows may
contain tombstones which
+              * which we also need to fetch as they may shadow rows from other replicas'
results, which we would
+              * otherwise return incorrectly.
+              *
+              * Also note that we only get here once all the rows for this partition have
been iterated over, and so
+              * if the node had returned the requested number of rows but we still get here,
then some results were
+              * skipped during reconciliation.
+              */
              public UnfilteredRowIterator moreContents()
              {
-                 assert !postReconciliationCounter.isDoneForPartition();
- 
-                 // We have a short read if the node this is the result of has returned the
requested number of
-                 // rows for that partition (i.e. it has stopped returning results due to
the limit), but some of
-                 // those results haven't made it in the final result post-reconciliation
due to other nodes
-                 // tombstones. If that is the case, then the node might have more results
that we should fetch
-                 // as otherwise we might return less results than required, or results that
shouldn't be returned
-                 // (because the node has tombstone that hides future results from other
nodes but that haven't
-                 // been returned due to the limit).
-                 // Also note that we only get here once all the results for this node have
been returned, and so
-                 // if the node had returned the requested number but we still get there,
it imply some results were
-                 // skipped during reconciliation.
-                 if (lastCount == counted(counter) || !counter.isDoneForPartition())
+                 // never try to request additional rows from replicas if our reconciled
partition is already filled to the limit
+                 assert !mergedResultCounter.isDoneForPartition();
+ 
+                 // we do not apply short read protection when we have no limits at all
+                 assert !command.limits().isUnlimited();
+ 
+                 // if the returned partition doesn't have enough rows to satisfy even the
original limit, don't ask for more
+                 if (!singleResultCounter.isDoneForPartition())
                      return null;
  
-                 // clustering of the last row returned is empty, meaning that there is only
one row per partition,
-                 // and we already have it.
-                 if (lastClustering == Clustering.EMPTY)
+                 /*
+                  * If the replica has no live rows in the partition, don't try to fetch
more.
+                  *
+                  * Note that the previous branch [if (!singleResultCounter.isDoneForPartition())
return null] doesn't
+                  * always cover this scenario:
+                  * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition
>= perPartitionLimit],
+                  * and will return true if isDone() returns true, even if there are 0 rows
counted in the current partition.
+                  *
+                  * This can happen with a range read if after 1+ rounds of short read protection
requests we managed to fetch
+                  * enough extra rows for other partitions to satisfy the singleResultCounter's
total row limit, but only
+                  * have tombstones in the current partition.
+                  *
+                  * One other way we can hit this condition is when the partition only has
a live static row and no regular
+                  * rows. In that scenario the counter will remain at 0 until the partition
is closed - which happens after
+                  * the moreContents() call.
+                  */
 -                if (singleResultCounter.countedInCurrentPartition() == 0)
++                if (countedInCurrentPartition(singleResultCounter) == 0)
                      return null;
  
-                 lastCount = counted(counter);
- 
-                 // We need to try to query enough additional results to fulfill our query,
but because we could still
-                 // get short reads on that additional query, just querying the number of
results we miss may not be
-                 // enough. But we know that when this node answered n rows (counter.countedInCurrentPartition),
only
-                 // x rows (postReconciliationCounter.countedInCurrentPartition()) made it
in the final result.
-                 // So our ratio of live rows to requested rows is x/n, so since we miss
n-x rows, we estimate that
-                 // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) -
n.
-                 // Also note that it's ok if we retrieve more results that necessary since
our top level iterator is a
-                 // counting iterator.
-                 int n = countedInCurrentPartition(postReconciliationCounter);
-                 int x = countedInCurrentPartition(counter);
-                 int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1);
- 
-                 DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
-                 ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
-                 ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata.comparator,
lastClustering, false);
-                 SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(),
-                                                                                    command.nowInSec(),
-                                                                                    command.columnFilter(),
-                                                                                    command.rowFilter(),
-                                                                                    retryLimits,
-                                                                                    partitionKey,
-                                                                                    retryFilter);
- 
-                 Tracing.trace("Requesting {} extra rows from {} for short read protection",
toQuery, source);
-                 Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
- 
-                 return doShortReadRetry(cmd);
-             }
+                 /*
+                  * This is a table with no clustering columns, and has at most one row per
partition - with EMPTY clustering.
+                  * We already have the row, so there is no point in asking for more from
the partition.
+                  */
+                 if (Clustering.EMPTY == lastClustering)
+                     return null;
  
-             /**
-              * Returns the number of results counted by the counter.
-              *
-              * @param counter the counter.
-              * @return the number of results counted by the counter
-              */
-             private int counted(Counter counter)
-             {
-                 // We are interested by the number of rows but for GROUP BY queries 'counted'
returns the number of
-                 // groups.
-                 if (command.limits().isGroupByLimit())
-                     return counter.rowCounted();
 -                lastFetched = singleResultCounter.countedInCurrentPartition() - lastCounted;
 -                lastCounted = singleResultCounter.countedInCurrentPartition();
++                lastFetched = countedInCurrentPartition(singleResultCounter) - lastCounted;
++                lastCounted = countedInCurrentPartition(singleResultCounter);
+ 
+                 // getting back fewer rows than we asked for means the partition on the
replica has been fully consumed
+                 if (lastQueried > 0 && lastFetched < lastQueried)
+                     return null;
  
-                 return counter.counted();
+                 /*
+                  * At this point we know that:
+                  *     1. the replica returned [repeatedly?] as many rows as we asked for
and potentially has more
+                  *        rows in the partition
+                  *     2. at least one of those returned rows was shadowed by a tombstone
returned from another
+                  *        replica
+                  *     3. we haven't satisfied the client's limits yet, and should attempt
to query for more rows to
+                  *        avoid a short read
+                  *
+                  * In the ideal scenario, we would get exactly min(a, b) or fewer rows from
the next request, where a and b
+                  * are defined as follows:
+                  *     [a] limits.count() - mergedResultCounter.counted()
+                  *     [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition()
+                  *
+                  * It would be naive to query for exactly that many rows, as it's possible
and not unlikely
+                  * that some of the returned rows would also be shadowed by tombstones from
other hosts.
+                  *
+                  * Note: we don't know, nor do we care, how many rows from the replica made
it into the reconciled result;
+                  * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition())
made it.
+                  *
+                  * In general, our goal should be to minimise the number of extra requests
- *not* to minimise the number
+                  * of rows fetched: there is a high transactional cost for every individual
request, but a relatively low
+                  * marginal cost for each extra row requested.
+                  *
+                  * As such it's better to overfetch than to underfetch extra rows from a
host; but at the same
+                  * time we want to respect paging limits and not blow up spectacularly.
+                  *
+                  * Note: it's ok to retrieve more rows that necessary since singleResultCounter
is not stopping and only
+                  * counts.
+                  *
+                  * With that in mind, we'll just request the minimum of (count(), perPartitionCount())
limits,
+                  * but no fewer than 8 rows (an arbitrary round lower bound), to ensure
that we won't fetch row by row
+                  * for SELECT DISTINCT queries (that set per partition limit to 1).
+                  *
+                  * See CASSANDRA-13794 for more details.
+                  */
+                 lastQueried = Math.max(Math.min(command.limits().count(), command.limits().perPartitionCount()),
8);
+ 
+                 ColumnFamilyStore.metricsFor(metadata.cfId).shortReadProtectionRequests.mark();
+                 Tracing.trace("Requesting {} extra rows from {} for short read protection",
lastQueried, source);
+ 
+                 return executeReadCommand(makeFetchAdditionalRowsReadCommand(lastQueried));
              }
  
-             /**
-              * Returns the number of results counted in the partition by the counter.
-              *
-              * @param counter the counter.
-              * @return the number of results counted in the partition by the counter
-              */
++            // Counts the number of rows for regular queries and the number of groups for
GROUP BY queries
 +            private int countedInCurrentPartition(Counter counter)
 +            {
-                 // We are interested by the number of rows but for GROUP BY queries 'countedInCurrentPartition'
returns
-                 // the number of groups in the current partition.
-                 if (command.limits().isGroupByLimit())
-                     return counter.rowCountedInCurrentPartition();
++                return command.limits().isGroupByLimit()
++                     ? counter.rowCountedInCurrentPartition()
++                     : counter.countedInCurrentPartition();
++            }
 +
-                 return counter.countedInCurrentPartition();
+             private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
+             {
+                 ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
+                 if (null != lastClustering)
+                     filter = filter.forPaging(metadata.comparator, lastClustering, false);
+ 
+                 return SinglePartitionReadCommand.create(command.metadata(),
+                                                          command.nowInSec(),
+                                                          command.columnFilter(),
+                                                          command.rowFilter(),
+                                                          command.limits().forShortReadRetry(toQuery),
+                                                          partitionKey,
+                                                          filter);
              }
  
-             private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
+             private UnfilteredRowIterator executeReadCommand(SinglePartitionReadCommand
cmd)
              {
-                 DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE,
1, queryStartNanoTime);
-                 ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE,
retryCommand, Collections.singletonList(source), queryStartNanoTime);
 -                DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE,
1);
 -                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE,
cmd, Collections.singletonList(source));
++                DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE,
1, queryStartNanoTime);
++                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE,
cmd, Collections.singletonList(source), queryStartNanoTime);
+ 
                  if (StorageProxy.canDoLocalRequest(source))
-                     StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand,
handler));
+                     StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd,
handler));
                  else
-                     MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version),
source, handler);
+                     MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version),
source, handler);
  
                  // We don't call handler.get() because we want to preserve tombstones since
we're still in the middle of merging node results.
                  handler.awaitResults();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message