cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject cassandra git commit: Provide additional metrics for materialized views
Date Fri, 02 Oct 2015 15:10:45 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 425bfaf20 -> 62ffa355e


Provide additional metrics for materialized views

patch by clnwsu; reviewed by carlyeks for CASSANDRA-10323


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/62ffa355
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/62ffa355
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/62ffa355

Branch: refs/heads/cassandra-3.0
Commit: 62ffa355ee0431c87ee9ec665c35f881bcad4c74
Parents: 425bfaf
Author: Chris Lohfink <Chris.Lohfink@datastax.com>
Authored: Mon Sep 28 16:45:02 2015 -0500
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Fri Oct 2 17:09:25 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Keyspace.java  | 27 +++++++---
 src/java/org/apache/cassandra/db/Mutation.java  |  6 ++-
 src/java/org/apache/cassandra/db/view/View.java |  3 ++
 .../apache/cassandra/db/view/ViewBuilder.java   |  4 +-
 .../apache/cassandra/db/view/ViewManager.java   | 19 ++-----
 .../cassandra/metrics/KeyspaceMetrics.java      |  8 ++-
 .../apache/cassandra/metrics/TableMetrics.java  | 57 ++++++++++++++++++--
 .../cassandra/metrics/ViewWriteMetrics.java     | 18 ++++++-
 .../apache/cassandra/service/StorageProxy.java  | 22 +++++---
 10 files changed, 128 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5fddce9..5bf70ca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Provide additional metrics for materialized views (CASSANDRA-10323)
  * Flush system schema tables after local schema changes (CASSANDRA-10429)
 Merged from 2.2:
  * cqlsh prompt includes name of keyspace after failed `use` statement (CASSANDRA-10369)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index cf34e9a..293f8a3 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -20,9 +20,8 @@ package org.apache.cassandra.db;
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 
 import com.google.common.base.Function;
@@ -30,10 +29,7 @@ import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.CompactionManager;
@@ -420,6 +416,7 @@ public class Keyspace
 
         if (requiresViewUpdate)
         {
+            mutation.viewLockAcquireStart.compareAndSet(0L, System.currentTimeMillis());
             lock = ViewManager.acquireLockFor(mutation.key().getKey());
 
             if (lock == null)
@@ -444,6 +441,17 @@ public class Keyspace
                     return;
                 }
             }
+            else
+            {
+                long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get();
+                if (!isClReplay)
+                {
+                    for(UUID cfid : mutation.getColumnFamilyIds())
+                    {
+                        columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime,
TimeUnit.MILLISECONDS);
+                    }
+                }
+            }
         }
         int nowInSec = FBUtilities.nowInSeconds();
         try (OpOrder.Group opGroup = writeOrder.start())
@@ -464,13 +472,14 @@ public class Keyspace
                     logger.error("Attempting to mutate non-existant table {} ({}.{})", upd.metadata().cfId,
upd.metadata().ksName, upd.metadata().cfName);
                     continue;
                 }
+                AtomicLong baseComplete = new AtomicLong(Long.MAX_VALUE);
 
                 if (requiresViewUpdate)
                 {
                     try
                     {
                         Tracing.trace("Creating materialized view mutations from base table
replica");
-                        viewManager.pushViewReplicaUpdates(upd, !isClReplay);
+                        viewManager.pushViewReplicaUpdates(upd, !isClReplay, baseComplete);
                     }
                     catch (Throwable t)
                     {
@@ -486,6 +495,8 @@ public class Keyspace
                                                      ? cfs.indexManager.newUpdateTransaction(upd,
opGroup, nowInSec)
                                                      : UpdateTransaction.NO_OP;
                 cfs.apply(upd, indexTransaction, opGroup, replayPosition);
+                if (requiresViewUpdate)
+                    baseComplete.set(System.currentTimeMillis());
             }
         }
         finally

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 7696e04..cbc7e17 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -20,13 +20,14 @@ package org.apache.cassandra.db;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.SerializationHelper;
-import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -56,6 +57,9 @@ public class Mutation implements IMutation
 
     // Time at which this mutation was instantiated
     public final long createdAt = System.currentTimeMillis();
+    // keep track of when mutation has started waiting for a MV partition lock
+    public final AtomicLong viewLockAcquireStart = new AtomicLong(0);
+
     public Mutation(String keyspaceName, DecoratedKey key)
     {
         this(keyspaceName, key, new HashMap<>());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/db/view/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java
index 0a7f747..87ea2ec 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.view;
 
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
@@ -527,6 +528,7 @@ public class View
      */
     private void readLocalRows(TemporalRow.Set rowSet)
     {
+        long start = System.currentTimeMillis();
         SinglePartitionSliceBuilder builder = new SinglePartitionSliceBuilder(baseCfs, rowSet.dk);
 
         for (TemporalRow temporalRow : rowSet)
@@ -551,6 +553,7 @@ public class View
                 }
             }
         }
