cassandra-commits mailing list archives

From alek...@apache.org
Subject [4/4] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11
Date Sat, 30 Sep 2017 09:57:24 GMT
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e819fec8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e819fec8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e819fec8

Branch: refs/heads/cassandra-3.11
Commit: e819fec89c38d4b6347ea9a683798e050c65dd86
Parents: be21174 15cee48
Author: Aleksey Yeschenko <aleksey@yeschenko.com>
Authored: Sat Sep 30 10:53:06 2017 +0100
Committer: Aleksey Yeschenko <aleksey@yeschenko.com>
Committed: Sat Sep 30 10:56:07 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   8 +-
 .../cassandra/db/PartitionRangeReadCommand.java |  14 ++
 .../org/apache/cassandra/db/ReadCommand.java    |   2 +
 .../db/SinglePartitionReadCommand.java          |   5 +
 .../apache/cassandra/db/filter/DataLimits.java  |   5 +-
 .../UnfilteredPartitionIterators.java           |  19 +-
 .../apache/cassandra/service/DataResolver.java  | 239 ++++++++++++++-----
 7 files changed, 219 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e819fec8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index a782333,4a45469..61f3405
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,20 -1,11 +1,22 @@@
 -3.0.15
 +3.11.1
 + * Fix the computation of cdc_total_space_in_mb for exabyte filesystems (CASSANDRA-13808)
-  * Handle limit correctly on tables with strict liveness (CASSANDRA-13883)
 + * AbstractTokenTreeBuilder#serializedSize returns wrong value when there is a single leaf and overflow collisions (CASSANDRA-13869)
 + * Add a compaction option to TWCS to ignore sstables overlapping checks (CASSANDRA-13418)
 + * BTree.Builder memory leak (CASSANDRA-13754)
 + * Revert CASSANDRA-10368 of supporting non-pk column filtering due to correctness (CASSANDRA-13798)
 + * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938)
 + * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744)
 + * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
 + * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
 + * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
 + * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
 +Merged from 3.0:
+  * Implement short read protection on partition boundaries (CASSANDRA-13595)
+  * Fix ISE thrown by UPI.Serializer.hasNext() for some SELECT queries (CASSANDRA-13911)
   * Filter header only commit logs before recovery (CASSANDRA-13918)
   * AssertionError prepending to a list (CASSANDRA-13149)
   * Fix support for SuperColumn tables (CASSANDRA-12373)
+  * Handle limit correctly on tables with strict liveness (CASSANDRA-13883)
 - * Fix missing original update in TriggerExecutor (CASSANDRA-13894)
   * Remove non-rpc-ready nodes from counter leader candidates (CASSANDRA-13043)
   * Improve short read protection performance (CASSANDRA-13794)
   * Fix sstable reader to support range-tombstone-marker for multi-slices (CASSANDRA-13787)
@@@ -155,35 -115,12 +157,39 @@@ Merged from 3.0
   * Propagate row deletions in 2i tables on upgrade (CASSANDRA-13320)
   * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305)
   * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238)
 - * Legacy caching options can prevent 3.0 upgrade (CASSANDRA-13384)
 + * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to LogRecord absolute path (CASSANDRA-13294)
 + * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233)
 + * Cqlsh copy-from should error out when csv contains invalid data for collections (CASSANDRA-13071)
 + * Fix "multiple versions of ant detected..." when running ant test (CASSANDRA-13232)
 + * Coalescing strategy sleeps too much (CASSANDRA-13090)
 + * Faster StreamingHistogram (CASSANDRA-13038)
 + * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237)
 + * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070)
 + * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185)
 + * Use keyspace replication settings on system.size_estimates table (CASSANDRA-9639)
 + * Add vm.max_map_count StartupCheck (CASSANDRA-13008)
 + * Hint related logging should include the IP address of the destination in addition to
 +   host ID (CASSANDRA-13205)
 + * Reloading logback.xml does not work (CASSANDRA-13173)
 + * Lightweight transactions temporarily fail after upgrade from 2.1 to 3.0 (CASSANDRA-13109)
 + * Duplicate rows after upgrading from 2.1.16 to 3.0.10/3.9 (CASSANDRA-13125)
 + * Fix UPDATE queries with empty IN restrictions (CASSANDRA-13152)
 + * Fix handling of partition with partition-level deletion plus
 +   live rows in sstabledump (CASSANDRA-13177)
 + * Provide user workaround when system_schema.columns does not contain entries
 +   for a table that's in system_schema.tables (CASSANDRA-13180)
   * Nodetool upgradesstables/scrub/compact ignores system tables (CASSANDRA-13410)
++<<<<<<< HEAD
 + * Fix schema version calculation for rolling upgrades (CASSANDRA-13441)
++=======
+  * Fix NPE issue in StorageService (CASSANDRA-13060)
++>>>>>>> cassandra-3.0
  Merged from 2.2:
 + * Nodes started with join_ring=False should be able to serve requests when authentication is enabled (CASSANDRA-11381)
 + * cqlsh COPY FROM: increment error count only for failures, not for attempts (CASSANDRA-13209)
   * Avoid starting gossiper in RemoveTest (CASSANDRA-13407)
   * Fix weightedSize() for row-cache reported by JMX and NodeTool (CASSANDRA-13393)
 + * Fix JVM metric names (CASSANDRA-13103)
   * Honor truststore-password parameter in cassandra-stress (CASSANDRA-12773)
   * Discard in-flight shadow round responses (CASSANDRA-12653)
   * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e819fec8/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e819fec8/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index 4f3c66f,2d399d8..c733b21
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -174,15 -170,9 +174,17 @@@ public abstract class ReadCommand exten
      protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException;
      protected abstract long selectionSerializedSize(int version);
  
+     public abstract boolean isLimitedToOnePartition();
+ 
      /**
 +     * Creates a new <code>ReadCommand</code> instance with new limits.
 +     *
 +     * @param newLimits the new limits
 +     * @return a new <code>ReadCommand</code> with the updated limits
 +     */
 +    public abstract ReadCommand withUpdatedLimit(DataLimits newLimits);
 +
 +    /**
       * The metadata for the table queried.
       *
       * @return the metadata for the table queried.
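
----------------------------------------------------------------------
The hunk above adds two abstract methods to ReadCommand. As a hedged illustration of the
contract they introduce, here is a minimal, self-contained Java sketch using simplified
stand-in types (DataLimitsSketch and the *Sketch classes are inventions for this note, not
the real Cassandra classes; the real implementations live in SinglePartitionReadCommand and
PartitionRangeReadCommand):

// Stand-in types for illustration only, not the real Cassandra API.
final class DataLimitsSketch
{
    static final int NO_LIMIT = Integer.MAX_VALUE;
    final int count; // total row limit, NO_LIMIT when unlimited
    DataLimitsSketch(int count) { this.count = count; }
}

abstract class ReadCommandSketch
{
    protected final DataLimitsSketch limits;
    protected ReadCommandSketch(DataLimitsSketch limits) { this.limits = limits; }

    // true when the command can only ever touch a single partition
    public abstract boolean isLimitedToOnePartition();

    // returns a copy of this command carrying the new limits; commands stay immutable
    public abstract ReadCommandSketch withUpdatedLimit(DataLimitsSketch newLimits);
}

// A single-partition command cannot be extended with more partitions, so short read
// protection for it only ever re-fetches rows within that one partition.
final class SinglePartitionReadCommandSketch extends ReadCommandSketch
{
    SinglePartitionReadCommandSketch(DataLimitsSketch limits) { super(limits); }

    @Override
    public boolean isLimitedToOnePartition() { return true; }

    @Override
    public ReadCommandSketch withUpdatedLimit(DataLimitsSketch newLimits)
    {
        return new SinglePartitionReadCommandSketch(newLimits);
    }
}

withUpdatedLimit() returning a fresh instance rather than mutating matters here: short read
protection builds retry commands with smaller limits while the original command keeps
serving the in-flight iteration.
----------------------------------------------------------------------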

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e819fec8/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e819fec8/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e819fec8/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e819fec8/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 84e8685,5fb34c6..111d561
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -489,19 -466,60 +479,65 @@@ public class DataResolver extends Respo
          }
      }
  
-     private class ShortReadResponseProtection extends Transformation<UnfilteredRowIterator>
+     private UnfilteredPartitionIterator extendWithShortReadProtection(UnfilteredPartitionIterator partitions,
+                                                                       InetAddress source,
+                                                                       DataLimits.Counter mergedResultCounter)
+     {
+         DataLimits.Counter singleResultCounter =
+             command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition(), enforceStrictLiveness).onlyCount();
+ 
+         ShortReadPartitionsProtection protection =
 -            new ShortReadPartitionsProtection(source, singleResultCounter, mergedResultCounter);
++            new ShortReadPartitionsProtection(source, singleResultCounter, mergedResultCounter, queryStartNanoTime);
+ 
+         /*
+          * The order of extension and transformations is important here. Extending with more partitions has to happen
+          * first due to the way BaseIterator.hasMoreContents() works: only transformations applied after extension will
+          * be called on the first partition of the extended iterator.
+          *
+          * Additionally, we want singleResultCounter to be applied after SRPP, so that its applyToPartition() method will
+          * be called last, after the extension done by the SRPP.applyToPartition() call. That way we preserve the same order
+          * when it comes to calling SRPP.moreContents() and applyToRow() callbacks.
+          *
+          * See ShortReadPartitionsProtection.applyToPartition() for more details.
+          */
+ 
+         // extend with moreContents() only if it's a range read command with no partition key specified
+         if (!command.isLimitedToOnePartition())
+             partitions = MorePartitions.extend(partitions, protection);     // register SRPP.moreContents()
+ 
+         partitions = Transformation.apply(partitions, protection);          // register SRPP.applyToPartition()
+         partitions = Transformation.apply(partitions, singleResultCounter); // register the per-source counter
+ 
+         return partitions;
+     }
+ 
+     /*
+      * We have a potential short read if the result from a given node contains the requested number of rows
+      * (i.e. it has stopped returning results due to the limit), but some of them haven't
+      * made it into the final post-reconciliation result due to other nodes' row, range, and/or partition tombstones.
+      *
+      * If that is the case, then that node may have more rows that we should fetch, as otherwise we could
+      * ultimately return fewer rows than required. Also, those additional rows may contain tombstones, which we
+      * also need to fetch as they may shadow rows or partitions from other replicas' results, which we would
+      * otherwise return incorrectly.
+      */
+     private class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator>
      {
          private final InetAddress source;
  
          private final DataLimits.Counter singleResultCounter; // unmerged per-source counter
          private final DataLimits.Counter mergedResultCounter; // merged end-result counter
  
+         private DecoratedKey lastPartitionKey; // key of the last observed partition
+ 
+         private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
+ 
 -        private ShortReadPartitionsProtection(InetAddress source, DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
 +        private final long queryStartNanoTime;
 +
-         private ShortReadResponseProtection(InetAddress source,
-                                             DataLimits.Counter singleResultCounter,
-                                             DataLimits.Counter mergedResultCounter,
-                                             long queryStartNanoTime)
++        private ShortReadPartitionsProtection(InetAddress source,
++                                              DataLimits.Counter singleResultCounter,
++                                              DataLimits.Counter mergedResultCounter,
++                                              long queryStartNanoTime)
          {
              this.source = source;
              this.singleResultCounter = singleResultCounter;
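
----------------------------------------------------------------------
The ordering comment in extendWithShortReadProtection() above is the subtle part of this
hunk. Below is a minimal toy model of the constraint, assuming nothing from Cassandra
(MoreContents and OrderingSketch are hypothetical names for this note, not the real
Transformation framework): a stage stacked after the extension point observes the
partitions the extension appends, while a stage stacked before it would not.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// A toy pipeline, not Cassandra's Transformation framework: it only demonstrates that a
// stage registered after the extension point sees the partitions the extension appends.
final class OrderingSketch
{
    interface MoreContents { Queue<String> moreContents(); }

    public static void main(String[] args)
    {
        Queue<String> initial = new ArrayDeque<>(List.of("p1"));         // first response
        MoreContents extension = () -> new ArrayDeque<>(List.of("p2"));  // SRPP-style extension

        List<String> countedPartitions = new ArrayList<>(); // stands in for singleResultCounter

        Queue<String> source = initial;
        boolean extended = false;
        while (true)
        {
            String partition = source.poll();
            if (partition == null)
            {
                if (extended)
                    break;
                source = extension.moreContents(); // consulted because extension was registered first
                extended = true;
                continue;
            }
            countedPartitions.add(partition); // counter runs after extension, so it sees p2 too
        }
        System.out.println(countedPartitions); // prints [p1, p2]
    }
}

This mirrors why singleResultCounter is applied after SRPP: the per-source counter must also
count rows from partitions fetched by SRPP.moreContents().
----------------------------------------------------------------------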
@@@ -523,7 -543,75 +562,83 @@@
              return Transformation.apply(MoreRows.extend(partition, protection), protection);
          }
  
-         private class ShortReadRowProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
+         /*
+          * We only get here once all the rows and partitions in this iterator have been iterated over, and so
+          * if the node had returned the requested number of rows but we still get here, then some results were
+          * skipped during reconciliation.
+          */
+         public UnfilteredPartitionIterator moreContents()
+         {
+             // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit
+             assert !mergedResultCounter.isDone();
+ 
+             // we do not apply short read protection when we have no limits at all
+             assert !command.limits().isUnlimited();
+ 
+             /*
+              * If this is a single partition read command or an (indexed) partition range read command with
+              * a partition key specified, then we can't and shouldn't try to fetch more partitions.
+              */
+             assert !command.isLimitedToOnePartition();
+ 
+             /*
+              * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more.
+              *
+              * We can only take the shortcut if there is no per-partition limit set. Otherwise it's possible to hit false
+              * positives due to some rows not being counted in certain scenarios (see CASSANDRA-13911).
+              */
+             if (!singleResultCounter.isDone() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
+                 return null;
+ 
+             /*
+              * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator.
+              * There is no point in asking the replica for more rows - it has no more in the requested range.
+              */
+             if (!partitionsFetched)
+                 return null;
+             partitionsFetched = false;
+ 
+             /*
+              * We are going to fetch one partition at a time for thrift and potentially more for CQL.
+              * The row limit will either be set to the per partition limit - if the command has no total row limit set, or
+              * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions,
+              * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
+              */
+             int toQuery = command.limits().count() != DataLimits.NO_LIMIT
 -                        ? command.limits().count() - mergedResultCounter.counted()
++                        ? command.limits().count() - counted(mergedResultCounter)
+                         : command.limits().perPartitionCount();
+ 
+             ColumnFamilyStore.metricsFor(command.metadata().cfId).shortReadProtectionRequests.mark();
+             Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
+ 
+             PartitionRangeReadCommand cmd = makeFetchAdditionalPartitionReadCommand(toQuery);
+             return executeReadCommand(cmd);
+         }
+ 
++        // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
++        private int counted(Counter counter)
++        {
++            return command.limits().isGroupByLimit()
++                 ? counter.rowCounted()
++                 : counter.counted();
++        }
++
+         private PartitionRangeReadCommand makeFetchAdditionalPartitionReadCommand(int toQuery)
+         {
+             PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command;
+ 
+             DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery);
+ 
+             AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange();
+             AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight()
+                                                         ? new Range<>(lastPartitionKey, bounds.right)
+                                                         : new ExcludingBounds<>(lastPartitionKey, bounds.right);
+             DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
+ 
+             return cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
+         }
+ 
+         private class ShortReadRowsProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
          {
              private final CFMetaData metadata;
              private final DecoratedKey partitionKey;
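
----------------------------------------------------------------------
Two computations in the hunk above reward a worked example. Below is a self-contained sketch
under stated assumptions (rowsToQuery() and resumedBounds() are hypothetical condensations of
the toQuery arithmetic and the bounds adjustment for this note, not the real DataLimits or
AbstractBounds API):

// Hypothetical stand-ins for illustration; the real logic lives in DataResolver.
final class ShortReadRetrySketch
{
    static final int NO_LIMIT = Integer.MAX_VALUE;

    // Mirrors: toQuery = count != NO_LIMIT ? count - counted(merged) : perPartitionCount
    static int rowsToQuery(int totalLimit, int perPartitionLimit, int mergedCounted)
    {
        return totalLimit != NO_LIMIT
             ? totalLimit - mergedCounted   // rows still missing from the merged result
             : perPartitionLimit;           // no total limit: fetch one partition's worth
    }

    // Mirrors the bounds adjustment: resume strictly after lastKey, keeping the original
    // right bound. inclusiveRight selects Range (right-inclusive) vs ExcludingBounds.
    static String resumedBounds(long lastKey, long right, boolean inclusiveRight)
    {
        return inclusiveRight ? "(" + lastKey + ", " + right + "]"
                              : "(" + lastKey + ", " + right + ")";
    }

    public static void main(String[] args)
    {
        // LIMIT 10, 7 rows survived reconciliation -> ask the lagging replica for 3 more.
        System.out.println(rowsToQuery(10, NO_LIMIT, 7));   // 3
        System.out.println(resumedBounds(42L, 100L, true)); // (42, 100]
    }
}

So a SELECT with LIMIT 10 whose merged result holds only 7 rows asks the lagging replica for
3 more, restarting the range scan strictly after lastPartitionKey so no partition is fetched
twice.
----------------------------------------------------------------------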
@@@ -647,17 -729,10 +756,18 @@@
                  ColumnFamilyStore.metricsFor(metadata.cfId).shortReadProtectionRequests.mark();
                  Tracing.trace("Requesting {} extra rows from {} for short read protection",
lastQueried, source);
  
-                 return executeReadCommand(makeFetchAdditionalRowsReadCommand(lastQueried));
+                 SinglePartitionReadCommand cmd = makeFetchAdditionalRowsReadCommand(lastQueried);
+                 return UnfilteredPartitionIterators.getOnlyElement(executeReadCommand(cmd), cmd);
              }
  
 +            // Counts the number of rows for regular queries and the number of groups for GROUP BY queries
 +            private int countedInCurrentPartition(Counter counter)
 +            {
 +                return command.limits().isGroupByLimit()
 +                     ? counter.rowCountedInCurrentPartition()
 +                     : counter.countedInCurrentPartition();
 +            }
 +
              private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery)
              {
                  ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
@@@ -671,24 -746,25 +781,25 @@@
                                                           command.rowFilter(),
                                                           command.limits().forShortReadRetry(toQuery),
                                                           partitionKey,
-                                                          filter);
+                                                          filter,
+                                                          command.indexMetadata());
              }
+         }
  
 -             private UnfilteredRowIterator executeReadCommand(SinglePartitionReadCommand cmd)
 -             {
 -                 DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1, queryStartNanoTime);
 -                 ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source), queryStartNanoTime);
 - 
 -                 if (StorageProxy.canDoLocalRequest(source))
 -                     StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
 -                 else
 -                     MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);
 - 
 -                 // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
 -                 handler.awaitResults();
 -                 assert resolver.responses.size() == 1;
 -                 return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), cmd);
-             }
+         private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd)
+         {
 -            DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1);
 -            ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source));
++            DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1, queryStartNanoTime);
++            ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source), queryStartNanoTime);
+ 
+             if (StorageProxy.canDoLocalRequest(source))
+                 StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));
+             else
+                 MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler);
+ 
+             // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
+             handler.awaitResults();
+             assert resolver.responses.size() == 1;
+             return resolver.responses.get(0).payload.makeIterator(command);
          }
      }
  }
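
----------------------------------------------------------------------
As a closing note, the refactored executeReadCommand() issues a single-replica read and
deliberately stops at awaitResults() instead of fully resolving, so tombstones survive into
the ongoing merge. Below is a toy model of just that control flow (all names hypothetical;
the real code wires DataResolver, ReadCallback, StorageProxy and MessagingService together):

import java.util.List;
import java.util.concurrent.CompletableFuture;

// A toy model of the control flow only, not the real Cassandra classes.
final class FollowUpReadSketch
{
    record Response(List<String> rowsIncludingTombstones) {}

    static Response readFromSingleSource(String source)
    {
        // One target replica, consistency-ONE semantics: a single future stands in for ReadCallback.
        CompletableFuture<Response> handler =
            CompletableFuture.supplyAsync(() -> new Response(List.of("row1", "tombstone:row2")));

        // Await the raw response instead of resolving it: tombstones must be preserved,
        // because we are still in the middle of merging the other replicas' results.
        Response raw = handler.join();
        assert raw.rowsIncludingTombstones() != null; // mirrors `assert resolver.responses.size() == 1`
        return raw;
    }

    public static void main(String[] args)
    {
        System.out.println(readFromSingleSource("replica-1").rowsIncludingTombstones());
    }
}
----------------------------------------------------------------------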


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org

