cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject git commit: add PBSPredictor consistency modeler patch by Peter Bailis and Shivaram Venkataraman; reviewed by jbellis for CASSANDRA-4261
Date Thu, 27 Sep 2012 20:31:06 GMT
Updated Branches:
  refs/heads/trunk 6ca75ef9b -> 0b94b191d


add PBSPredictor consistency modeler
patch by Peter Bailis and Shivaram Venkataraman; reviewed by jbellis for CASSANDRA-4261


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0b94b191
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0b94b191
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0b94b191

Branch: refs/heads/trunk
Commit: 0b94b191d803f2a59e39c0e14fca45f5fb2ceb65
Parents: 6ca75ef
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Thu Sep 27 15:28:25 2012 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Thu Sep 27 15:28:25 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 build.xml                                          |    5 +
 .../org/apache/cassandra/net/MessagingService.java |   34 +-
 .../cassandra/service/PBSPredictionResult.java     |  127 +++
 .../org/apache/cassandra/service/PBSPredictor.java |  636 +++++++++++++++
 .../cassandra/service/PBSPredictorMBean.java       |   35 +
 .../apache/cassandra/service/StorageService.java   |    2 +
 src/java/org/apache/cassandra/tools/NodeCmd.java   |   65 ++-
 src/java/org/apache/cassandra/tools/NodeProbe.java |    8 +
 .../org/apache/cassandra/tools/NodeToolHelp.yaml   |    4 +-
 .../apache/cassandra/service/PBSPredictorTest.java |  114 +++
 11 files changed, 1023 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6d044af..fa8b8cb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-beta2
+ * add PBSPredictor consistency modeler (CASSANDRA-4261)
  * remove vestiges of Thrift unframed mode (CASSANDRA-4729)
  * optimize single-row PK lookups (CASSANDRA-4710)
  * adjust blockFor calculation to account for pending ranges due to node 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 1477a4a..7c288cc 100644
--- a/build.xml
+++ b/build.xml
@@ -1134,6 +1134,11 @@
     </testmacro>
   </target>
 
+  <!-- Runs only PBSPredictorTest, picked out of the unit test source tree by the filter below. -->
+  <target name="pbs-test" depends="build-test" description="Tests PBS predictor">
+    <testmacro suitename="unit" inputdir="${test.unit.src}"
+      timeout="15000" filter="**/PBSPredictorTest.java"/>
+  </target>
+
   <target name="long-test" depends="build-test" description="Execute functional tests">
     <testmacro suitename="long" inputdir="${test.long.src}"
                timeout="${test.long.timeout}">

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index a2c5939..63d6e38 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -36,18 +36,17 @@ import javax.management.ObjectName;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.dht.BootStrapper;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.GossipDigestAck;
 import org.apache.cassandra.gms.GossipDigestAck2;
 import org.apache.cassandra.gms.GossipDigestSyn;
@@ -58,14 +57,12 @@ import org.apache.cassandra.metrics.ConnectionMetrics;
 import org.apache.cassandra.metrics.DroppedMessageMetrics;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.security.SSLFactory;