+        baseCfs.metric.viewReadTime.update(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 0a0fe08..35b023b 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.view;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 
 import com.google.common.base.Function;
@@ -70,6 +71,7 @@ public class ViewBuilder extends CompactionInfo.Holder
 
     private void buildKey(DecoratedKey key)
     {
+        AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
         ReadQuery selectQuery = view.getReadQuery();
         if (!selectQuery.selectsKey(key))
             return;
@@ -92,7 +94,7 @@ public class ViewBuilder extends CompactionInfo.Holder
                    Collection<Mutation> mutations = view.createMutations(partition,
temporalRows, true);
 
                    if (mutations != null)
-                       StorageProxy.mutateMV(key.getKey(), mutations, true);
+                       StorageProxy.mutateMV(key.getKey(), mutations, true, noBase);
                }
            }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java
index 2364ed1..efadd72 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -18,15 +18,10 @@
 package org.apache.cassandra.db.view;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 
 import com.google.common.collect.Lists;
@@ -34,11 +29,7 @@ import com.google.common.util.concurrent.Striped;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ViewDefinition;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.Token;
@@ -128,7 +119,7 @@ public class ViewManager
      * Calculates and pushes updates to the views replicas. The replicas are determined by
      * {@link ViewUtils#getViewNaturalEndpoint(String, Token, Token)}.
      */
-    public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog)
+    public void pushViewReplicaUpdates(PartitionUpdate update, boolean writeCommitLog, AtomicLong
baseComplete)
     {
         List<Mutation> mutations = null;
         TemporalRow.Set temporalRows = null;
@@ -146,7 +137,7 @@ public class ViewManager
         }
 
         if (mutations != null)
-            StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog);
+            StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog,
baseComplete);
     }
 
     public boolean updatesAffectView(Collection<? extends IMutation> mutations, boolean
coordinatorBatchlog)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 9c886b0..62add07 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -21,6 +21,7 @@ import java.util.Set;
 
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Timer;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 
@@ -29,7 +30,6 @@ import com.google.common.collect.Sets;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
-
 /**
  * Metrics for {@link ColumnFamilyStore}.
  */
@@ -81,6 +81,10 @@ public class KeyspaceMetrics
     public final Histogram liveScannedHistogram;
     /** Column update time delta on this Keyspace */
     public final Histogram colUpdateTimeDeltaHistogram;
+    /** time taken acquiring the partition lock for materialized view updates on this keyspace
*/
+    public final Timer viewLockAcquireTime;
+    /** time taken during the local read of a materialized view update */
+    public final Timer viewReadTime;
     /** CAS Prepare metric */
     public final LatencyMetrics casPrepare;
     /** CAS Propose metrics */
@@ -224,6 +228,8 @@ public class KeyspaceMetrics
         tombstoneScannedHistogram = Metrics.histogram(factory.createMetricName("TombstoneScannedHistogram"));
         liveScannedHistogram = Metrics.histogram(factory.createMetricName("LiveScannedHistogram"));
         colUpdateTimeDeltaHistogram = Metrics.histogram(factory.createMetricName("ColUpdateTimeDeltaHistogram"));
