cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [1/5] git commit: track coordinator latency and use that instead of local for read speculation; fix round-trip of percentile through string
Date Sun, 29 Sep 2013 21:55:22 GMT
Updated Branches:
  refs/heads/cassandra-2.0 c3b7669d0 -> d7bf566ae
  refs/heads/trunk ee4b50ca8 -> e9cfc64b1


track coordinator latency and use that instead of local for read speculation; fix round-trip
of percentile through string


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8693a26e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8693a26e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8693a26e

Branch: refs/heads/cassandra-2.0
Commit: 8693a26e42851b99716317f8c44d571f674fb697
Parents: c3b7669
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Fri Sep 27 17:38:00 2013 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Sun Sep 29 16:38:14 2013 -0500

----------------------------------------------------------------------
 .../org/apache/cassandra/config/CFMetaData.java |  3 ++-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 14 +++++++-----
 .../cassandra/metrics/ColumnFamilyMetrics.java  | 24 ++++++++++++--------
 .../cassandra/metrics/LatencyMetrics.java       | 13 ++++-------
 .../cassandra/service/AbstractReadExecutor.java |  9 ++++----
 .../apache/cassandra/service/StorageProxy.java  | 12 +++++++---
 6 files changed, 42 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8693a26e/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 29df8c3..51865c2 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -345,7 +345,8 @@ public final class CFMetaData
             switch (type)
             {
             case PERCENTILE:
-                return value + "PERCENTILE";
+                // TODO switch to BigDecimal so round-tripping isn't lossy
+                return (value * 100) + "PERCENTILE";
             case CUSTOM:
                 return value + "ms";
             default:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8693a26e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index c3bb81b..719e90f 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -109,7 +109,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
 
     public final ColumnFamilyMetrics metric;
-    public volatile long sampleLatency = Long.MAX_VALUE;
+    public volatile long sampleLatencyNanos;
 
     public void reload()
     {
@@ -244,6 +244,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         this.indexManager = new SecondaryIndexManager(this);
         this.metric = new ColumnFamilyMetrics(this);
         fileIndexGenerator.set(generation);
+        sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2;
 
         Caching caching = metadata.getCaching();
 
@@ -300,19 +301,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 {
                     case PERCENTILE:
                         // get percentile in nanos
-                        assert metric.readLatency.latency.durationUnit() == TimeUnit.MICROSECONDS;
-                        sampleLatency = (long) (metric.readLatency.latency.getSnapshot().getValue(retryPolicy.value) * 1000d);
+                        assert metric.coordinatorReadLatency.durationUnit() == TimeUnit.MICROSECONDS;
+                        logger.info("retryPolicy is {}", retryPolicy.value);
+                        sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.value) * 1000d);
                         break;
                     case CUSTOM:
                         // convert to nanos, since configuration is in millisecond
-                        sampleLatency = (long) (retryPolicy.value * 1000d * 1000d);
+                        sampleLatencyNanos = (long) (retryPolicy.value * 1000d * 1000d);
                         break;
                     default:
-                        sampleLatency = Long.MAX_VALUE;
+                        sampleLatencyNanos = Long.MAX_VALUE;
                         break;
                 }
             }
-        }, 30, 30, TimeUnit.SECONDS);
+        }, DatabaseDescriptor.getReadRpcTimeout(), DatabaseDescriptor.getReadRpcTimeout(), TimeUnit.MILLISECONDS);
     }
 
     /** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8693a26e/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
index b63bbfb..7265c7b 100644
--- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
@@ -17,11 +17,10 @@
  */
 package org.apache.cassandra.metrics;
 
+import java.util.concurrent.TimeUnit;
+
 import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.*;
 import com.yammer.metrics.util.RatioGauge;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -49,9 +48,9 @@ public class ColumnFamilyMetrics
     public final Gauge<long[]> estimatedColumnCountHistogram;
     /** Histogram of the number of sstable data files accessed per read */
     public final Histogram sstablesPerReadHistogram;
-    /** Read metrics */
+    /** (Local) read metrics */
     public final LatencyMetrics readLatency;
-    /** Write metrics */
+    /** (Local) write metrics */
     public final LatencyMetrics writeLatency;
     /** Estimated number of tasks pending for this column family */
     public final Gauge<Integer> pendingTasks;
@@ -84,9 +83,12 @@ public class ColumnFamilyMetrics
     /** Live cells scanned in queries on this CF */
     public final Histogram liveScannedHistogram;
 
+    public final Timer coordinatorReadLatency;
+    public final Timer coordinatorScanLatency;
+
     private final MetricNameFactory factory;
 
-    public final Counter speculativeRetry;
+    public final Counter speculativeRetries;
 
     // for backward compatibility
     @Deprecated public final EstimatedHistogram sstablesPerRead = new EstimatedHistogram(35);
@@ -159,7 +161,7 @@ public class ColumnFamilyMetrics
                         total++;
                     }
                 }
-                return total != 0 ? (double)sum/total: 0;
+                return total != 0 ? (double) sum / total : 0;
             }
         });
         readLatency = new LatencyMetrics(factory, "Read");
