cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aweisb...@apache.org
Subject cassandra git commit: Add additional statistics for speculative retry.
Date Mon, 03 Apr 2017 22:31:34 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk ab7aa57b3 -> 2c6924b56


Add additional statistics for speculative retry.

Patch by Ariel Weisberg; Reviewed by Blake Eggleston for CASSANDRA-13373


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2c6924b5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2c6924b5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2c6924b5

Branch: refs/heads/trunk
Commit: 2c6924b561ddf0b0df9315946b21260d6e27fdb9
Parents: ab7aa57
Author: Ariel Weisberg <aweisberg@apple.com>
Authored: Thu Mar 23 19:25:54 2017 -0400
Committer: Ariel Weisberg <aweisberg@apple.com>
Committed: Mon Apr 3 18:24:02 2017 -0400

----------------------------------------------------------------------
 doc/source/operating/metrics.rst                |   3 +
 .../cassandra/metrics/KeyspaceMetrics.java      |  52 +++++
 .../apache/cassandra/metrics/TableMetrics.java  |  12 ++
 .../cassandra/service/AbstractReadExecutor.java | 102 ++++++---
 .../apache/cassandra/service/ReadCallback.java  |   2 +-
 .../cassandra/service/ReadExecutorTest.java     | 215 +++++++++++++++++++
 6 files changed, 360 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/doc/source/operating/metrics.rst
----------------------------------------------------------------------
diff --git a/doc/source/operating/metrics.rst b/doc/source/operating/metrics.rst
index af2e36e..6e1b212 100644
--- a/doc/source/operating/metrics.rst
+++ b/doc/source/operating/metrics.rst
@@ -127,6 +127,9 @@ CasPropose                              Latency        Latency of paxos
propose
 CasCommit                               Latency        Latency of paxos commit round.
 PercentRepaired                         Gauge<Double>  Percent of table data that is
repaired on disk.
 SpeculativeRetries                      Counter        Number of times speculative retries
were sent for this table.
+SpeculativeFailedRetries                Counter        Number of speculative retries that
failed to prevent a timeout
+SpeculativeInsufficientReplicas         Counter        Number of speculative retries that
couldn't be attempted due to lack of replicas
+SpeculativeSampleLatencyNanos           Gauge<Long>    Number of nanoseconds to wait
before speculation is attempted. Value may be statically configured or updated periodically
based on coordinator latency.
 WaitingOnFreeMemtableSpace              Histogram      Histogram of time spent waiting for
free memtable space, either on- or off-heap.
 DroppedMutations                        Counter        Number of dropped mutations on this
table.
 ======================================= ============== ===========

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 63f8dd0..3c6b604 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -97,6 +97,12 @@ public class KeyspaceMetrics
     public final Counter writeFailedIdealCL;
     /** Ideal CL write latency metrics */
     public final LatencyMetrics idealCLWriteLatency;
+    /** Speculative retries **/
+    public final Counter speculativeRetries;
+    /** Speculative retry occured but still timed out **/
+    public final Counter speculativeFailedRetries;
+    /** Needed to speculate, but didn't have enough replicas **/
+    public final Counter speculativeInsufficientReplicas;
 
     public final MetricNameFactory factory;
     private Keyspace keyspace;
@@ -244,6 +250,28 @@ public class KeyspaceMetrics
         casCommit = new LatencyMetrics(factory, "CasCommit");
         writeFailedIdealCL = Metrics.counter(factory.createMetricName("WriteFailedIdealCL"));
         idealCLWriteLatency = new LatencyMetrics(factory, "IdealCLWrite");
