cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmcken...@apache.org
Subject git commit: Add column update delta histogram
Date Tue, 04 Nov 2014 20:37:49 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 7bfb34714 -> e01976078


Add column update delta histogram

Patch by Sankalp Kohli, review by jmckenzie for CASSANDRA-7979


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e0197607
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e0197607
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e0197607

Branch: refs/heads/cassandra-2.0
Commit: e019760783e4d4ebf2d3ed60c33e7e63c226fa71
Parents: 7bfb347
Author: Sankalp Kohli <kohlisankalp@gmail.com>
Authored: Tue Nov 4 14:00:41 2014 -0600
Committer: Joshua McKenzie <jmckenzie@apache.org>
Committed: Tue Nov 4 14:00:41 2014 -0600

----------------------------------------------------------------------
 .../cassandra/db/AtomicSortedColumns.java       | 28 +++++++++++++-------
 .../apache/cassandra/db/ColumnFamilyStore.java  |  4 ++-
 src/java/org/apache/cassandra/db/Memtable.java  | 13 ++++-----
 .../cassandra/metrics/ColumnFamilyMetrics.java  |  3 +++
 .../cassandra/metrics/KeyspaceMetrics.java      |  3 +++
 5 files changed, 35 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0197607/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index cacd3bb..993df5b 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -22,7 +22,6 @@ import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
 
 import edu.stanford.ppl.concurrent.SnapTreeMap;
 
@@ -30,7 +29,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.filter.ColumnSlice;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.*;
 
 /**
  * A thread-safe and atomic ISortedColumns implementation.
@@ -51,6 +50,8 @@ public class AtomicSortedColumns extends ColumnFamily
 {
     private final AtomicReference<Holder> ref;
 
+    private static final boolean enableColUpdateTimeDelta = Boolean.parseBoolean(System.getProperty("cassandra.enableColUpdateTimeDelta", "false"));
+
     public static final ColumnFamily.Factory<AtomicSortedColumns> factory = new Factory<AtomicSortedColumns>()
     {
         public AtomicSortedColumns create(CFMetaData metadata, boolean insertReversed)
@@ -155,9 +156,9 @@ public class AtomicSortedColumns extends ColumnFamily
     /**
      *  This is only called by Memtable.resolve, so only AtomicSortedColumns needs to implement it.
      *
-     *  @return the difference in size seen after merging the given columns
+     *  @return the difference in size seen after merging the given columns and minimum time delta between timestamps of two reconciled columns.
      */
-    public long addAllWithSizeDelta(ColumnFamily cm, Allocator allocator, Function<Column, Column> transformation, SecondaryIndexManager.Updater indexer)
+    public Pair<Long,Long> addAllWithSizeDelta(ColumnFamily cm, Allocator allocator, Function<Column, Column> transformation, SecondaryIndexManager.Updater indexer)
     {
         /*
          * This operation needs to atomicity and isolation. To that end, we
@@ -172,11 +173,13 @@ public class AtomicSortedColumns extends ColumnFamily
          */
         Holder current, modified;
         long sizeDelta;
+        long timeDelta;
 
         main_loop:
         do
         {
             sizeDelta = 0;
+            timeDelta = Long.MAX_VALUE;
             current = ref.get();
             DeletionInfo newDelInfo = current.deletionInfo;
             if (cm.deletionInfo().mayModify(newDelInfo))
@@ -188,7 +191,13 @@ public class AtomicSortedColumns extends ColumnFamily
 
             for (Column column : cm)
             {
-                sizeDelta += modified.addColumn(transformation.apply(column), allocator, indexer);
+                final Pair<Integer, Long> pair = modified.addColumn(transformation.apply(column), allocator, indexer);
+                sizeDelta += pair.left;
+
+                //We will store the minimum delta for all columns if enabled
+                if(enableColUpdateTimeDelta)
+                    timeDelta = Math.min(pair.right, timeDelta);
+
                 // bail early if we know we've been beaten
                 if (ref.get() != current)
                     continue main_loop;
@@ -198,7 +207,7 @@ public class AtomicSortedColumns extends ColumnFamily
 
         indexer.updateRowLevelIndexes();
 
-        return sizeDelta;
+        return Pair.create(sizeDelta, timeDelta);
     }
 
     public boolean replace(Column oldColumn, Column newColumn)
@@ -311,7 +320,7 @@ public class AtomicSortedColumns extends ColumnFamily
             return new Holder(new SnapTreeMap<ByteBuffer, Column>(map.comparator()), LIVE);
         }
 
-        long addColumn(Column column, Allocator allocator, SecondaryIndexManager.Updater indexer)
+        Pair<Integer, Long> addColumn(Column column, Allocator allocator, SecondaryIndexManager.Updater indexer)
         {
             ByteBuffer name = column.name();
             while (true)
@@ -320,14 +329,15 @@ public class AtomicSortedColumns extends ColumnFamily
                 if (oldColumn == null)
                 {
                     indexer.insert(column);
-                    return column.dataSize();
+                    return Pair.create(column.dataSize(), Long.MAX_VALUE);
                 }
 
                 Column reconciledColumn = column.reconcile(oldColumn, allocator);
                 if (map.replace(name, oldColumn, reconciledColumn))
                 {
                     indexer.update(oldColumn, reconciledColumn);
-                    return reconciledColumn.dataSize() - oldColumn.dataSize();
+                    return Pair.create(reconciledColumn.dataSize() - oldColumn.dataSize(),
+                            enableColUpdateTimeDelta?  Math.abs(oldColumn.timestamp - column.timestamp): Long.MAX_VALUE );
                 }
                 // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying.
                 // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0197607/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d0ff951..8a18347 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -897,9 +897,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         long start = System.nanoTime();
 
         Memtable mt = getMemtableThreadSafe();
-        mt.put(key, columnFamily, indexer);
+        final long timeDelta = mt.put(key, columnFamily, indexer);
         maybeUpdateRowCache(key);
         metric.writeLatency.addNano(System.nanoTime() - start);
+        if(timeDelta < Long.MAX_VALUE)
+            metric.colUpdateTimeDeltaHistogram.update(timeDelta);
         mt.maybeUpdateLiveRatio();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0197607/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index f9a6719..0d55bb2 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
+import org.apache.cassandra.utils.*;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +40,6 @@ import org.apache.cassandra.io.sstable.SSTableMetadata;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.DiskAwareRunnable;
-import org.apache.cassandra.utils.Allocator;
 import org.github.jamm.MemoryMeter;
 
 public class Memtable
@@ -160,9 +160,9 @@ public class Memtable
      * (CFS handles locking to avoid submitting an op
      *  to a flushing memtable.  Any other way is unsafe.)
     */
-    void put(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer)
+    long put(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer)
     {
-        resolve(key, columnFamily, indexer);
+        return resolve(key, columnFamily, indexer);
     }
 
     public void maybeUpdateLiveRatio()
@@ -202,7 +202,7 @@ public class Memtable
         meterExecutor.submit(new MeteringRunnable(cfs));
     }
 
-    private void resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer)
+    private long resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer)
     {
         AtomicSortedColumns previous = rows.get(key);
 
@@ -215,9 +215,10 @@ public class Memtable
                 previous = empty;
         }
 
