cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ble...@apache.org
Subject [1/2] cassandra git commit: Add mutation size and batch metrics
Date Mon, 12 Dec 2016 13:10:47 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk a8acb2a1b -> 31501cc8b


Add mutation size and batch metrics

patch by Alwyn Davis; reviewed by Benjamin Lerer for CASSANDRA-12649


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3b84de4d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3b84de4d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3b84de4d

Branch: refs/heads/trunk
Commit: 3b84de4da4243eaf5d1353c7154fb22c866eff7b
Parents: 8883554
Author: Alwyn Davis <alwyn@instaclustr.com>
Authored: Mon Dec 12 14:05:39 2016 +0100
Committer: Benjamin Lerer <b.lerer@gmail.com>
Committed: Mon Dec 12 14:05:39 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 doc/source/operating/metrics.rst                |  25 ++++
 .../cql3/statements/BatchStatement.java         |  35 ++++--
 src/java/org/apache/cassandra/db/IMutation.java |  16 +++
 .../apache/cassandra/metrics/BatchMetrics.java  |  38 ++++++
 .../metrics/CASClientRequestMetrics.java        |   5 -
 .../metrics/CASClientWriteRequestMetrics.java   |  52 ++++++++
 .../metrics/ClientWriteRequestMetrics.java      |  47 ++++++++
 .../apache/cassandra/service/StorageProxy.java  |  16 ++-
 .../cassandra/metrics/BatchMetricsTest.java     | 119 +++++++++++++++++++
 10 files changed, 336 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6e41b2b..dfb849d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 3.12
- * add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
+ * Add mutation size and batch metrics (CASSANDRA-12649)
+ * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
  * Fix primary index calculation for SASI (CASSANDRA-12910)
  * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
  * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/doc/source/operating/metrics.rst
----------------------------------------------------------------------
diff --git a/doc/source/operating/metrics.rst b/doc/source/operating/metrics.rst
index 546d9c2..ef43128 100644
--- a/doc/source/operating/metrics.rst
+++ b/doc/source/operating/metrics.rst
@@ -249,6 +249,7 @@ Reported name format:
     UnfinishedCommit      Counter        Number of transactions that were committed on write.
     ConditionNotMet       Counter        Number of transaction preconditions did not match current values.
     ContentionHistogram   Histogram      How many contended writes were encountered
+    MutationSizeHistogram Histogram      Total size in bytes of the request's mutations.
     ===================== ============== =============================================================
 
 
@@ -286,6 +287,7 @@ Reported name format:
     Failures              Counter        Number of write failures encountered.
     |nbsp|                Latency        Write latency.
     Unavailables          Counter        Number of unavailable exceptions encountered.
+    MutationSizeHistogram Histogram      Total size in bytes of the request's mutations.
     ===================== ============== =============================================================
 
 
@@ -585,6 +587,29 @@ connectedNativeClients      Counter        Number of clients connected to this n
 connectedThriftClients      Counter        Number of clients connected to this nodes thrift protocol server
 =========================== ============== ===========
 
+
+Batch Metrics
+^^^^^^^^^^^^^
+
+Metrics specific to batch statements.
+
+Reported name format:
+
+**Metric Name**
+    ``org.apache.cassandra.metrics.Batch.<MetricName>``
+
+**JMX MBean**
+    ``org.apache.cassandra.metrics:type=Batch name=<MetricName>``
+
+=========================== ============== ===========
+Name                        Type           Description
+=========================== ============== ===========
+PartitionsPerCounterBatch   Histogram      Distribution of the number of partitions processed per counter batch
+PartitionsPerLoggedBatch    Histogram      Distribution of the number of partitions processed per logged batch
+PartitionsPerUnloggedBatch  Histogram      Distribution of the number of partitions processed per unlogged batch
+=========================== ============== ===========
+
+
 JVM Metrics
 ^^^^^^^^^^^
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 83a8324..60a8df5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.metrics.BatchMetrics;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.messages.ResultMessage;
@@ -79,6 +80,8 @@ public class BatchStatement implements CQLStatement
                                                                 "tables involved in an atomic
batch might cause batchlog " +
                                                                 "entries to expire before
being replayed.";
 
+    public static final BatchMetrics metrics = new BatchMetrics();
+
     /**
      * Creates a new BatchStatement from a list of statements and a
      * Thrift consistency level.
@@ -259,7 +262,7 @@ public class BatchStatement implements CQLStatement
     /**
      * Checks batch size to ensure threshold is met. If not, a warning is logged.
      *
-     * @param updates - the batch mutations.
+     * @param mutations - the batch mutations.
      */
     private static void verifyBatchSize(Collection<? extends IMutation> mutations)