+
+        speculativeRetries = createKeyspaceCounter("SpeculativeRetries", new MetricValue()
+        {
+            public Long getValue(TableMetrics metric)
+            {
+                return metric.speculativeRetries.getCount();
+            }
+        });
+        speculativeFailedRetries = createKeyspaceCounter("SpeculativeFailedRetries", new
MetricValue()
+        {
+            public Long getValue(TableMetrics metric)
+            {
+                return metric.speculativeFailedRetries.getCount();
+            }
+        });
+        speculativeInsufficientReplicas = createKeyspaceCounter("SpeculativeInsufficientReplicas",
new MetricValue()
+        {
+            public Long getValue(TableMetrics metric)
+            {
+                return metric.speculativeInsufficientReplicas.getCount();
+            }
+        });
     }
 
     /**
@@ -298,6 +326,30 @@ public class KeyspaceMetrics
         });
     }
 
+    /**
+     * Creates a counter that will sum the current value of a metric for all column families
in this keyspace
+     * @param name
+     * @param extractor
+     * @return Counter that computes sum of MetricValue.getValue()
+     */
+    private Counter createKeyspaceCounter(String name, final MetricValue extractor)
+    {
+        allMetrics.add(name);
+        return Metrics.register(factory.createMetricName(name), new Counter()
+        {
+            @Override
+            public long getCount()
+            {
+                long sum = 0;
+                for (ColumnFamilyStore cf : keyspace.getColumnFamilyStores())
+                {
+                    sum += extractor.getValue(cf.metric);
+                }
+                return sum;
+            }
+        });
+    }
+
     static class KeyspaceMetricNameFactory implements MetricNameFactory
     {
         private final String keyspaceName;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 126abed..c4b0000 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -162,6 +162,9 @@ public class TableMetrics
     private static final MetricNameFactory globalAliasFactory = new AllTableMetricNameFactory("ColumnFamily");
 
     public final Counter speculativeRetries;
+    public final Counter speculativeFailedRetries;
+    public final Counter speculativeInsufficientReplicas;
+    public final Gauge<Long> speculativeSampleLatencyNanos;
 
     public final static LatencyMetrics globalReadLatency = new LatencyMetrics(globalFactory,
globalAliasFactory, "Read");
     public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory,
globalAliasFactory, "Write");
@@ -633,6 +636,15 @@ public class TableMetrics
             }
         });
         speculativeRetries = createTableCounter("SpeculativeRetries");
+        speculativeFailedRetries = createTableCounter("SpeculativeFailedRetries");
+        speculativeInsufficientReplicas = createTableCounter("SpeculativeInsufficientReplicas");
+        speculativeSampleLatencyNanos = createTableGauge("SpeculativeSampleLatencyNanos",
new Gauge<Long>()
+        {
+            public Long getValue()
+            {
+                return cfs.sampleLatencyNanos;
+            }
+        });
         keyCacheHitRate = Metrics.register(factory.createMetricName("KeyCacheHitRate"),
                                            aliasFactory.createMetricName("KeyCacheHitRate"),
                                            new RatioGauge()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 7a82187..956a40a 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -63,12 +63,14 @@ public abstract class AbstractReadExecutor
     protected final List<InetAddress> targetReplicas;
     protected final ReadCallback handler;
     protected final TraceState traceState;
+    protected final ColumnFamilyStore cfs;
 
-    AbstractReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel,
List<InetAddress> targetReplicas, long queryStartNanoTime)
+    AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel
consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime)
     {
         this.command = command;
         this.targetReplicas = targetReplicas;
         this.handler = new ReadCallback(new DigestResolver(keyspace, command, consistencyLevel,
targetReplicas.size()), consistencyLevel, command, targetReplicas, queryStartNanoTime);
+        this.cfs = cfs;
         this.traceState = Tracing.instance.get();
 
         // Set the digest version (if we request some digests). This is the smallest version
amongst all our target replicas since new nodes
@@ -143,7 +145,21 @@ public abstract class AbstractReadExecutor
      */
     public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
     {
-        return handler.get();
+        try
+        {
+            return handler.get();
+        }
+        catch (ReadTimeoutException e)
+        {
+            try
+            {
+                onReadTimeout();
+            }
+            finally
+            {
+                throw e;
+            }
+        }
     }
 
     private static ReadRepairDecision newReadRepairDecision(TableMetadata metadata)
@@ -187,12 +203,16 @@ public abstract class AbstractReadExecutor
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
         SpeculativeRetryParam retry = cfs.metadata().params.speculativeRetry;
 
-        // Speculative retry is disabled *OR* there are simply no extra replicas to speculate.
+        // Speculative retry is disabled *OR*
         // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting
DC responses
         if (retry.equals(SpeculativeRetryParam.NONE)
-            || consistencyLevel == ConsistencyLevel.EACH_QUORUM
-            || consistencyLevel.blockFor(keyspace) == allReplicas.size())
-            return new NeverSpeculatingReadExecutor(keyspace, command, consistencyLevel,
targetReplicas, queryStartNanoTime);
+            | consistencyLevel == ConsistencyLevel.EACH_QUORUM)
+            return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel,
targetReplicas, queryStartNanoTime, false);
+
+        // There are simply no extra replicas to speculate.
+        // Handle this separately so it can log failed attempts to speculate due to lack
of replicas
+        if (consistencyLevel.blockFor(keyspace) == allReplicas.size())
+            return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel,
targetReplicas, queryStartNanoTime, true);
 
         if (targetReplicas.size() == allReplicas.size())
         {
@@ -225,11 +245,34 @@ public abstract class AbstractReadExecutor
             return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel,
targetReplicas, queryStartNanoTime);
     }
 
