Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 6B50A17AA4 for ; Tue, 4 Nov 2014 20:48:55 +0000 (UTC) Received: (qmail 58237 invoked by uid 500); 4 Nov 2014 20:48:55 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 58191 invoked by uid 500); 4 Nov 2014 20:48:55 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 58166 invoked by uid 99); 4 Nov 2014 20:48:55 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Nov 2014 20:48:55 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 160378AEC13; Tue, 4 Nov 2014 20:38:54 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jmckenzie@apache.org To: commits@cassandra.apache.org Date: Tue, 04 Nov 2014 20:38:54 -0000 Message-Id: <6189057521b24246bdf9044a346b96e2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/4] git commit: Add column update delta histogram Repository: cassandra Updated Branches: refs/heads/trunk bd32104ec -> 254cd85a5 Add column update delta histogram Patch by Sankalp Kohli, review by jmckenzie for CASSANDRA-7979 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e0197607 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e0197607 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e0197607 Branch: refs/heads/trunk Commit: e019760783e4d4ebf2d3ed60c33e7e63c226fa71 Parents: 7bfb347 Author: Sankalp Kohli Authored: Tue Nov 4 14:00:41 2014 -0600 Committer: Joshua McKenzie Committed: Tue Nov 4 14:00:41 2014 -0600 ---------------------------------------------------------------------- .../cassandra/db/AtomicSortedColumns.java | 28 +++++++++++++------- .../apache/cassandra/db/ColumnFamilyStore.java | 4 ++- src/java/org/apache/cassandra/db/Memtable.java | 13 ++++----- .../cassandra/metrics/ColumnFamilyMetrics.java | 3 +++ .../cassandra/metrics/KeyspaceMetrics.java | 3 +++ 5 files changed, 35 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0197607/src/java/org/apache/cassandra/db/AtomicSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java index cacd3bb..993df5b 100644 --- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java +++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java @@ -22,7 +22,6 @@ import java.util.*; import java.util.concurrent.atomic.AtomicReference; import com.google.common.base.Function; -import com.google.common.collect.Iterables; import edu.stanford.ppl.concurrent.SnapTreeMap; @@ -30,7 +29,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.filter.ColumnSlice; import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.utils.Allocator; +import org.apache.cassandra.utils.*; /** * A thread-safe and atomic ISortedColumns implementation. @@ -51,6 +50,8 @@ public class AtomicSortedColumns extends ColumnFamily { private final AtomicReference ref; + private static final boolean enableColUpdateTimeDelta = Boolean.parseBoolean(System.getProperty("cassandra.enableColUpdateTimeDelta", "false")); + public static final ColumnFamily.Factory factory = new Factory() { public AtomicSortedColumns create(CFMetaData metadata, boolean insertReversed) @@ -155,9 +156,9 @@ public class AtomicSortedColumns extends ColumnFamily /** * This is only called by Memtable.resolve, so only AtomicSortedColumns needs to implement it. * - * @return the difference in size seen after merging the given columns + * @return the difference in size seen after merging the given columns and minimum time delta between timestamps of two reconciled columns. */ - public long addAllWithSizeDelta(ColumnFamily cm, Allocator allocator, Function transformation, SecondaryIndexManager.Updater indexer) + public Pair addAllWithSizeDelta(ColumnFamily cm, Allocator allocator, Function transformation, SecondaryIndexManager.Updater indexer) { /* * This operation needs to atomicity and isolation. To that end, we @@ -172,11 +173,13 @@ public class AtomicSortedColumns extends ColumnFamily */ Holder current, modified; long sizeDelta; + long timeDelta; main_loop: do { sizeDelta = 0; + timeDelta = Long.MAX_VALUE; current = ref.get(); DeletionInfo newDelInfo = current.deletionInfo; if (cm.deletionInfo().mayModify(newDelInfo)) @@ -188,7 +191,13 @@ public class AtomicSortedColumns extends ColumnFamily for (Column column : cm) { - sizeDelta += modified.addColumn(transformation.apply(column), allocator, indexer); + final Pair pair = modified.addColumn(transformation.apply(column), allocator, indexer); + sizeDelta += pair.left; + + //We will store the minimum delta for all columns if enabled + if(enableColUpdateTimeDelta) + timeDelta = Math.min(pair.right, timeDelta); + // bail early if we know we've been beaten if (ref.get() != current) continue main_loop; @@ -198,7 +207,7 @@ public class AtomicSortedColumns extends ColumnFamily indexer.updateRowLevelIndexes(); - return sizeDelta; + return Pair.create(sizeDelta, timeDelta); } public boolean replace(Column oldColumn, Column newColumn) @@ -311,7 +320,7 @@ public class AtomicSortedColumns extends ColumnFamily return new Holder(new SnapTreeMap(map.comparator()), LIVE); } - long addColumn(Column column, Allocator allocator, SecondaryIndexManager.Updater indexer) + Pair addColumn(Column column, Allocator allocator, SecondaryIndexManager.Updater indexer) { ByteBuffer name = column.name(); while (true) @@ -320,14 +329,15 @@ public class AtomicSortedColumns extends ColumnFamily if (oldColumn == null) { indexer.insert(column); - return column.dataSize(); + return Pair.create(column.dataSize(), Long.MAX_VALUE); } Column reconciledColumn = column.reconcile(oldColumn, allocator); if (map.replace(name, oldColumn, reconciledColumn)) { indexer.update(oldColumn, reconciledColumn); - return reconciledColumn.dataSize() - oldColumn.dataSize(); + return Pair.create(reconciledColumn.dataSize() - oldColumn.dataSize(), + enableColUpdateTimeDelta? Math.abs(oldColumn.timestamp - column.timestamp): Long.MAX_VALUE ); } // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying. // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0197607/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index d0ff951..8a18347 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -897,9 +897,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean long start = System.nanoTime(); Memtable mt = getMemtableThreadSafe(); - mt.put(key, columnFamily, indexer); + final long timeDelta = mt.put(key, columnFamily, indexer); maybeUpdateRowCache(key); metric.writeLatency.addNano(System.nanoTime() - start); + if(timeDelta < Long.MAX_VALUE) + metric.colUpdateTimeDeltaHistogram.update(timeDelta); mt.maybeUpdateLiveRatio(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0197607/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index f9a6719..0d55bb2 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Function; import com.google.common.base.Throwables; +import org.apache.cassandra.utils.*; import org.cliffc.high_scale_lib.NonBlockingHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +40,6 @@ import org.apache.cassandra.io.sstable.SSTableMetadata; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.io.util.DiskAwareRunnable; -import org.apache.cassandra.utils.Allocator; import org.github.jamm.MemoryMeter; public class Memtable @@ -160,9 +160,9 @@ public class Memtable * (CFS handles locking to avoid submitting an op * to a flushing memtable. Any other way is unsafe.) */ - void put(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer) + long put(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer) { - resolve(key, columnFamily, indexer); + return resolve(key, columnFamily, indexer); } public void maybeUpdateLiveRatio() @@ -202,7 +202,7 @@ public class Memtable meterExecutor.submit(new MeteringRunnable(cfs)); } - private void resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer) + private long resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer) { AtomicSortedColumns previous = rows.get(key); @@ -215,9 +215,10 @@ public class Memtable previous = empty; } - long sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction, indexer); - currentSize.addAndGet(sizeDelta); + final Pair pair = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction, indexer); + currentSize.addAndGet(pair.left); currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount()); + return pair.right; } // for debugging http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0197607/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java index a3838a0..c2b7d61 100644 --- a/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ColumnFamilyMetrics.java @@ -93,6 +93,8 @@ public class ColumnFamilyMetrics public final ColumnFamilyHistogram tombstoneScannedHistogram; /** Live cells scanned in queries on this CF */ public final ColumnFamilyHistogram liveScannedHistogram; + /** Column update time delta on this CF */ + public final ColumnFamilyHistogram colUpdateTimeDeltaHistogram; /** CAS Prepare metrics */ public final LatencyMetrics casPrepare; /** CAS Propose metrics */ @@ -448,6 +450,7 @@ public class ColumnFamilyMetrics }); tombstoneScannedHistogram = createColumnFamilyHistogram("TombstoneScannedHistogram", cfs.keyspace.metric.tombstoneScannedHistogram); liveScannedHistogram = createColumnFamilyHistogram("LiveScannedHistogram", cfs.keyspace.metric.liveScannedHistogram); + colUpdateTimeDeltaHistogram = createColumnFamilyHistogram("ColUpdateTimeDeltaHistogram", cfs.keyspace.metric.colUpdateTimeDeltaHistogram); coordinatorReadLatency = Metrics.newTimer(factory.createMetricName("CoordinatorReadLatency"), TimeUnit.MICROSECONDS, TimeUnit.SECONDS); coordinatorScanLatency = Metrics.newTimer(factory.createMetricName("CoordinatorScanLatency"), TimeUnit.MICROSECONDS, TimeUnit.SECONDS); casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e0197607/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java index 7a768b8..0ea982e 100644 --- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java @@ -62,6 +62,8 @@ public class KeyspaceMetrics public final Histogram tombstoneScannedHistogram; /** Live cells scanned in queries on this Keyspace */ public final Histogram liveScannedHistogram; + /** Column update time delta on this Keyspace */ + public final Histogram colUpdateTimeDeltaHistogram; /** CAS Prepare metric */ public final LatencyMetrics casPrepare; /** CAS Propose metrics */ @@ -155,6 +157,7 @@ public class KeyspaceMetrics sstablesPerReadHistogram = Metrics.newHistogram(factory.createMetricName("SSTablesPerReadHistogram"), true); tombstoneScannedHistogram = Metrics.newHistogram(factory.createMetricName("TombstoneScannedHistogram"), true); liveScannedHistogram = Metrics.newHistogram(factory.createMetricName("LiveScannedHistogram"), true); + colUpdateTimeDeltaHistogram = Metrics.newHistogram(factory.createMetricName("ColUpdateTimeDeltaHistogram"), true); // add manually since histograms do not use createKeyspaceGauge method allMetrics.addAll(Lists.newArrayList("SSTablesPerReadHistogram", "TombstoneScannedHistogram", "LiveScannedHistogram"));