cassandra-commits mailing list archives

From alek...@apache.org
Subject [6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Date Mon, 13 Nov 2017 13:20:51 GMT
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/07258a96
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/07258a96
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/07258a96

Branch: refs/heads/trunk
Commit: 07258a96bfde3a6df839b4cc2c79e500d95163f0
Parents: 7707b73 9ee44db
Author: Aleksey Yeschenko <aleksey@yeschenko.com>
Authored: Mon Nov 13 13:15:15 2017 +0000
Committer: Aleksey Yeschenko <aleksey@yeschenko.com>
Committed: Mon Nov 13 13:18:03 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/metrics/TableMetrics.java  |  2 +
 .../apache/cassandra/service/DataResolver.java  | 51 +++++++++++++++++---
 3 files changed, 46 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/07258a96/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/07258a96/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index 04fbf46,e78bb66..5c4a849
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -248,33 -201,7 +248,34 @@@ public class TableMetric
          }
      });
  
 +    public static final Gauge<Long> globalBytesRepaired = Metrics.register(globalFactory.createMetricName("BytesRepaired"),
 +                                                                           new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(SSTableReader::isRepaired).left;
 +        }
 +    });
 +
 +    public static final Gauge<Long> globalBytesUnrepaired = Metrics.register(globalFactory.createMetricName("BytesUnrepaired"),
 +                                                                             new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(s -> !s.isRepaired() && !s.isPendingRepair()).left;
 +        }
 +    });
 +
 +    public static final Gauge<Long> globalBytesPendingRepair = Metrics.register(globalFactory.createMetricName("BytesPendingRepair"),
 +                                                                                new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(SSTableReader::isPendingRepair).left;
 +        }
 +    });
 +
+     public final Meter readRepairRequests;
      public final Meter shortReadProtectionRequests;
  
      public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
@@@ -825,26 -698,7 +826,27 @@@
          casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
          casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
  
 +        repairsStarted = createTableCounter("RepairJobsStarted");
 +        repairsCompleted = createTableCounter("RepairJobsCompleted");
 +
 +        anticompactionTime = createTableTimer("AnticompactionTime", cfs.keyspace.metric.anticompactionTime);
 +        validationTime = createTableTimer("ValidationTime", cfs.keyspace.metric.validationTime);
 +        syncTime = createTableTimer("SyncTime", cfs.keyspace.metric.repairSyncTime);
 +
 +        bytesValidated = createTableHistogram("BytesValidated", cfs.keyspace.metric.bytesValidated, false);
 +        partitionsValidated = createTableHistogram("PartitionsValidated", cfs.keyspace.metric.partitionsValidated, false);
 +        bytesAnticompacted = createTableCounter("BytesAnticompacted");
 +        bytesMutatedAnticompaction = createTableCounter("BytesMutatedAnticompaction");
 +        mutatedAnticompactionGauge = createTableGauge("MutatedAnticompactionGauge", () ->
 +        {
 +            double bytesMutated = bytesMutatedAnticompaction.getCount();
 +            double bytesAnticomp = bytesAnticompacted.getCount();
 +            if (bytesAnticomp + bytesMutated > 0)
 +                return bytesMutated / (bytesAnticomp + bytesMutated);
 +            return 0.0;
 +        });
 +
+         readRepairRequests = Metrics.meter(factory.createMetricName("ReadRepairRequests"));
          shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
      }
  

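The TableMetrics hunks above register three new global gauges (BytesRepaired, BytesUnrepaired, BytesPendingRepair) and a per-table ReadRepairRequests meter. As a rough illustration of how these might be read from a JMX client, here is a small sketch; the ObjectName patterns, the default port 7199, and the my_ks/my_table names are assumptions based on the usual org.apache.cassandra.metrics naming convention rather than anything taken from this diff, so verify them against a running node before relying on them.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Illustrative only: reads the gauges/meter added in this commit over JMX.
// ObjectName patterns and the port are assumed, not taken from this diff.
public class RepairMetricsProbe
{
    public static void main(String[] args) throws Exception
    {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();

            // Global gauges registered through globalFactory in TableMetrics
            for (String name : new String[] { "BytesRepaired", "BytesUnrepaired", "BytesPendingRepair" })
            {
                ObjectName gauge = new ObjectName("org.apache.cassandra.metrics:type=Table,name=" + name);
                System.out.println(name + " = " + mbs.getAttribute(gauge, "Value"));
            }

            // Per-table meter added in this commit; keyspace/table names are placeholders
            ObjectName meter = new ObjectName(
                "org.apache.cassandra.metrics:type=Table,keyspace=my_ks,scope=my_table,name=ReadRepairRequests");
            System.out.println("ReadRepairRequests = " + mbs.getAttribute(meter, "Count"));
        }
    }
}
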
http://git-wip-us.apache.org/repos/asf/cassandra/blob/07258a96/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index d4c77d1,f63f4f5..933014f
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -464,15 -468,49 +467,47 @@@ public class DataResolver extends Respo
              public void close()
              {
                  for (int i = 0; i < repairs.length; i++)
+                     if (null != repairs[i])
+                         sendRepairMutation(repairs[i], sources[i]);
+             }
+ 
+             private void sendRepairMutation(PartitionUpdate partition, InetAddress destination)
+             {
+                 Mutation mutation = new Mutation(partition);
+                 int messagingVersion = MessagingService.instance().getVersion(destination);
+ 
+                 int    mutationSize = (int) Mutation.serializer.serializedSize(mutation, messagingVersion);
+                 int maxMutationSize = DatabaseDescriptor.getMaxMutationSize();
+ 
+                 if (mutationSize <= maxMutationSize)
                  {
-                     if (repairs[i] == null)
-                         continue;
- 
-                     // use a separate verb here because we don't want these to be get the white glove hint-
-                     // on-timeout behavior that a "real" mutation gets
-                     Tracing.trace("Sending read-repair-mutation to {}", sources[i]);
-                     MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR);
-                     repairResults.add(MessagingService.instance().sendRR(msg, sources[i]));
+                     Tracing.trace("Sending read-repair-mutation to {}", destination);
+                     // use a separate verb here to avoid writing hints on timeouts
+                     MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.READ_REPAIR);
+                     repairResults.add(MessagingService.instance().sendRR(message, destination));
 -                    ColumnFamilyStore.metricsFor(command.metadata().cfId).readRepairRequests.mark();
++                    ColumnFamilyStore.metricsFor(command.metadata().id).readRepairRequests.mark();
+                 }
+                 else if (DROP_OVERSIZED_READ_REPAIR_MUTATIONS)
+                 {
 -                    logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
++                    logger.debug("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
+                                  mutationSize,
+                                  maxMutationSize,
 -                                 command.metadata().ksName,
 -                                 command.metadata().cfName,
 -                                 command.metadata().getKeyValidator().getString(partitionKey.getKey()),
++                                 command.metadata(),
++                                 command.metadata().partitionKeyType.getString(partitionKey.getKey()),
+                                  destination);
+                 }
+                 else
+                 {
 -                    logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}.{}, key {}, node {}",
++                    logger.warn("Encountered an oversized ({}/{}) read repair mutation for table {}, key {}, node {}",
+                                 mutationSize,
+                                 maxMutationSize,
 -                                command.metadata().ksName,
 -                                command.metadata().cfName,
 -                                command.metadata().getKeyValidator().getString(partitionKey.getKey()),
++                                command.metadata(),
++                                command.metadata().partitionKeyType.getString(partitionKey.getKey()),
+                                 destination);
+ 
+                     int blockFor = consistency.blockFor(keyspace);
+                     Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+                     throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
                  }
              }
          }
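
For readers skimming the DataResolver hunk above: the new sendRepairMutation() serializes the repair mutation, compares its size against the configured max mutation size, and then either sends it, drops it with a debug log (when oversized drops are enabled), or logs a warning and fails the read. Below is a minimal, self-contained sketch of that control flow only; ReadRepairSender and the dropOversized flag are illustrative stand-ins rather than Cassandra APIs, and the real code uses MessagingService, Tracing, the DROP_OVERSIZED_READ_REPAIR_MUTATIONS constant and ReadTimeoutException instead of the placeholders here.

// Sketch of the size-guard logic; types and logging are simplified placeholders.
public final class OversizedRepairGuard
{
    // Stand-in for the messaging path that delivers a read-repair mutation.
    public interface ReadRepairSender
    {
        void send(byte[] serializedMutation, String destination);
    }

    private final int maxMutationSize;   // mirrors DatabaseDescriptor.getMaxMutationSize()
    private final boolean dropOversized; // mirrors DROP_OVERSIZED_READ_REPAIR_MUTATIONS

    public OversizedRepairGuard(int maxMutationSize, boolean dropOversized)
    {
        this.maxMutationSize = maxMutationSize;
        this.dropOversized = dropOversized;
    }

    public void maybeSend(byte[] serializedMutation, String destination, ReadRepairSender sender)
    {
        int mutationSize = serializedMutation.length;
        if (mutationSize <= maxMutationSize)
        {
            // Fits: send it on the read-repair path as before.
            sender.send(serializedMutation, destination);
        }
        else if (dropOversized)
        {
            // Drop mode: skip the mutation and only note it at debug level.
            System.out.printf("Dropping oversized (%d/%d) read repair mutation for node %s%n",
                              mutationSize, maxMutationSize, destination);
        }
        else
        {
            // Default: surface the failure (the real code throws ReadTimeoutException).
            throw new IllegalStateException(String.format(
                "Oversized (%d/%d) read repair mutation for node %s", mutationSize, maxMutationSize, destination));
        }
    }
}

Either way the severity mirrors the diff: drop mode logs at debug and moves on, while the default path warns and throws so the client sees the failed read rather than silently missing the repair.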



