cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Date Tue, 29 Aug 2017 11:36:05 GMT
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/278906c6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/278906c6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/278906c6

Branch: refs/heads/trunk
Commit: 278906c6c0424c1ce0d922c24747c97978b0aa14
Parents: 326f3a7 826ae9c
Author: Aleksey Yeschenko <aleksey@yeschenko.com>
Authored: Tue Aug 29 12:33:33 2017 +0100
Committer: Aleksey Yeschenko <aleksey@yeschenko.com>
Committed: Tue Aug 29 12:33:50 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../UnfilteredPartitionIterators.java           |  7 ---
 .../db/transform/EmptyPartitionsDiscarder.java  | 35 +++++++++++++++
 .../apache/cassandra/db/transform/Filter.java   | 28 +++---------
 .../db/transform/FilteredPartitions.java        | 15 ++++---
 .../cassandra/db/transform/FilteredRows.java    |  2 +-
 .../apache/cassandra/metrics/TableMetrics.java  |  4 ++
 .../apache/cassandra/service/DataResolver.java  | 45 ++++++++++++++------
 .../apache/cassandra/db/ReadCommandTest.java    | 23 +++++-----
 9 files changed, 101 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
index ed643bb,ad9446d..fa12c9c
--- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@@ -50,11 -50,19 +50,16 @@@ public final class FilteredPartitions e
      /**
       * Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator.
       */
-     public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs)
+     public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, int nowInSecs)
      {
-         Filter filter = new Filter(true, nowInSecs);
-         if (iterator instanceof UnfilteredPartitions)
-             return new FilteredPartitions(filter, (UnfilteredPartitions) iterator);
-         return new FilteredPartitions(iterator, filter);
+         FilteredPartitions filtered = filter(iterator, new Filter(nowInSecs));
 -
 -        return iterator.isForThrift()
 -             ? filtered
 -             : (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder());
++        return (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder());
+     }
+ 
+     public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, Filter filter)
+     {
+         return iterator instanceof UnfilteredPartitions
+              ? new FilteredPartitions(filter, (UnfilteredPartitions) iterator)
+              : new FilteredPartitions(iterator, filter);
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index 58b017e,b0f667c..7e6ca25
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -240,33 -201,8 +240,35 @@@ public class TableMetric
          }
      });
  
 +    public static final Gauge<Long> globalBytesRepaired = Metrics.register(globalFactory.createMetricName("BytesRepaired"),
 +                                                                           new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(SSTableReader::isRepaired).left;
 +        }
 +    });
 +
 +    public static final Gauge<Long> globalBytesUnrepaired = Metrics.register(globalFactory.createMetricName("BytesUnrepaired"),
 +                                                                             new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(s -> !s.isRepaired() && !s.isPendingRepair()).left;
 +        }
 +    });
 +
 +    public static final Gauge<Long> globalBytesPendingRepair = Metrics.register(globalFactory.createMetricName("BytesPendingRepair"),
 +                                                                                new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(SSTableReader::isPendingRepair).left;
 +        }
 +    });
 +
+     public final Meter shortReadProtectionRequests;
+ 
      public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
      /**
       * stores metrics that will be rolled into a single global metric
@@@ -810,25 -697,7 +812,27 @@@
          casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
          casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
  
 +        repairsStarted = createTableCounter("RepairJobsStarted");
 +        repairsCompleted = createTableCounter("RepairJobsCompleted");
 +
 +        anticompactionTime = createTableTimer("AnticompactionTime", cfs.keyspace.metric.anticompactionTime);
 +        validationTime = createTableTimer("ValidationTime", cfs.keyspace.metric.validationTime);
 +        syncTime = createTableTimer("SyncTime", cfs.keyspace.metric.repairSyncTime);
 +
 +        bytesValidated = createTableHistogram("BytesValidated", cfs.keyspace.metric.bytesValidated, false);
 +        partitionsValidated = createTableHistogram("PartitionsValidated", cfs.keyspace.metric.partitionsValidated, false);
 +        bytesAnticompacted = createTableCounter("BytesAnticompacted");
 +        bytesMutatedAnticompaction = createTableCounter("BytesMutatedAnticompaction");
 +        mutatedAnticompactionGauge = createTableGauge("MutatedAnticompactionGauge", () ->
 +        {
 +            double bytesMutated = bytesMutatedAnticompaction.getCount();
 +            double bytesAnticomp = bytesAnticompacted.getCount();
 +            if (bytesAnticomp + bytesMutated > 0)
 +                return bytesMutated / (bytesAnticomp + bytesMutated);
 +            return 0.0;
 +        });
++
+         shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
      }
  
      public void updateSSTableIterated(int count)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 78bbe16,32b6d79..f4a472d
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -27,13 -27,9 +27,12 @@@ import com.google.common.collect.Iterab
  
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
 -import org.apache.cassandra.config.*;
 +import org.apache.cassandra.schema.ColumnMetadata;
++import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.*;
- import org.apache.cassandra.db.filter.ClusteringIndexFilter;
- import org.apache.cassandra.db.filter.ColumnFilter;
- import org.apache.cassandra.db.filter.DataLimits;
+ import org.apache.cassandra.db.filter.*;
  import org.apache.cassandra.db.filter.DataLimits.Counter;
  import org.apache.cassandra.db.partitions.*;
  import org.apache.cassandra.db.rows.*;
@@@ -76,10 -71,29 +74,26 @@@ public class DataResolver extends Respo
              sources[i] = msg.from;
          }
  
-         // Even though every responses should honor the limit, we might have more than requested post reconciliation,
-         // so ensure we're respecting the limit.
+         /*
+          * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
+          * have more rows than the client requested. To make sure that we still conform to the original limit,
+          * we apply a top-level post-reconciliation counter to the merged partition iterator.
+          *
+          * Short read protection logic (ShortReadRowProtection.moreContents()) relies on this counter to be applied
+          * to the current partition to work. For this reason we have to apply the counter transformation before
+          * empty partition discard logic kicks in - for it will eagerly consume the iterator.
+          *
+          * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
+          *
+          * See CASSANDRA-13747 for more details.
+          */
+ 
          DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
-         return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
+ 
+         UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
+         FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec()));
+         PartitionIterator counted = counter.applyTo(filtered);
 -
 -        return command.isForThrift()
 -             ? counted
 -             : Transformation.apply(counted, new EmptyPartitionsDiscarder());
++        return Transformation.apply(counted, new EmptyPartitionsDiscarder());
      }
  
      public void compareResponses()
@@@ -541,6 -557,9 +557,9 @@@
                                                                                     partitionKey,
                                                                                     retryFilter);
  
+                 Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
 -                Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
++                Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().id).metric.shortReadProtectionRequests.mark();
+ 
                  return doShortReadRetry(cmd);
              }
  
@@@ -581,9 -600,9 +600,9 @@@
                  DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime);
                  ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), queryStartNanoTime);
                  if (StorageProxy.canDoLocalRequest(source))
-                       StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                     StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
                  else
 -                    MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
 +                    MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), source, handler);
  
                  // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
                  handler.awaitResults();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message