+    /**
+     *  Returns true if speculation should occur and if it should then block until it is
time to
+     *  send the speculative reads
+     */
+    boolean shouldSpeculateAndMaybeWait()
+    {
+        // no latency information, or we're overloaded
+        if (cfs.sampleLatencyNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
+            return false;
+
+        return !handler.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS);
+    }
+
+    void onReadTimeout() {}
+
     public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
     {
-        public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel
consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime)
+        /**
+         * If never speculating due to lack of replicas
+         * log it is as a failure if it should have happened
+         * but couldn't due to lack of replicas
+         */
+        private final boolean logFailedSpeculation;
+
+        public NeverSpeculatingReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand
command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime,
boolean logFailedSpeculation)
         {
-            super(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime);
+            super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
+            this.logFailedSpeculation = logFailedSpeculation;
         }
 
         public void executeAsync()
@@ -241,7 +284,10 @@ public abstract class AbstractReadExecutor
 
         public void maybeTryAdditionalReplicas()
         {
-            // no-op
+            if (shouldSpeculateAndMaybeWait() && logFailedSpeculation)
+            {
+                cfs.metric.speculativeInsufficientReplicas.inc();
+            }
         }
 
         public Collection<InetAddress> getContactedReplicas()
@@ -250,9 +296,8 @@ public abstract class AbstractReadExecutor
         }
     }
 
-    private static class SpeculatingReadExecutor extends AbstractReadExecutor
+    static class SpeculatingReadExecutor extends AbstractReadExecutor
     {
-        private final ColumnFamilyStore cfs;
         private volatile boolean speculated = false;
 
         public SpeculatingReadExecutor(Keyspace keyspace,
@@ -262,8 +307,7 @@ public abstract class AbstractReadExecutor
                                        List<InetAddress> targetReplicas,
                                        long queryStartNanoTime)
         {
-            super(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime);
-            this.cfs = cfs;
+            super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
         }
 
         public void executeAsync()
@@ -293,12 +337,11 @@ public abstract class AbstractReadExecutor
 
         public void maybeTryAdditionalReplicas()
         {
-            // no latency information, or we're overloaded
-            if (cfs.sampleLatencyNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
-                return;
-
-            if (!handler.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS))
+            if (shouldSpeculateAndMaybeWait())
             {
+                //Handle speculation stats first in case the callback fires immediately
+                speculated = true;
+                cfs.metric.speculativeRetries.inc();
                 // Could be waiting on the data, or on enough digests.
                 ReadCommand retryCommand = command;
                 if (handler.resolver.isDataPresent())
@@ -309,9 +352,6 @@ public abstract class AbstractReadExecutor
                     traceState.trace("speculating read retry on {}", extraReplica);
                 logger.trace("speculating read retry on {}", extraReplica);
                 MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(),
extraReplica, handler);
-                speculated = true;
-
-                cfs.metric.speculativeRetries.inc();
             }
         }
 