@@ -283,7 +285,7 @@ public class ColumnFamilyMetrics
                 return total;
             }
         });
-        speculativeRetry = Metrics.newCounter(factory.createMetricName("SpeculativeRetry"));
+        speculativeRetries = Metrics.newCounter(factory.createMetricName("SpeculativeRetries"));
         keyCacheHitRate = Metrics.newGauge(factory.createMetricName("KeyCacheHitRate"), new RatioGauge()
         {
             protected double getNumerator()
@@ -304,6 +306,8 @@ public class ColumnFamilyMetrics
         });
         tombstoneScannedHistogram = Metrics.newHistogram(factory.createMetricName("TombstoneScannedHistogram"));
         liveScannedHistogram = Metrics.newHistogram(factory.createMetricName("LiveScannedHistogram"));
+        coordinatorReadLatency = Metrics.newTimer(factory.createMetricName("CoordinatorReadLatency"), TimeUnit.MICROSECONDS, TimeUnit.SECONDS);
+        coordinatorScanLatency = Metrics.newTimer(factory.createMetricName("CoordinatorScanLatency"), TimeUnit.MICROSECONDS, TimeUnit.SECONDS);
     }
 
     public void updateSSTableIterated(int count)
@@ -343,6 +347,8 @@ public class ColumnFamilyMetrics
         Metrics.defaultRegistry().removeMetric(factory.createMetricName("SpeculativeRetry"));
         Metrics.defaultRegistry().removeMetric(factory.createMetricName("TombstoneScannedHistogram"));
         Metrics.defaultRegistry().removeMetric(factory.createMetricName("LiveScannedHistogram"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("CoordinatorReadLatency"));
+        Metrics.defaultRegistry().removeMetric(factory.createMetricName("CoordinatorScanLatency"));
     }
 
     class ColumnFamilyMetricNameFactory implements MetricNameFactory

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8693a26e/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
index 01ba997..b53449c 100644
--- a/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/LatencyMetrics.java
@@ -85,15 +85,10 @@ public class LatencyMetrics
     public void addNano(long nanos)
     {
         // convert to microseconds. 1 millionth
-        addMicro(nanos / 1000);
-    }
-
-    public void addMicro(long micros)
-    {
-        latency.update(micros, TimeUnit.MICROSECONDS);
-        totalLatency.inc(micros);
-        totalLatencyHistogram.add(micros);
-        recentLatencyHistogram.add(micros);
+        latency.update(nanos, TimeUnit.NANOSECONDS);
+        totalLatency.inc(nanos / 1000);
+        totalLatencyHistogram.add(nanos / 1000);
+        recentLatencyHistogram.add(nanos / 1000);
     }
 
     public void release()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8693a26e/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 280715a..c56975c 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.service;
 
 import java.net.InetAddress;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -265,10 +264,10 @@ public abstract class AbstractReadExecutor
         public void maybeTryAdditionalReplicas()
         {
             // no latency information, or we're overloaded
-            if (cfs.sampleLatency > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
+            if (cfs.sampleLatencyNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
                 return;
 
-            if (!handler.await(cfs.sampleLatency, TimeUnit.NANOSECONDS))
+            if (!handler.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS))
             {
                 // Could be waiting on the data, or on enough digests.
                 ReadCommand retryCommand = command;
@@ -283,7 +282,7 @@ public abstract class AbstractReadExecutor
                 MessagingService.instance().sendRR(retryCommand.createMessage(), extraReplica, handler);
                 speculated = true;
 
-                cfs.metric.speculativeRetry.inc();
+                cfs.metric.speculativeRetries.inc();
             }
         }
 
@@ -324,7 +323,7 @@ public abstract class AbstractReadExecutor
             makeDataRequests(targetReplicas.subList(0, targetReplicas.size() > 1 ? 2 : 1));
             if (targetReplicas.size() > 2)
                 makeDigestRequests(targetReplicas.subList(2, targetReplicas.size()));
-            cfs.metric.speculativeRetry.inc();
+            cfs.metric.speculativeRetries.inc();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8693a26e/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index ffc65b9..51f171d 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1181,7 +1181,11 @@ public class StorageProxy implements StorageProxyMBean
         }
         finally
         {
-            readMetrics.addNano(System.nanoTime() - start);
+            long latency = System.nanoTime() - start;
+            readMetrics.addNano(latency);
+            // TODO avoid giving every command the same latency number.  Can fix this in CASSANDRA-5329
+            for (ReadCommand command : commands)
+                Keyspace.open(command.ksName).getColumnFamilyStore(command.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS);
         }
         return rows;
     }
@@ -1560,7 +1564,9 @@ public class StorageProxy implements StorageProxyMBean
         }
         finally
         {
-            rangeMetrics.addNano(System.nanoTime() - startTime);
+            long latency = System.nanoTime() - startTime;
+            rangeMetrics.addNano(latency);
+            Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
         }
         return trim(command, rows);
     }
@@ -1576,7 +1582,7 @@ public class StorageProxy implements StorageProxyMBean
 
     public Map<String, List<String>> getSchemaVersions()
     {
-        return this.describeSchemaVersions();
+        return describeSchemaVersions();
     }
 
     /**


Mime
View raw message