throws InvalidRequestException
     {
@@ -267,14 +270,8 @@ public class BatchStatement implements CQLStatement
         if (mutations.size() <= 1)
             return;
 
-        long size = 0;
         long warnThreshold = DatabaseDescriptor.getBatchSizeWarnThreshold();
-
-        for (IMutation mutation : mutations)
-        {
-            for (PartitionUpdate update : mutation.getPartitionUpdates())
-                size += update.dataSize();
-        }
+        long size = IMutation.dataSize(mutations);
 
         if (size > warnThreshold)
         {
@@ -369,10 +366,23 @@ public class BatchStatement implements CQLStatement
         verifyBatchSize(mutations);
         verifyBatchType(mutations);
 
+        updatePartitionsPerBatchMetrics(mutations.size());
+
         boolean mutateAtomic = (isLogged() && mutations.size() > 1);
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic, queryStartNanoTime);
     }
 
+    private void updatePartitionsPerBatchMetrics(int updatedPartitions)
+    {
+        if (isLogged()) {
+            metrics.partitionsPerLoggedBatch.update(updatedPartitions);
+        } else if (isCounter()) {
+            metrics.partitionsPerCounterBatch.update(updatedPartitions);
+        } else {
+            metrics.partitionsPerUnloggedBatch.update(updatedPartitions);
+        }
+    }
+
     private ResultMessage executeWithConditions(BatchQueryOptions options, QueryState state,
long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
@@ -392,11 +402,16 @@ public class BatchStatement implements CQLStatement
                                                    state.getClientState(),
                                                    queryStartNanoTime))
         {
-            return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName,
tableName, result, columnsWithConditions, true, options.forStatement(0)));
+
+            return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName,
+                                                                                  tableName,
+                                                                                  result,
+                                                                                  columnsWithConditions,
+                                                                                  true,
+                                                                                  options.forStatement(0)));
         }
     }
 