@@ -321,12 +361,19 @@ public abstract class AbstractReadExecutor
                  ? targetReplicas
                  : targetReplicas.subList(0, targetReplicas.size() - 1);
         }
+
+        @Override
+        void onReadTimeout()
+        {
+            //Shouldn't be possible to get here without first attempting to speculate even
if the
+            //timing is bad
+            assert speculated;
+            cfs.metric.speculativeFailedRetries.inc();
+        }
     }
 
     private static class AlwaysSpeculatingReadExecutor extends AbstractReadExecutor
     {
-        private final ColumnFamilyStore cfs;
-
         public AlwaysSpeculatingReadExecutor(Keyspace keyspace,
                                              ColumnFamilyStore cfs,
                                              ReadCommand command,
@@ -334,8 +381,7 @@ public abstract class AbstractReadExecutor
                                              List<InetAddress> targetReplicas,
                                              long queryStartNanoTime)
         {
-            super(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime);
-            this.cfs = cfs;
+            super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
         }
 
         public void maybeTryAdditionalReplicas()
@@ -356,5 +402,11 @@ public abstract class AbstractReadExecutor
                 makeDigestRequests(targetReplicas.subList(2, targetReplicas.size()));
             cfs.metric.speculativeRetries.inc();
         }
+
+        @Override
+        void onReadTimeout()
+        {
+            cfs.metric.speculativeFailedRetries.inc();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 2e75e1f..7ee6386 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -53,7 +53,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 
     public final ResponseResolver resolver;
-    private final SimpleCondition condition = new SimpleCondition();
+    final SimpleCondition condition = new SimpleCondition();
     private final long queryStartNanoTime;
     final int blockfor;
     final List<InetAddress> endpoints;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6924b5/test/unit/org/apache/cassandra/service/ReadExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/ReadExecutorTest.java
new file mode 100644
index 0000000..fca8eca
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/ReadExecutorTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ReadExecutorTest
+{
+    static Keyspace ks;
+    static ColumnFamilyStore cfs;
+    static List<InetAddress> targets;
+
+    @BeforeClass
+    public static void setUpClass() throws Throwable
+    {
+        SchemaLoader.loadSchema();
+        SchemaLoader.createKeyspace("Foo", KeyspaceParams.simple(3), SchemaLoader.standardCFMD("Foo",
"Bar"));
+        ks = Keyspace.open("Foo");
+        cfs = ks.getColumnFamilyStore("Bar");
+        targets = ImmutableList.of(InetAddress.getByName("127.0.0.255"), InetAddress.getByName("127.0.0.254"),
InetAddress.getByName("127.0.0.253"));
+        cfs.sampleLatencyNanos = 0;
+    }
+
+    @Before
+    public void resetCounters() throws Throwable
+    {
+        cfs.metric.speculativeInsufficientReplicas.dec(cfs.metric.speculativeInsufficientReplicas.getCount());
+        cfs.metric.speculativeRetries.dec(cfs.metric.speculativeRetries.getCount());
+        cfs.metric.speculativeFailedRetries.dec(cfs.metric.speculativeFailedRetries.getCount());
+    }
+
+    /**
+     * If speculation would have been beneficial but could not be attempted due to lack of
replicas
+     * count that it occured
+     */
+    @Test
+    public void testUnableToSpeculate() throws Throwable
+    {
+        assertEquals(0, cfs.metric.speculativeInsufficientReplicas.getCount());
+        assertEquals(0, ks.metric.speculativeInsufficientReplicas.getCount());
+        AbstractReadExecutor executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks,
cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(),
true);
+        executor.maybeTryAdditionalReplicas();
+        try
+        {
+            executor.get();
+            fail();
+        }
+        catch (ReadTimeoutException e)
+        {
+            //expected
+        }
+        assertEquals(1, cfs.metric.speculativeInsufficientReplicas.getCount());
+        assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount());
+
+        //Shouldn't increment
+        executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(),
ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), false);
+        executor.maybeTryAdditionalReplicas();
+        try
+        {
+            executor.get();
+            fail();
+        }
+        catch (ReadTimeoutException e)
+        {
+            //expected
+        }
+        assertEquals(1, cfs.metric.speculativeInsufficientReplicas.getCount());
+        assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount());
+    }
+
+    /**
+     *  Test that speculation when it is attempted is countedc, and when it succeed
+     *  no failure is counted.
+     */
+    @Test
+    public void testSpeculateSucceeded() throws Throwable
+    {
+        assertEquals(0, cfs.metric.speculativeRetries.getCount());
+        assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
+        assertEquals(0, ks.metric.speculativeRetries.getCount());
+        assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
+        AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks,
cfs, new MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365)), ConsistencyLevel.LOCAL_QUORUM,
targets, System.nanoTime());
+        executor.maybeTryAdditionalReplicas();
+        new Thread()
+        {
+            @Override
+            public void run()
+            {
+                //Failures end the read promptly but don't require mock data to be suppleid
+                executor.handler.onFailure(targets.get(0), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+                executor.handler.onFailure(targets.get(1), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+                executor.handler.condition.signalAll();
+            }
+        }.start();
+
+        try
+        {
+            executor.get();
+            fail();
+        }
+        catch (ReadFailureException e)
+        {
+            //expected
+        }
+        assertEquals(1, cfs.metric.speculativeRetries.getCount());
+        assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
+        assertEquals(1, ks.metric.speculativeRetries.getCount());
+        assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
+
+    }
+
+    /**
+     * Test that speculation failure statistics are incremented if speculation occurs
+     * and the read still times out.
+     */
+    @Test
+    public void testSpeculateFailed() throws Throwable
+    {
+        assertEquals(0, cfs.metric.speculativeRetries.getCount());
+        assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
+        assertEquals(0, ks.metric.speculativeRetries.getCount());
+        assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
+        AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks,
cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime());
+        executor.maybeTryAdditionalReplicas();
+        try
+        {
+            executor.get();
+            fail();
+        }
+        catch (ReadTimeoutException e)
+        {
+            //expected
+        }
+        assertEquals(1, cfs.metric.speculativeRetries.getCount());
+        assertEquals(1, cfs.metric.speculativeFailedRetries.getCount());
+        assertEquals(1, ks.metric.speculativeRetries.getCount());
+        assertEquals(1, ks.metric.speculativeFailedRetries.getCount());
+    }
+
+    public static class MockSinglePartitionReadCommand extends SinglePartitionReadCommand
+    {
+        private final long timeout;
+
+        MockSinglePartitionReadCommand()
+        {
+            this(0);
+        }
+
+        MockSinglePartitionReadCommand(long timeout)
+        {
+            super(false, 0, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"),
null);
+            this.timeout = timeout;
+        }
+
+        @Override
+        public long getTimeout()
+        {
+            return timeout;
+        }
+
+        @Override
+        public MessageOut createMessage()
+        {
+            return new MessageOut(MessagingService.Verb.BATCH_REMOVE)
+            {
+                @Override
+                public int serializedSize(int version)
+                {
+                    return 0;
+                }
+            };
+        }
+
+    }
+
+}


Mime
View raw message