-        long sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction, indexer);
-        currentSize.addAndGet(sizeDelta);
+        final Pair<Long, Long> pair = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction, indexer);
+        currentSize.addAndGet(pair.left);
         currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
+        return pair.right;
     }
 
     // for debugging

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0197607/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
index a3838a0..c2b7d61 100644
--- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java
@@ -93,6 +93,8 @@ public class ColumnFamilyMetrics
     public final ColumnFamilyHistogram tombstoneScannedHistogram;
     /** Live cells scanned in queries on this CF */
     public final ColumnFamilyHistogram liveScannedHistogram;
+    /** Column update time delta on this CF */
+    public final ColumnFamilyHistogram colUpdateTimeDeltaHistogram;
     /** CAS Prepare metrics */
     public final LatencyMetrics casPrepare;
     /** CAS Propose metrics */
@@ -448,6 +450,7 @@ public class ColumnFamilyMetrics
         });
         tombstoneScannedHistogram = createColumnFamilyHistogram("TombstoneScannedHistogram", cfs.keyspace.metric.tombstoneScannedHistogram);
         liveScannedHistogram = createColumnFamilyHistogram("LiveScannedHistogram", cfs.keyspace.metric.liveScannedHistogram);
+        colUpdateTimeDeltaHistogram = createColumnFamilyHistogram("ColUpdateTimeDeltaHistogram", cfs.keyspace.metric.colUpdateTimeDeltaHistogram);
         coordinatorReadLatency = Metrics.newTimer(factory.createMetricName("CoordinatorReadLatency"), TimeUnit.MICROSECONDS, TimeUnit.SECONDS);
         coordinatorScanLatency = Metrics.newTimer(factory.createMetricName("CoordinatorScanLatency"), TimeUnit.MICROSECONDS, TimeUnit.SECONDS);
         casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0197607/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 7a768b8..0ea982e 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -62,6 +62,8 @@ public class KeyspaceMetrics
     public final Histogram tombstoneScannedHistogram;
     /** Live cells scanned in queries on this Keyspace */
     public final Histogram liveScannedHistogram;
+    /** Column update time delta on this Keyspace */
+    public final Histogram colUpdateTimeDeltaHistogram;
     /** CAS Prepare metric */
     public final LatencyMetrics casPrepare;
     /** CAS Propose metrics */
@@ -155,6 +157,7 @@ public class KeyspaceMetrics
         sstablesPerReadHistogram = Metrics.newHistogram(factory.createMetricName("SSTablesPerReadHistogram"), true);
         tombstoneScannedHistogram = Metrics.newHistogram(factory.createMetricName("TombstoneScannedHistogram"), true);
         liveScannedHistogram = Metrics.newHistogram(factory.createMetricName("LiveScannedHistogram"), true);
+        colUpdateTimeDeltaHistogram = Metrics.newHistogram(factory.createMetricName("ColUpdateTimeDeltaHistogram"), true);
         // add manually since histograms do not use createKeyspaceGauge method
         allMetrics.addAll(Lists.newArrayList("SSTablesPerReadHistogram", "TombstoneScannedHistogram", "LiveScannedHistogram"));
 


Mime
View raw message