+        viewLockAcquireTime =  Metrics.timer(factory.createMetricName("ViewLockAcquireTime"));
+        viewReadTime = Metrics.timer(factory.createMetricName("ViewReadTime"));
         // add manually since histograms do not use createKeyspaceGauge method
         allMetrics.addAll(Lists.newArrayList("SSTablesPerReadHistogram", "TombstoneScannedHistogram",
"LiveScannedHistogram"));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 124b8ca..3cd5b5b 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -20,16 +20,15 @@ package org.apache.cassandra.metrics;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
-
-import com.google.common.collect.Maps;
+import java.util.concurrent.TimeUnit;
 
 import com.codahale.metrics.*;
 import com.codahale.metrics.Timer;
+import com.google.common.collect.Maps;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.EstimatedHistogram;
@@ -37,7 +36,6 @@ import org.apache.cassandra.utils.TopKSampler;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
-
 /**
  * Metrics for {@link ColumnFamilyStore}.
  */
@@ -116,6 +114,10 @@ public class TableMetrics
     public final TableHistogram liveScannedHistogram;
     /** Column update time delta on this CF */
     public final TableHistogram colUpdateTimeDeltaHistogram;
+    /** time taken acquiring the partition lock for materialized view updates for this table
*/
+    public final TableTimer viewLockAcquireTime;
+    /** time taken during the local read of a materialized view update */
+    public final TableTimer viewReadTime;
     /** Disk space used by snapshot files which */
     public final Gauge<Long> trueSnapshotsSize;
     /** Row cache hits, but result out of range */
@@ -615,6 +617,19 @@ public class TableMetrics
         coordinatorScanLatency = Metrics.timer(factory.createMetricName("CoordinatorScanLatency"));
         waitingOnFreeMemtableSpace = Metrics.histogram(factory.createMetricName("WaitingOnFreeMemtableSpace"));
 
+        // We do not want to capture view mutation specific metrics for a view
+        // They only makes sense to capture on the base table
+        if (cfs.metadata.isView())
+        {
+            viewLockAcquireTime = null;
+            viewReadTime = null;
+        }
+        else
+        {
+            viewLockAcquireTime = createTableTimer("ViewLockAcquireTime", cfs.keyspace.metric.viewLockAcquireTime);
+            viewReadTime = createTableTimer("ViewReadTime", cfs.keyspace.metric.viewReadTime);
+        }
+
         trueSnapshotsSize = createTableGauge("SnapshotsSize", new Gauge<Long>()
         {
             public Long getValue()
@@ -751,6 +766,21 @@ public class TableMetrics
                                                     globalAliasFactory.createMetricName(alias)));
     }
 