-import org.apache.cassandra.service.AntiEntropyService;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.*;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.streaming.compress.CompressedFileStreamTask;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public final class MessagingService implements MessagingServiceMBean
 {
@@ -555,6 +552,16 @@ public final class MessagingService implements MessagingServiceMBean
     public String sendRR(MessageOut message, InetAddress to, IMessageCallback cb, long timeout)
     {
         String id = addCallback(cb, message, to, timeout);
+
+        // Register the outgoing request with PBSPredictor under its callback id, as a write or a
+        // read depending on the callback type; PBSPredictor ignores it unless latency logging is on.
+        if (cb instanceof AbstractWriteResponseHandler)
+        {
+            PBSPredictor.instance().startWriteOperation(id);
+        }
+        else if (cb instanceof ReadCallback)
+        {
+            PBSPredictor.instance().startReadOperation(id);
+        }
+
         sendOneWay(message, id, to);
         return id;
     }
@@ -695,6 +702,21 @@ public final class MessagingService implements MessagingServiceMBean
         Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
         ExecutorService stage = StageManager.getStage(message.getMessageType());
         assert stage != null : "No stage for message type " + message.verb;
+
+        if (message.verb == Verb.REQUEST_RESPONSE && PBSPredictor.instance().isLoggingEnabled())
+        {
+            IMessageCallback cb = MessagingService.instance().getRegisteredCallback(id).callback;
+
+            if (cb instanceof AbstractWriteResponseHandler)
+            {
+                PBSPredictor.instance().logWriteResponse(id, timestamp);
+            }
+            else if (cb instanceof ReadCallback)
+            {
+                PBSPredictor.instance().logReadResponse(id, timestamp);
+            }
+        }
+
         stage.execute(runnable);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/src/java/org/apache/cassandra/service/PBSPredictionResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PBSPredictionResult.java b/src/java/org/apache/cassandra/service/PBSPredictionResult.java
new file mode 100644
index 0000000..92c5491
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/PBSPredictionResult.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.Serializable;
+
+/**
+ * Immutable result of a {@link PBSPredictor} consistency/latency prediction.
+ * Instances are returned through the PBSPredictor MBean, hence Serializable.
+ */
+public class PBSPredictionResult implements Serializable
+{
+    // Pinned explicitly: the default UID is derived from compiler-dependent class
+    // details, so a server and a JMX client built separately could disagree on it.
+    private static final long serialVersionUID = 1L;
+
+    private final int n;
+    private final int r;
+    private final int w;
+
+    private final float timeSinceWrite;
+    private final int numberVersionsStale;
+
+    private final float consistencyProbability;
+
+    private final float averageReadLatency;
+    private final float averageWriteLatency;
+    private final long percentileReadLatencyValue;
+    private final float percentileReadLatencyPercentile;
+    private final long percentileWriteLatencyValue;
+    private final float percentileWriteLatencyPercentile;
+
+    public PBSPredictionResult(int n,
+                               int r,
+                               int w,
+                               float timeSinceWrite,
+                               int numberVersionsStale,
+                               float consistencyProbability,
+                               float averageReadLatency,
+                               float averageWriteLatency,
+                               long percentileReadLatencyValue,
+                               float percentileReadLatencyPercentile,
+                               long percentileWriteLatencyValue,
+                               float percentileWriteLatencyPercentile)
+    {
+        this.n = n;
+        this.r = r;
+        this.w = w;
+        this.timeSinceWrite = timeSinceWrite;
+        this.numberVersionsStale = numberVersionsStale;
+        this.consistencyProbability = consistencyProbability;
+        this.averageReadLatency = averageReadLatency;
+        this.averageWriteLatency = averageWriteLatency;
+        this.percentileReadLatencyValue = percentileReadLatencyValue;
+        this.percentileReadLatencyPercentile = percentileReadLatencyPercentile;
+        this.percentileWriteLatencyValue = percentileWriteLatencyValue;
+        this.percentileWriteLatencyPercentile = percentileWriteLatencyPercentile;
+    }
+
+    /** @return the simulated replication factor */
+    public int getN()
+    {
+        return n;
+    }
+
+    /** @return the simulated read consistency level (replicas contacted per read) */
+    public int getR()
+    {
+        return r;
+    }
+
+    /** @return the simulated write consistency level (acknowledgements awaited per write) */
+    public int getW()
+    {
+        return w;
+    }
+
+    /** @return the delay between write completion and the simulated read */
+    public float getTimeSinceWrite()
+    {
+        return timeSinceWrite;
+    }
+
+    /** @return the number of versions a read may lag and still count as consistent */
+    public int getNumberVersionsStale()
+    {
+        return numberVersionsStale;
+    }
+
+    /** @return the predicted probability that a read is consistent */
+    public float getConsistencyProbability()
+    {
+        return consistencyProbability;
+    }
+
+    /** @return the mean simulated read latency */
+    public float getAverageReadLatency()
+    {
+        return averageReadLatency;
+    }
+
+    /** @return the mean simulated write latency */
+    public float getAverageWriteLatency()
+    {
+        return averageWriteLatency;
+    }
+
+    /** @return the simulated read latency at {@link #getPercentileReadLatencyPercentile()} */
+    public long getPercentileReadLatencyValue()
+    {
+        return percentileReadLatencyValue;
+    }
+
+    /** @return the percentile (a fraction in [0, 1]) used for the read latency value */
+    public float getPercentileReadLatencyPercentile()
+    {
+        return percentileReadLatencyPercentile;
+    }
+
+    /** @return the simulated write latency at {@link #getPercentileWriteLatencyPercentile()} */
+    public long getPercentileWriteLatencyValue()
+    {
+        return percentileWriteLatencyValue;
+    }
+
+    /** @return the percentile (a fraction in [0, 1]) used for the write latency value */
+    public float getPercentileWriteLatencyPercentile()
+    {
+        return percentileWriteLatencyPercentile;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/src/java/org/apache/cassandra/service/PBSPredictor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PBSPredictor.java b/src/java/org/apache/cassandra/service/PBSPredictor.java
new file mode 100644
index 0000000..4bd6381
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/PBSPredictor.java
@@ -0,0 +1,636 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.net.MessageIn;
+
+import java.lang.management.ManagementFactory;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import com.google.common.primitives.Longs;
+
+import org.apache.cassandra.thrift.InvalidRequestException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Performs latency and consistency predictions as described in
+ * <a href="http://arxiv.org/pdf/1204.6082.pdf">
+ * "Probabilistically Bounded Staleness for Practical Partial Quorums"</a>
+ * by Bailis et al. in VLDB 2012. The predictions are of the form:
+ * <p/>
+ * <i>With ReplicationFactor <tt>N</tt>, read consistency level of
+ * <tt>R</tt>, and write consistency level <tt>W</tt>, after
+ * <tt>t</tt> milliseconds, <tt>p</tt>% of reads will return a version
+ * within <tt>k</tt> versions of the last written; this should result
+ * in a latency of <tt>L</tt> ms.</i>
+ * <p/>
+ * <p/>
+ * These predictions should be used as a rough guideline for system
+ * operators. This interface is exposed through nodetool.
+ * <p/>
+ * <p/>
+ * The class accomplishes this by measuring latencies for reads and
+ * writes, then using Monte Carlo simulation to predict behavior under
+ * a given N, R, and W based on those latencies.
+ * <p/>
+ * <p/>
+ * We capture four distributions:
+ * <p/>
+ * <ul>
+ * <li>
+ * <tt>W</tt>: time from when the coordinator sends a mutation to the time
+ * that a replica begins to serve the new value(s)
+ * </li>
+ * <p/>
+ * <li>
+ * <tt>A</tt>: time from when a replica accepting a mutation sends an
+ * acknowledgment to the time the coordinator hears of it
+ * </li>
+ * <p/>
+ * <li>
+ * <tt>R</tt>: time from when the coordinator sends a read request to the time
+ * that the replica performs the read
+ * </li>
+ * <p/>
+ * <li>
+ * <tt>S</tt>: time from when the replica sends a read response to the time
+ * when the coordinator receives it
+ * </li>
+ * </ul>
+ * <p/>
+ * <tt>A</tt> and <tt>S</tt> are mostly network-bound, while <tt>W</tt> and
+ * <tt>R</tt> depend on both the network and local processing time.
+ * <p/>
+ * <p/>
+ * <b>Caveats:</b>
+ * Prediction is only as good as the latencies collected. Accurate
+ * prediction requires synchronizing clocks between replicas.  We
+ * collect a running sample of latencies, but, if latencies change
+ * dramatically, predictions will be off.
+ * <p/>
+ * <p/>
+ * The predictions are conservative, or worst-case, meaning we may
+ * predict more staleness than in practice in the following ways:
+ * <ul>
+ * <li>
+ * We do not account for read repair.
+ * </li>
+ * <li>
+ * We do not account for Merkle tree exchange.
+ * </li>
+ * <li>
+ * Multi-version staleness is particularly conservative.
+ * </li>
+ * <li>
+ * We simulate non-local reads and writes. We assume that the
+ * coordinating Cassandra node is not itself a replica for a given key.
+ * </li>
+ * </ul>
+ * <p/>
+ * <p/>
+ * The predictions are optimistic in the following ways:
+ * <ul>
+ * <li>
+ * We do not predict the impact of node failure.
+ * </li>
+ * <li>
+ * We do not model hinted handoff.
+ * </li>
+ * </ul>
+ *
+ * @see org.apache.cassandra.thrift.ConsistencyLevel
+ * @see org.apache.cassandra.locator.AbstractReplicationStrategy
+ */
+
+public class PBSPredictor implements PBSPredictorMBean
+{
+    private static final Logger logger = LoggerFactory.getLogger(PBSPredictor.class);
+
+    public static final String MBEAN_NAME = "org.apache.cassandra.service:type=PBSPredictor";
+    private static final boolean DEFAULT_DO_LOG_LATENCIES = false;
+    private static final int DEFAULT_MAX_LOGGED_LATENCIES = 10000;
+    private static final int DEFAULT_NUMBER_TRIALS_PREDICTION = 10000;
+
+    /*
+     * We record a bounded set of WARS latencies for read and mutation
+     * operations, stored as a mapping from message IDs to latencies.
+     * Message IDs are queued in the order their requests were sent, and
+     * the oldest are evicted first (FIFO) once more than
+     * maxLoggedLatencies messages of a kind are tracked.
+     */
+
+    /*
+     * Latency record for one outgoing message: the time the request was
+     * sent, plus one entry per response received.
+     *
+     * sendLats corresponds to W and R, while replyLats is used for A and S.
+     * Static because it never needs the enclosing predictor.
+     */
+    private static final class MessageLatencyCollection
+    {
+        private final long startTime;
+        private final Collection<Long> sendLats = new ConcurrentLinkedQueue<Long>();
+        private final Collection<Long> replyLats = new ConcurrentLinkedQueue<Long>();
+
+        MessageLatencyCollection(long startTime)
+        {
+            this.startTime = startTime;
+        }
+
+        void addSendLat(long sendLat)
+        {
+            sendLats.add(sendLat);
+        }
+
+        void addReplyLat(long replyLat)
+        {
+            replyLats.add(replyLat);
+        }
+
+        Collection<Long> getSendLats()
+        {
+            return sendLats;
+        }
+
+        Collection<Long> getReplyLats()
+        {
+            return replyLats;
+        }
+
+        long getStartTime()
+        {
+            return startTime;
+        }
+    }
+
+    // message IDs in send order, used to evict the oldest tracked messages
+    private final Queue<String> writeMessageIds = new LinkedBlockingQueue<String>();
+    private final Queue<String> readMessageIds = new LinkedBlockingQueue<String>();
+
+    private final Map<String, MessageLatencyCollection> messageIdToWriteLats = new ConcurrentHashMap<String, MessageLatencyCollection>();
+    private final Map<String, MessageLatencyCollection> messageIdToReadLats = new ConcurrentHashMap<String, MessageLatencyCollection>();
+
+    private final Random random = new Random();
+    private boolean initialized = false;
+
+    // Set through JMX but read on the request/response paths: volatile for visibility.
+    private volatile boolean logLatencies = DEFAULT_DO_LOG_LATENCIES;
+    private volatile int maxLoggedLatencies = DEFAULT_MAX_LOGGED_LATENCIES;
+    private volatile int numberTrialsPrediction = DEFAULT_NUMBER_TRIALS_PREDICTION;
+
+    private static final PBSPredictor instance = new PBSPredictor();
+
+    public static PBSPredictor instance()
+    {
+        return instance;
+    }
+
+    private PBSPredictor()
+    {
+        init();
+    }
+
+    public void enableConsistencyPredictionLogging()
+    {
+        logLatencies = true;
+    }
+
+    public void disableConsistencyPredictionLogging()
+    {
+        logLatencies = false;
+    }
+
+    public boolean isLoggingEnabled()
+    {
+        return logLatencies;
+    }
+
+    /**
+     * Sets how many write messages and how many read messages are tracked.
+     * Lowering the limit trims the excess on the next tracked operation of each kind.
+     */
+    public void setMaxLoggedLatenciesForConsistencyPrediction(int maxLogged)
+    {
+        maxLoggedLatencies = maxLogged;
+    }
+
+    /**
+     * Sets the number of Monte Carlo trials per prediction.
+     *
+     * @throws IllegalArgumentException if numTrials is less than 1, which would leave
+     *                                  nothing to average or take percentiles of
+     */
+    public void setNumberTrialsForConsistencyPrediction(int numTrials)
+    {
+        if (numTrials < 1)
+            throw new IllegalArgumentException("numTrials must be at least 1");
+        numberTrialsPrediction = numTrials;
+    }
+
+    /** Registers this predictor with the platform MBean server (once). */
+    public void init()
+    {
+        if (initialized)
+            return;
+
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        try
+        {
+            mbs.registerMBean(this, new ObjectName(PBSPredictor.MBEAN_NAME));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+        initialized = true;
+    }
+
+    // uniformly samples one element of the list
+    private long getRandomElement(List<Long> list)
+    {
+        if (list.size() == 0)
+            throw new RuntimeException("Not enough data for prediction");
+        return list.get(random.nextInt(list.size()));
+    }
+
+    // arithmetic mean of the simulated latencies
+    private float listAverage(List<Long> list)
+    {
+        long accum = 0;
+        for (long value : list)
+            accum += value;
+        return (float) accum / list.size();
+    }
+
+    // Value at the given percentile (a fraction in [0, 1]); sorts the list in place.
+    private long getPercentile(List<Long> list, float percentile)
+    {
+        Collections.sort(list);
+        // percentile == 1 would otherwise index one past the end
+        int index = Math.min((int) (list.size() * percentile), list.size() - 1);
+        return list.get(index);
+    }
+
+    /*
+     * Samples the latency of the (replicaNumber)th response (1-based) for one
+     * of WARS. If we have no data for that response number (say we have data
+     * for ReplicationFactor 2 but are asked about N=3), a response number is
+     * picked at random among those we do have and sampled instead.
+     */
+    private long getRandomLatencySample(Map<Integer, List<Long>> samples, int replicaNumber)
+    {
+        List<Long> exact = samples.get(replicaNumber);
+        if (exact != null)
+            return getRandomElement(exact);
+
+        List<Integer> responseNumbers = new ArrayList<Integer>(samples.keySet());
+        return getRandomElement(samples.get(responseNumbers.get(random.nextInt(responseNumbers.size()))));
+    }
+
+    /**
+     * Predicts consistency and latency by Monte Carlo simulation over the
+     * latencies logged so far. Each trial simulates a write to all n replicas,
+     * then a read sent to r replicas timeSinceWrite milliseconds after the write
+     * completed. The read counts as consistent if any contacted replica had
+     * already applied the write when the read reached it.
+     *
+     * @param n                   replication factor to simulate
+     * @param r                   replicas each read is sent to and waits for (1..n)
+     * @param w                   acknowledgements each write waits for (1..n)
+     * @param timeSinceWrite      milliseconds between write completion and the read
+     * @param numberVersionsStale a read counts as consistent if it returns one of this
+     *                            many most recent versions (at least 1)
+     * @param percentileLatency   latency percentile to report, as a fraction in [0, 1]
+     * @return the predicted consistency probability and read/write latencies
+     * @throws IllegalArgumentException if a parameter is out of range
+     * @throws InvalidRequestException  if logging is disabled or no read/write
+     *                                  latencies have been recorded yet
+     */
+    public PBSPredictionResult doPrediction(int n,
+                                            int r,
+                                            int w,
+                                            float timeSinceWrite,
+                                            int numberVersionsStale,
+                                            float percentileLatency) throws Exception
+    {
+        if (r > n)
+            throw new IllegalArgumentException("r must be less than or equal to n");
+        if (r < 1)
+            throw new IllegalArgumentException("r must be at least 1");
+        if (w > n)
+            throw new IllegalArgumentException("w must be less than or equal to n");
+        if (w < 1)
+            throw new IllegalArgumentException("w must be at least 1");
+        // written this way so NaN is rejected as well
+        if (!(percentileLatency >= 0 && percentileLatency <= 1))
+            throw new IllegalArgumentException("percentileLatency must be between 0 and 1 inclusive");
+        if (numberVersionsStale < 1)
+            throw new IllegalArgumentException("numberVersionsStale must be at least 1");
+
+        if (!logLatencies)
+            throw new InvalidRequestException("Latency logging is not enabled");
+
+        // snapshot, since the trial count may be changed concurrently through JMX
+        int trials = numberTrialsPrediction;
+
+        // get a mapping of {response number (1-based) : latencies} for each of WARS
+        Map<Integer, List<Long>> wLatencies = getOrderedWLatencies();
+        Map<Integer, List<Long>> aLatencies = getOrderedALatencies();
+        Map<Integer, List<Long>> rLatencies = getOrderedRLatencies();
+        Map<Integer, List<Long>> sLatencies = getOrderedSLatencies();
+
+        if (wLatencies.isEmpty() || aLatencies.isEmpty())
+            throw new InvalidRequestException("No write latencies have been recorded so far. Run some (non-local) inserts.");
+
+        if (rLatencies.isEmpty() || sLatencies.isEmpty())
+            throw new InvalidRequestException("No read latencies have been recorded so far. Run some (non-local) reads.");
+
+        // simulated end-to-end latency of each trial's read and write
+        List<Long> readLatencies = new ArrayList<Long>(trials);
+        List<Long> writeLatencies = new ArrayList<Long>(trials);
+
+        long consistentReads = 0;
+
+        // per-trial scratch space, indexed by replica
+        long[] trialWLatencies = new long[n];
+        long[] replicaWriteLatencies = new long[n];
+        long[] trialRLatencies = new long[r];
+
+        for (int i = 0; i < trials; ++i)
+        {
+            // Simulate a write to all N replicas. Replica k (0-based) draws from the
+            // distribution of the (k+1)th response, matching the 1-based map keys.
+            for (int replicaNo = 0; replicaNo < n; ++replicaNo)
+            {
+                long trialWLatency = getRandomLatencySample(wLatencies, replicaNo + 1);
+                long trialALatency = getRandomLatencySample(aLatencies, replicaNo + 1);
+
+                trialWLatencies[replicaNo] = trialWLatency;
+                replicaWriteLatencies[replicaNo] = trialWLatency + trialALatency;
+            }
+
+            // Reads are only sent to R replicas, and the read completes once the
+            // slowest of them has answered (R+S).
+            long readLatency = Long.MIN_VALUE;
+            for (int replicaNo = 0; replicaNo < r; ++replicaNo)
+            {
+                long trialRLatency = getRandomLatencySample(rLatencies, replicaNo + 1);
+                long trialSLatency = getRandomLatencySample(sLatencies, replicaNo + 1);
+
+                trialRLatencies[replicaNo] = trialRLatency;
+                readLatency = Math.max(readLatency, trialRLatency + trialSLatency);
+            }
+
+            // The write completes when the wth replica acknowledges it (W+A).
+            // The array is fully rewritten next trial, so sorting in place is fine.
+            Arrays.sort(replicaWriteLatencies);
+            long writeLatency = replicaWriteLatencies[w - 1];
+
+            writeLatencies.add(writeLatency);
+            readLatencies.add(readLatency);
+
+            // Each read request is sent at writeLatency + timeSinceWrite. The read is
+            // consistent if any of the r contacted replicas had applied the write by
+            // the time the request reached it (response order does not matter).
+            for (int replicaNo = 0; replicaNo < r; ++replicaNo)
+            {
+                if (writeLatency + timeSinceWrite + trialRLatencies[replicaNo] >= trialWLatencies[replicaNo])
+                {
+                    consistentReads++;
+                    break;
+                }
+            }
+        }
+
+        float oneVersionConsistencyProbability = (float) consistentReads / trials;
+
+        // to calculate multi-version staleness, we exponentiate the staleness probability by the number of versions
+        float consistencyProbability = (float) (1 - Math.pow((double) (1 - oneVersionConsistencyProbability),
+                                                             numberVersionsStale));
+
+        float averageWriteLatency = listAverage(writeLatencies);
+        float averageReadLatency = listAverage(readLatencies);
+
+        long percentileWriteLatency = getPercentile(writeLatencies, percentileLatency);
+        long percentileReadLatency = getPercentile(readLatencies, percentileLatency);
+
+        return new PBSPredictionResult(n,
+                                       r,
+                                       w,
+                                       timeSinceWrite,
+                                       numberVersionsStale,
+                                       consistencyProbability,
+                                       averageReadLatency,
+                                       averageWriteLatency,
+                                       percentileReadLatency,
+                                       percentileLatency,
+                                       percentileWriteLatency,
+                                       percentileLatency);
+    }
+
+    public void startWriteOperation(String id)
+    {
+        if (!logLatencies)
+            return;
+
+        startWriteOperation(id, System.currentTimeMillis());
+    }
+
+    public void startWriteOperation(String id, long startTime)
+    {
+        if (!logLatencies)
+            return;
+
+        startOperation(writeMessageIds, messageIdToWriteLats, id, startTime);
+    }
+
+    public void startReadOperation(String id)
+    {
+        if (!logLatencies)
+            return;
+
+        startReadOperation(id, System.currentTimeMillis());
+    }
+
+    public void startReadOperation(String id, long startTime)
+    {
+        if (!logLatencies)
+            return;
+
+        startOperation(readMessageIds, messageIdToReadLats, id, startTime);
+    }
+
+    // Starts tracking one message and evicts the oldest tracked messages beyond the limit.
+    private void startOperation(Queue<String> messageIds,
+                                Map<String, MessageLatencyCollection> latencies,
+                                String id,
+                                long startTime)
+    {
+        assert !latencies.containsKey(id);
+
+        // Publish the map entry before the ID becomes evictable; otherwise an eviction
+        // could run ahead of the put and leave an entry that is never removed.
+        latencies.put(id, new MessageLatencyCollection(startTime));
+        messageIds.add(id);
+
+        // The bound is sloppy under concurrency, which is acceptable for our purposes.
+        // Looping (rather than evicting one) also honours a lowered limit.
+        int limit = maxLoggedLatencies;
+        while (messageIds.size() > limit)
+        {
+            String toEvict = messageIds.poll();
+            if (toEvict == null)
+                break;
+            latencies.remove(toEvict);
+        }
+    }
+
+    public void logWriteResponse(String id, long constructionTime)
+    {
+        if (!logLatencies)
+            return;
+
+        logWriteResponse(id, constructionTime, System.currentTimeMillis());
+    }
+
+    public void logWriteResponse(String id, long responseCreationTime, long receivedTime)
+    {
+        if (!logLatencies)
+            return;
+
+        logResponse(messageIdToWriteLats, id, responseCreationTime, receivedTime);
+    }
+
+    public void logReadResponse(String id, long constructionTime)
+    {
+        if (!logLatencies)
+            return;
+
+        logReadResponse(id, constructionTime, System.currentTimeMillis());
+    }
+
+    public void logReadResponse(String id, long responseCreationTime, long receivedTime)
+    {
+        if (!logLatencies)
+            return;
+
+        logResponse(messageIdToReadLats, id, responseCreationTime, receivedTime);
+    }
+
+    // Records the send-side (creation - start) and reply-side (received - creation)
+    // latency of one response, clamped at zero (negative values can presumably arise
+    // from clock differences between nodes).
+    private void logResponse(Map<String, MessageLatencyCollection> latencies,
+                             String id,
+                             long responseCreationTime,
+                             long receivedTime)
+    {
+        MessageLatencyCollection collection = latencies.get(id);
+        // not tracked: logging was off when the request was sent, or it was evicted
+        if (collection == null)
+            return;
+
+        long startTime = collection.getStartTime();
+        collection.addSendLat(Math.max(0, responseCreationTime - startTime));
+        collection.addReplyLat(Math.max(0, receivedTime - responseCreationTime));
+    }
+
+    Map<Integer, List<Long>> getOrderedWLatencies()
+    {
+        return orderedLatenciesOf(messageIdToWriteLats, true);
+    }
+
+    Map<Integer, List<Long>> getOrderedALatencies()
+    {
+        return orderedLatenciesOf(messageIdToWriteLats, false);
+    }
+
+    Map<Integer, List<Long>> getOrderedRLatencies()
+    {
+        return orderedLatenciesOf(messageIdToReadLats, true);
+    }
+
+    Map<Integer, List<Long>> getOrderedSLatencies()
+    {
+        return orderedLatenciesOf(messageIdToReadLats, false);
+    }
+
+    // Gathers the send-side or reply-side latencies of every tracked message,
+    // indexed by response number.
+    private Map<Integer, List<Long>> orderedLatenciesOf(Map<String, MessageLatencyCollection> source, boolean sendSide)
+    {
+        Collection<Collection<Long>> latencyLists = new ArrayList<Collection<Long>>(source.size());
+        for (MessageLatencyCollection collection : source.values())
+            latencyLists.add(sendSide ? collection.getSendLats() : collection.getReplyLats());
+        return getOrderedLatencies(latencyLists);
+    }
+
+    // Regroups per-message latencies by response number: key i (1-based) holds the
+    // ith-smallest latency of every message that got at least i responses.
+    private Map<Integer, List<Long>> getOrderedLatencies(Collection<Collection<Long>> latencyLists)
+    {
+        Map<Integer, List<Long>> ret = new HashMap<Integer, List<Long>>();
+
+        for (Collection<Long> latencies : latencyLists)
+        {
+            List<Long> sortedLatencies = new ArrayList<Long>(latencies);
+            Collections.sort(sortedLatencies);
+
+            for (int i = 1; i <= sortedLatencies.size(); ++i)
+            {
+                List<Long> bucket = ret.get(i);
+                if (bucket == null)
+                {
+                    bucket = new ArrayList<Long>();
+                    ret.put(i, bucket);
+                }
+                bucket.add(sortedLatencies.get(i - 1));
+            }
+        }
+
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/src/java/org/apache/cassandra/service/PBSPredictorMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PBSPredictorMBean.java b/src/java/org/apache/cassandra/service/PBSPredictorMBean.java
new file mode 100644
index 0000000..8c7773d
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/PBSPredictorMBean.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+/** JMX management interface for the PBSPredictor consistency modeler. */
+public interface PBSPredictorMBean
+{
+    /** Predict read consistency and latencies for N=n, R=r, W=w, timeSinceWrite ms after a write. */
+    PBSPredictionResult doPrediction(int n, int r, int w, float timeSinceWrite,
+                                     int numberVersionsStale, float percentileLatency) throws Exception;
+
+    /** Turn consistency-prediction latency logging on (and, below, off). */
+    void enableConsistencyPredictionLogging();
+    void disableConsistencyPredictionLogging();
+
+    /** Tuning: cap on logged latencies, and number of trials used for consistency prediction. */
+    void setMaxLoggedLatenciesForConsistencyPrediction(int maxLogged);
+    void setNumberTrialsForConsistencyPrediction(int numTrials);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e800d88..723a838 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -405,6 +405,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             throw new AssertionError(e);
         }
 
+        PBSPredictor.instance().init();
+
         if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
         {
             logger.info("Loading persisted ring state");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 704be4d..ae65832 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -33,6 +33,12 @@ import com.google.common.collect.LinkedHashMultimap;
 import com.google.common.collect.Maps;
 import org.apache.commons.cli.*;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.service.CacheServiceMBean;
+import org.apache.cassandra.service.PBSPredictionResult;
+import org.apache.cassandra.service.PBSPredictorMBean;
+import org.apache.cassandra.service.StorageProxyMBean;
+
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.db.Table;
@@ -137,7 +143,8 @@ public class NodeCmd
         DESCRIBERING,
         RANGEKEYSAMPLE,
         REBUILD_INDEX,
-        RESETLOCALSCHEMA
+        RESETLOCALSCHEMA,
+        PREDICTCONSISTENCY
     }
 
 
@@ -866,6 +873,48 @@ public class NodeCmd
         outs.println(probe.isThriftServerRunning() ? "running" : "not running");
     }
 
+    /**
+     * Print, for each R/W pair with R + W <= N + 1, the predicted probability of consistent reads
+     * and the average/percentile read and write latencies from the PBSPredictor MBean.
+     * timeAfterWrite is in ms; numVersions is the staleness bound k. Errors are reported to output.
+     */
+    public void predictConsistency(Integer replicationFactor,
+                                   Integer timeAfterWrite,
+                                   Integer numVersions,
+                                   Float percentileLatency,
+                                   PrintStream output)
+    {
+        PBSPredictorMBean predictorMBean = probe.getPBSPredictorMBean();
+
+        for (int r = 1; r <= replicationFactor; ++r)
+        {
+            // bound W directly rather than skipping pairs with R + W > N + 1
+            for (int w = 1; w <= replicationFactor + 1 - r; ++w)
+            {
+                try
+                {
+                    PBSPredictionResult result = predictorMBean.doPrediction(replicationFactor, r, w, timeAfterWrite,
+                                                                             numVersions, percentileLatency);
+                    if (r == 1 && w == 1)
+                        output.printf("%dms after a given write, with maximum version staleness of k=%d\n", timeAfterWrite, numVersions);
+                    output.printf("N=%d, R=%d, W=%d\n", replicationFactor, r, w);
+                    output.printf("Probability of consistent reads: %f\n", result.getConsistencyProbability());
+                    output.printf("Average read latency: %fms (%.3fth %%ile %dms)\n", result.getAverageReadLatency(),
+                                  result.getPercentileReadLatencyPercentile() * 100, result.getPercentileReadLatencyValue());
+                    output.printf("Average write latency: %fms (%.3fth %%ile %dms)\n\n", result.getAverageWriteLatency(),
+                                  result.getPercentileWriteLatencyPercentile() * 100, result.getPercentileWriteLatencyValue());
+                }
+                catch (Exception e)
+                {
+                    // report on the caller-supplied stream instead of System.out; e.toString() is never null
+                    output.println("Prediction failed for N=" + replicationFactor + ", R=" + r + ", W=" + w + ": " + e);
+                    e.printStackTrace(output);
+                    return;
+                }
+            }
+        }
+    }
+
     public static void main(String[] args) throws IOException, InterruptedException, ConfigurationException, ParseException
     {
         CommandLineParser parser = new PosixParser();
@@ -1134,6 +1183,20 @@ public class NodeCmd
                     nodeCmd.printRangeKeySample(System.out);
                     break;
 
+                case PREDICTCONSISTENCY:
+                    if (arguments.length < 2) { badUse("Requires replication factor and time"); }
+                    // optional 3rd arg: version staleness k; optional 4th: latency percentile (needs the 3rd)
+                    int numVersions = 1;
+                    if (arguments.length >= 3) { numVersions = Integer.parseInt(arguments[2]); }
+                    float percentileLatency = .999f;
+                    if (arguments.length >= 4) { percentileLatency = Float.parseFloat(arguments[3]); }
+                    nodeCmd.predictConsistency(Integer.parseInt(arguments[0]),
+                                               Integer.parseInt(arguments[1]),
+                                               numVersions,
+                                               percentileLatency,
+                                               System.out);
+                    break;
+
                 default :
                     throw new RuntimeException("Unreachable code.");
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 814e6d1..d5968c1 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -73,6 +73,7 @@ public class NodeProbe
     public MessagingServiceMBean msProxy;
     private FailureDetectorMBean fdProxy;
     private CacheServiceMBean cacheService;
+    private PBSPredictorMBean PBSPredictorProxy;
     private StorageProxyMBean spProxy;
 
     /**
@@ -142,6 +143,8 @@ public class NodeProbe
         {
             ObjectName name = new ObjectName(ssObjName);
             ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
+            name = new ObjectName(PBSPredictor.MBEAN_NAME);
+            PBSPredictorProxy = JMX.newMBeanProxy(mbeanServerConn, name, PBSPredictorMBean.class);
             name = new ObjectName(MessagingService.MBEAN_NAME);
             msProxy = JMX.newMBeanProxy(mbeanServerConn, name, MessagingServiceMBean.class);
             name = new ObjectName(StreamingService.MBEAN_OBJECT_NAME);
@@ -714,6 +717,11 @@ public class NodeProbe
         return ssProxy.describeRingJMX(keyspaceName);
     }
 
+    public PBSPredictorMBean getPBSPredictorMBean()
+    {
+        return this.PBSPredictorProxy; // JMX proxy created from PBSPredictor.MBEAN_NAME
+    }
+
     public void rebuild(String sourceDc)
     {
         ssProxy.rebuild(sourceDc);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
----------------------------------------------------------------------
diff --git a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
index f2e6f9d..bbea055 100644
--- a/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
+++ b/src/resources/org/apache/cassandra/tools/NodeToolHelp.yaml
@@ -149,4 +149,6 @@ commands:
   - name: getsstables <keyspace> <cf> <key>
     help: |
       Print the sstable filenames that own the key
-
+  - name: predictconsistency <replication_factor> <time> [versions] [latency_percentile]
+    help: |
+      Predict read/write latency and consistency <time> ms after a write

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0b94b191/test/unit/org/apache/cassandra/service/PBSPredictorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/PBSPredictorTest.java b/test/unit/org/apache/cassandra/service/PBSPredictorTest.java
new file mode 100644
index 0000000..92e863d
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/PBSPredictorTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.service;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/** Checks PBSPredictor predictions against hand-fed read and write latencies. */
+public class PBSPredictorTest
+{
+    private static final PBSPredictor predictor = PBSPredictor.instance();
+
+    // start a write for id at time 0, then log its response with values W and W + A
+    private void createWriteResponse(long W, long A, String id)
+    {
+        predictor.startWriteOperation(id, 0);
+        predictor.logWriteResponse(id, W, W+A);
+    }
+
+    // start a read for id at time 0, then log its response with values R and R + S
+    private void createReadResponse(long R, long S, String id)
+    {
+        predictor.startReadOperation(id, 0);
+        predictor.logReadResponse(id, R, R+S);
+    }
+
+    // exceptions propagate so JUnit reports them with their full stack trace
+    @Test
+    public void testDoPrediction() throws Exception
+    {
+        predictor.enableConsistencyPredictionLogging();
+        predictor.init();
+
+        /*
+            Ensure accuracy given a set of basic latencies
+            Predictions here match a prior Python implementation
+         */
+
+        for (int i = 0; i < 10; ++i)
+        {
+            createWriteResponse(10, 0, String.format("W%d", i));
+            createReadResponse(0, 0, String.format("R%d", i));
+        }
+
+        for (int i = 0; i < 10; ++i)
+        {
+            createWriteResponse(0, 0, String.format("WS%d", i));
+        }
+
+        // 10ms after write
+        PBSPredictionResult result = predictor.doPrediction(2,1,1,10.0f,1, 0.99f);
+
+        assertEquals(1, result.getConsistencyProbability(), 0);
+        assertEquals(2.5, result.getAverageWriteLatency(), .5);
+
+        // 0ms after write
+        result = predictor.doPrediction(2,1,1,0f,1, 0.99f);
+
+        assertEquals(.75, result.getConsistencyProbability(), 0.05);
+
+        // k=5 versions staleness
+        result = predictor.doPrediction(2,1,1,5.0f,5, 0.99f);
+        assertEquals(.98, result.getConsistencyProbability(), 0.05);
+        assertEquals(2.5, result.getAverageWriteLatency(), .5);
+
+        for (int i = 0; i < 10; ++i)
+        {
+            createWriteResponse(20, 0, String.format("WL%d", i));
+        }
+
+        // 5ms after write
+        result = predictor.doPrediction(2,1,1,5.0f,1, 0.99f);
+
+        assertEquals(.67, result.getConsistencyProbability(), .05);
+
+        // N = 5
+        result = predictor.doPrediction(5,1,1,5.0f,1, 0.99f);
+
+        assertEquals(.42, result.getConsistencyProbability(), .05);
+        assertEquals(1.33, result.getAverageWriteLatency(), .5);
+
+        for (int i = 0; i < 10; ++i)
+        {
+            createWriteResponse(100, 100, String.format("WVL%d", i));
+            createReadResponse(100, 100, String.format("RL%d", i));
+        }
+
+        result = predictor.doPrediction(2,1,1,0f,1, 0.99f);
+
+        assertEquals(.860, result.getConsistencyProbability(), .05);
+        assertEquals(26.5, result.getAverageWriteLatency(), 1);
+        assertEquals(100.33, result.getAverageReadLatency(), 4);
+
+        result = predictor.doPrediction(2,2,1,0f,1, 0.99f);
+
+        assertEquals(1, result.getConsistencyProbability(), 0);
+    }
+}


Mime
View raw message