-
     private Pair<CQL3CasRequest,Set<ColumnDefinition>> makeCasRequest(BatchQueryOptions
options, QueryState state)
     {
         long now = state.getTimestamp();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/db/IMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/IMutation.java b/src/java/org/apache/cassandra/db/IMutation.java
index c734e16..0ac89f7 100644
--- a/src/java/org/apache/cassandra/db/IMutation.java
+++ b/src/java/org/apache/cassandra/db/IMutation.java
@@ -31,4 +31,20 @@ public interface IMutation
     public long getTimeout();
     public String toString(boolean shallow);
     public Collection<PartitionUpdate> getPartitionUpdates();
+
+    /**
+     * Computes the total data size of the specified mutations.
+     * @param mutations the mutations
+     * @return the total data size of the specified mutations
+     */
+    public static long dataSize(Collection<? extends IMutation> mutations)
+    {
+        long size = 0;
+        for (IMutation mutation : mutations)
+        {
+            for (PartitionUpdate update : mutation.getPartitionUpdates())
+                size += update.dataSize();
+        }
+        return size;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/metrics/BatchMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/BatchMetrics.java b/src/java/org/apache/cassandra/metrics/BatchMetrics.java
new file mode 100644
index 0000000..9bea162
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/BatchMetrics.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Histogram;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+public class BatchMetrics
+{
+    private static final MetricNameFactory factory = new DefaultNameFactory("Batch");
+
+    public final Histogram partitionsPerLoggedBatch;
+    public final Histogram partitionsPerUnloggedBatch;
+    public final Histogram partitionsPerCounterBatch;
+
+    public BatchMetrics()
+    {
+        partitionsPerLoggedBatch = Metrics.histogram(factory.createMetricName("PartitionsPerLoggedBatch"),
false);
+        partitionsPerUnloggedBatch = Metrics.histogram(factory.createMetricName("PartitionsPerUnloggedBatch"),
false);
+        partitionsPerCounterBatch = Metrics.histogram(factory.createMetricName("PartitionsPerCounterBatch"),
false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java b/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java
index f3f1f64..9884ff1 100644
--- a/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CASClientRequestMetrics.java
@@ -23,12 +23,9 @@ import com.codahale.metrics.Histogram;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
-
 public class CASClientRequestMetrics extends ClientRequestMetrics
 {
     public final Histogram contention;
-    /* Used only for write  */
-    public final Counter conditionNotMet;
 
     public final Counter unfinishedCommit;
 
@@ -36,7 +33,6 @@ public class CASClientRequestMetrics extends ClientRequestMetrics
     {
         super(scope);
         contention = Metrics.histogram(factory.createMetricName("ContentionHistogram"), false);
-        conditionNotMet =  Metrics.counter(factory.createMetricName("ConditionNotMet"));
         unfinishedCommit =  Metrics.counter(factory.createMetricName("UnfinishedCommit"));
     }
 
@@ -44,7 +40,6 @@ public class CASClientRequestMetrics extends ClientRequestMetrics
     {
         super.release();
         Metrics.remove(factory.createMetricName("ContentionHistogram"));
-        Metrics.remove(factory.createMetricName("ConditionNotMet"));
         Metrics.remove(factory.createMetricName("UnfinishedCommit"));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/metrics/CASClientWriteRequestMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CASClientWriteRequestMetrics.java b/src/java/org/apache/cassandra/metrics/CASClientWriteRequestMetrics.java
new file mode 100644
index 0000000..5971074
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/CASClientWriteRequestMetrics.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+/**
+ * Metrics for tracking information about CAS write requests.
+ *
+ */
+public class CASClientWriteRequestMetrics extends CASClientRequestMetrics
+{
+    /**
+     * Metric for tracking the mutation sizes in bytes.
+     */
+    public final Histogram mutationSize;
+
+    public final Counter conditionNotMet;
+
+    public CASClientWriteRequestMetrics(String scope)
+    {
+        super(scope);
+        mutationSize = Metrics.histogram(factory.createMetricName("MutationSizeHistogram"),
false);
+        conditionNotMet =  Metrics.counter(factory.createMetricName("ConditionNotMet"));
+    }
+
+    public void release()
+    {
+        super.release();
+        Metrics.remove(factory.createMetricName("ConditionNotMet"));
+        Metrics.remove(factory.createMetricName("MutationSizeHistogram"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/metrics/ClientWriteRequestMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ClientWriteRequestMetrics.java b/src/java/org/apache/cassandra/metrics/ClientWriteRequestMetrics.java
new file mode 100644
index 0000000..50427af
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ClientWriteRequestMetrics.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Histogram;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+/**
+ * Metrics for tracking information about write requests.
+ *
+ */
+public class ClientWriteRequestMetrics extends ClientRequestMetrics
+{
+    /**
+     * Metric for tracking the mutation sizes in bytes.
+     */
+    public final Histogram mutationSize;
+
+    public ClientWriteRequestMetrics(String scope)
+    {
+        super(scope);
+        mutationSize = Metrics.histogram(factory.createMetricName("MutationSizeHistogram"),
false);
+    }
+
+    public void release()
+    {
+        super.release();
+        Metrics.remove(factory.createMetricName("MutationSizeHistogram"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index e0be68c..7d77bd4 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -99,12 +99,12 @@ public class StorageProxy implements StorageProxyMBean
     };
     private static final ClientRequestMetrics readMetrics = new ClientRequestMetrics("Read");
     private static final ClientRequestMetrics rangeMetrics = new ClientRequestMetrics("RangeSlice");
-    private static final ClientRequestMetrics writeMetrics = new ClientRequestMetrics("Write");
-    private static final CASClientRequestMetrics casWriteMetrics = new CASClientRequestMetrics("CASWrite");
+    private static final ClientWriteRequestMetrics writeMetrics = new ClientWriteRequestMetrics("Write");
+    private static final CASClientWriteRequestMetrics casWriteMetrics = new CASClientWriteRequestMetrics("CASWrite");
     private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead");
     private static final ViewWriteMetrics viewWriteMetrics = new ViewWriteMetrics("ViewWrite");
     private static final Map<ConsistencyLevel, ClientRequestMetrics> readMetricsMap
= new EnumMap<>(ConsistencyLevel.class);
-    private static final Map<ConsistencyLevel, ClientRequestMetrics> writeMetricsMap
= new EnumMap<>(ConsistencyLevel.class);
+    private static final Map<ConsistencyLevel, ClientWriteRequestMetrics> writeMetricsMap
= new EnumMap<>(ConsistencyLevel.class);
 
     private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
 
@@ -175,7 +175,7 @@ public class StorageProxy implements StorageProxyMBean
         for(ConsistencyLevel level : ConsistencyLevel.values())
         {
             readMetricsMap.put(level, new ClientRequestMetrics("Read-" + level.name()));
-            writeMetricsMap.put(level, new ClientRequestMetrics("Write-" + level.name()));
+            writeMetricsMap.put(level, new ClientWriteRequestMetrics("Write-" + level.name()));
         }
     }
 
@@ -273,6 +273,10 @@ public class StorageProxy implements StorageProxyMBean
                 // TODO turn null updates into delete?
                 PartitionUpdate updates = request.makeUpdates(current);
 
+                long size = updates.dataSize();
+                casWriteMetrics.mutationSize.update(size);
+                writeMetricsMap.get(consistencyForPaxos).mutationSize.update(size);
+
                 // Apply triggers to cas updates. A consideration here is that
                 // triggers emit Mutations, and so a given trigger implementation
                 // may generate mutations for partitions other than the one this
@@ -859,6 +863,10 @@ public class StorageProxy implements StorageProxyMBean
                               .viewManager
                               .updatesAffectView(mutations, true);
 
+        long size = IMutation.dataSize(mutations);
+        writeMetrics.mutationSize.update(size);
+        writeMetricsMap.get(consistencyLevel).mutationSize.update(size);
+
         if (augmented != null)
             mutateAtomically(augmented, consistencyLevel, updatesView, queryStartNanoTime);
         else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3b84de4d/test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java b/test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java
new file mode 100644
index 0000000..60ee725
--- /dev/null
+++ b/test/unit/org/apache/cassandra/metrics/BatchMetricsTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.io.IOException;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static org.apache.cassandra.cql3.statements.BatchStatement.metrics;
+
+@RunWith(OrderedJUnit4ClassRunner.class)
+public class BatchMetricsTest extends SchemaLoader
+{
+    private static EmbeddedCassandraService cassandra;
+
+    private static Cluster cluster;
+    private static Session session;
+
+    private static String KEYSPACE = "junit";
+    private static final String TABLE = "batchmetricstest";
+
+    private static PreparedStatement ps;
+
+    @BeforeClass()
+    public static void setup() throws ConfigurationException, IOException
+    {
+        Schema.instance.clear();
+
+        cassandra = new EmbeddedCassandraService();
+        cassandra.start();
+
+        cluster = Cluster.builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build();
+        session = cluster.connect();
+
+        session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication
= { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };");
+        session.execute("USE " + KEYSPACE);
+        session.execute("CREATE TABLE IF NOT EXISTS " + TABLE + " (id int PRIMARY KEY, val
text);");
+
+        ps = session.prepare("INSERT INTO " + KEYSPACE + '.' + TABLE + " (id, val) VALUES
(?, ?);");
+    }
+
+    private void executeBatch(boolean isLogged, int distinctPartitions, int statementsPerPartition)
+    {
+        BatchStatement.Type batchType;
+
+        if (isLogged) {
+            batchType = BatchStatement.Type.LOGGED;
+        } else {
+            batchType = BatchStatement.Type.UNLOGGED;
+        }
+
+        BatchStatement batch = new BatchStatement(batchType);
+
+        for (int i=0; i<distinctPartitions; i++) {
+            for (int j=0; j<statementsPerPartition; j++) {
+                batch.add(ps.bind(i, "aaaaaaaa"));
+            }
+        }
+
+        session.execute(batch);
+    }
+
+    @Test
+    public void testLoggedPartitionsPerBatch() {
+        int partitionsPerBatchCountPre = (int) metrics.partitionsPerLoggedBatch.getCount();
+        executeBatch(true, 10, 2);
+        assertEquals(partitionsPerBatchCountPre+1, metrics.partitionsPerLoggedBatch.getCount());
+        assertTrue(partitionsPerBatchCountPre <= metrics.partitionsPerLoggedBatch.getSnapshot().getMax());
// decayingBuckets may not have exact value
+
+        partitionsPerBatchCountPre = (int) metrics.partitionsPerLoggedBatch.getCount();
+        executeBatch(true, 21, 2);
+        assertEquals(partitionsPerBatchCountPre+1, metrics.partitionsPerLoggedBatch.getCount());
+        assertTrue(partitionsPerBatchCountPre <= metrics.partitionsPerLoggedBatch.getSnapshot().getMax());
+    }
+
+    @Test
+    public void testUnloggedPartitionsPerBatch() {
+        int partitionsPerBatchCountPre = (int) metrics.partitionsPerUnloggedBatch.getCount();
+        executeBatch(false, 7, 2);
+        assertEquals(partitionsPerBatchCountPre+1, metrics.partitionsPerUnloggedBatch.getCount());
+        assertTrue(partitionsPerBatchCountPre <= metrics.partitionsPerUnloggedBatch.getSnapshot().getMax());
+
+        partitionsPerBatchCountPre = (int) metrics.partitionsPerUnloggedBatch.getCount();
+        executeBatch(false, 25, 2);
+        assertEquals(partitionsPerBatchCountPre+1, metrics.partitionsPerUnloggedBatch.getCount());
+        assertTrue(partitionsPerBatchCountPre <= metrics.partitionsPerUnloggedBatch.getSnapshot().getMax());
+    }
+}


Mime
View raw message