+    protected TableTimer createTableTimer(String name, Timer keyspaceTimer)
+    {
+        return createTableTimer(name, name, keyspaceTimer);
+    }
+
+    protected TableTimer createTableTimer(String name, String alias, Timer keyspaceTimer)
+    {
+        Timer cfTimer = Metrics.timer(factory.createMetricName(name), aliasFactory.createMetricName(alias));
+        register(name, alias, cfTimer);
+        return new TableTimer(cfTimer,
+                              keyspaceTimer,
+                              Metrics.timer(globalFactory.createMetricName(name),
+                                            globalAliasFactory.createMetricName(alias)));
+    }
+
     /**
      * Registers a metric to be removed when unloading CF.
      * @return true if first time metric with that name has been registered
@@ -782,6 +812,25 @@ public class TableMetrics
         }
     }
 
+    public static class TableTimer
+    {
+        public final Timer[] all;
+        public final Timer cf;
+        private TableTimer(Timer cf, Timer keyspace, Timer global)
+        {
+            this.cf = cf;
+            this.all = new Timer[]{cf, keyspace, global};
+        }
+
+        public void update(long i, TimeUnit unit)
+        {
+            for(Timer timer : all)
+            {
+                timer.update(i, unit);
+            }
+        }
+    }
+
     static class TableMetricNameFactory implements MetricNameFactory
     {
         private final String keyspaceName;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java b/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java
index c99cc5c..df98865 100644
--- a/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ViewWriteMetrics.java
@@ -18,19 +18,31 @@
 
 package org.apache.cassandra.metrics;
 
-import com.codahale.metrics.Counter;
-
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.codahale.metrics.Gauge;
+
 public class ViewWriteMetrics extends ClientRequestMetrics
 {
     public final Counter viewReplicasAttempted;
     public final Counter viewReplicasSuccess;
+    // time between when mutation is applied to local memtable to when CL.ONE is achieved
on MV
+    public final Timer viewWriteLatency;
 
     public ViewWriteMetrics(String scope) {
         super(scope);
         viewReplicasAttempted = Metrics.counter(factory.createMetricName("ViewReplicasAttempted"));
         viewReplicasSuccess = Metrics.counter(factory.createMetricName("ViewReplicasSuccess"));
+        viewWriteLatency = Metrics.timer(factory.createMetricName("ViewWriteLatency"));
+        Metrics.register(factory.createMetricName("ViewPendingMutations"), new Gauge<Long>()
+                {
+                    public Long getValue()
+                    {
+                        return viewReplicasAttempted.getCount() - viewReplicasSuccess.getCount();
+                    }
+                });
     }
 
     public void release()
@@ -38,5 +50,7 @@ public class ViewWriteMetrics extends ClientRequestMetrics
         super.release();
         Metrics.remove(factory.createMetricName("ViewReplicasAttempted"));
         Metrics.remove(factory.createMetricName("ViewReplicasSuccess"));
+        Metrics.remove(factory.createMetricName("ViewWriteLatency"));
+        Metrics.remove(factory.createMetricName("ViewPendingMutations"));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffa355/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5c94f08..d1142fc 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -35,20 +36,19 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.batchlog.Batch;
+import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.HintedHandOffManager;
-import org.apache.cassandra.batchlog.*;
-import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.RowIterator;
-import org.apache.cassandra.db.view.ViewManager;
 import org.apache.cassandra.db.view.ViewUtils;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.*;
@@ -650,8 +650,10 @@ public class StorageProxy implements StorageProxyMBean
      * across all replicas.
      *
      * @param mutations the mutations to be applied across the replicas
+     * @param writeCommitLog if commitlog should be written
+     * @param baseComplete time from epoch in ms that the local base mutation was(or will
be) completed
      */
-    public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations,
boolean writeCommitLog)
+    public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations,
boolean writeCommitLog, AtomicLong baseComplete)
     throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         Tracing.trace("Determining replicas for mutation");
@@ -684,13 +686,17 @@ public class StorageProxy implements StorageProxyMBean
                                                                                    consistencyLevel,
                                                                                    consistencyLevel,
                                                                                    naturalEndpoints,
+                                                                                   baseComplete,
                                                                                    WriteType.BATCH,
                                                                                    cleanup);
 
                 // When local node is the endpoint and there are no pending nodes we can
                 // Just apply the mutation locally.
                 if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) && wrapper.handler.pendingEndpoints.isEmpty())
+                {
                     mutation.apply(writeCommitLog);
+                    viewWriteMetrics.viewReplicasSuccess.inc();
+                }
                 else
                     wrappers.add(wrapper);
             }
@@ -980,6 +986,7 @@ public class StorageProxy implements StorageProxyMBean
                                                                             ConsistencyLevel
consistency_level,
                                                                             ConsistencyLevel
batchConsistencyLevel,
                                                                             List<InetAddress>
naturalEndpoints,
+                                                                            AtomicLong baseComplete,
                                                                             WriteType writeType,
                                                                             BatchlogResponseHandler.BatchlogCleanup
cleanup)
     {
@@ -988,7 +995,10 @@ public class StorageProxy implements StorageProxyMBean
         String keyspaceName = mutation.getKeyspaceName();
         Token tk = mutation.key().getToken();
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk,
keyspaceName);
-        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints,
pendingEndpoints, consistency_level, null, writeType);
+        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints,
pendingEndpoints, consistency_level, () -> {
+            long delay = Math.max(0, System.currentTimeMillis() - baseComplete.get());
+            viewWriteMetrics.viewWriteLatency.update(delay, TimeUnit.MILLISECONDS);
+        }, writeType);
         BatchlogResponseHandler<IMutation> batchHandler = new ViewWriteMetricsWrapped(writeHandler,
batchConsistencyLevel.blockFor(keyspace), cleanup);
         return new WriteResponseHandlerWrapper(batchHandler, mutation);
     }


Mime
View raw message