cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject cassandra git commit: Optimise max purgeable timestamp calculation in compaction
Date Tue, 31 Mar 2015 16:32:48 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 16499ca9b -> 8b3221a8f


Optimise max purgeable timestamp calculation in compaction

patch by benedict; reviewed by marcus for CASSANDRA-8920


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b3221a8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b3221a8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b3221a8

Branch: refs/heads/trunk
Commit: 8b3221a8fcdcafc78e4cee908768b9f8612df31e
Parents: 16499ca
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Tue Mar 31 17:32:18 2015 +0100
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Tue Mar 31 17:32:18 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/DataTracker.java    |   7 +-
 .../db/compaction/CompactionController.java     |  21 ++--
 .../cassandra/utils/AsymmetricOrdering.java     |   3 -
 .../org/apache/cassandra/utils/Interval.java    |   2 +
 .../apache/cassandra/utils/OverlapIterator.java |  58 ++++++++++
 .../cassandra/utils/OverlapIteratorTest.java    | 114 +++++++++++++++++++
 7 files changed, 191 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b3221a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 22bdc5e..e51be76 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Optimise max purgeable timestamp calculation in compaction (CASSANDRA-8920)
  * Constrain internode message buffer sizes, and improve IO class hierarchy (CASSANDRA-8670)

  * New tool added to validate all sstables in a node (CASSANDRA-5791)
  * Push notification when tracing completes for an operation (CASSANDRA-7807)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b3221a8/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index dd1dc5a..6de0b2c 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -575,10 +575,15 @@ public class DataTracker
 
     public static SSTableIntervalTree buildIntervalTree(Iterable<SSTableReader> sstables)
     {
+        return new SSTableIntervalTree(buildIntervals(sstables));
+    }
+
+    public static List<Interval<RowPosition, SSTableReader>> buildIntervals(Iterable<SSTableReader> sstables)
+    {
         List<Interval<RowPosition, SSTableReader>> intervals = new ArrayList<>(Iterables.size(sstables));
         for (SSTableReader sstable : sstables)
             intervals.add(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last, sstable));
-        return new SSTableIntervalTree(intervals);
+        return intervals;
     }
 
     public Set<SSTableReader> getCompacting()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b3221a8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 148b1b6..a49a3ea 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -17,24 +17,22 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 
+import org.apache.cassandra.utils.OverlapIterator;
 import org.apache.cassandra.utils.concurrent.Refs;
 
+import static org.apache.cassandra.db.DataTracker.buildIntervals;
+
 /**
  * Manage compaction options.
  */
@@ -43,8 +41,8 @@ public class CompactionController implements AutoCloseable
     private static final Logger logger = LoggerFactory.getLogger(CompactionController.class);
 
     public final ColumnFamilyStore cfs;
-    private DataTracker.SSTableIntervalTree overlappingTree;
     private Refs<SSTableReader> overlappingSSTables;
+    private OverlapIterator<RowPosition, SSTableReader> overlapIterator;
     private final Iterable<SSTableReader> compacting;
 
     public final int gcBefore;
@@ -84,7 +82,7 @@ public class CompactionController implements AutoCloseable
             overlappingSSTables = Refs.tryRef(Collections.<SSTableReader>emptyList());
         else
             overlappingSSTables = cfs.getAndReferenceOverlappingSSTables(compacting);
-        this.overlappingTree = DataTracker.buildIntervalTree(overlappingSSTables);
+        this.overlapIterator = new OverlapIterator<>(buildIntervals(overlappingSSTables));
     }
 
     public Set<SSTableReader> getFullyExpiredSSTables()
@@ -170,9 +168,9 @@ public class CompactionController implements AutoCloseable
      */
     public long maxPurgeableTimestamp(DecoratedKey key)
     {
-        List<SSTableReader> filteredSSTables = overlappingTree.search(key);
         long min = Long.MAX_VALUE;
-        for (SSTableReader sstable : filteredSSTables)
+        overlapIterator.update(key);
+        for (SSTableReader sstable : overlapIterator.overlaps())
         {
             // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing),
             // we check index file instead.
@@ -193,4 +191,5 @@ public class CompactionController implements AutoCloseable
     {
         overlappingSSTables.release();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b3221a8/src/java/org/apache/cassandra/utils/AsymmetricOrdering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AsymmetricOrdering.java b/src/java/org/apache/cassandra/utils/AsymmetricOrdering.java
index ed8c99f..74597f5 100644
--- a/src/java/org/apache/cassandra/utils/AsymmetricOrdering.java
+++ b/src/java/org/apache/cassandra/utils/AsymmetricOrdering.java
@@ -93,7 +93,6 @@ public abstract class AsymmetricOrdering<T1, T2> extends Ordering<T1>
             // { a[m] >= v   ==>   a[ub] >= v   ==>   a[lb] < v ^ a[ub] >= v }
             // { a[m] <  v   ==>   a[lb] <  v   ==>   a[lb] < v ^ a[ub] >= v }
         }
-
         throw new IllegalStateException();
     }
 
@@ -116,8 +115,6 @@ public abstract class AsymmetricOrdering<T1, T2> extends Ordering<T1>
         throw new IllegalStateException();
     }
 
-
-
     private class Reversed extends AsymmetricOrdering<T1, T2>
     {
         public int compareAsymmetric(T1 left, T2 right)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b3221a8/src/java/org/apache/cassandra/utils/Interval.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Interval.java b/src/java/org/apache/cassandra/utils/Interval.java
index 9398144..335ef27 100644
--- a/src/java/org/apache/cassandra/utils/Interval.java
+++ b/src/java/org/apache/cassandra/utils/Interval.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.utils;
 
+import java.util.Comparator;
+
 import com.google.common.base.Objects;
 
 public class Interval<C, D>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b3221a8/src/java/org/apache/cassandra/utils/OverlapIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/OverlapIterator.java b/src/java/org/apache/cassandra/utils/OverlapIterator.java
new file mode 100644
index 0000000..bc43742
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/OverlapIterator.java
@@ -0,0 +1,58 @@
+package org.apache.cassandra.utils;
+
+import java.util.*;
+
+import org.apache.cassandra.utils.AsymmetricOrdering.Op;
+
+/**
+ * A class for iterating sequentially through an ordered collection and efficiently
+ * finding the overlapping set of matching intervals.
+ *
+ * The algorithm is quite simple: the intervals are sorted ascending by both min and max
+ * in two separate lists. These lists are walked forwards each time we visit a new point,
+ * with the set of intervals in the min-ordered list being added to our set of overlaps,
+ * and those in the max-ordered list being removed.
+ */
+public class OverlapIterator<I extends Comparable<? super I>, V>
+{
+    // indexing into sortedByMin, tracks the next interval to include
+    int nextToInclude;
+    final List<Interval<I, V>> sortedByMin;
+    // indexing into sortedByMax, tracks the next interval to exclude
+    int nextToExclude;
+    final List<Interval<I, V>> sortedByMax;
+    final Set<V> overlaps = new HashSet<>();
+    final Set<V> accessible = Collections.unmodifiableSet(overlaps);
+
+    I mostRecent;
+
+    public OverlapIterator(Collection<Interval<I, V>> intervals)
+    {
+        sortedByMax = new ArrayList<>(intervals);
+        Collections.sort(sortedByMax, Interval.<I, V>maxOrdering());
+        // we clone after first sorting by max;  this is quite likely to make sort cheaper, since a.max < b.max
+        // generally increases the likelihood that a.min < b.min, so the list may be partially sorted already.
+        // this also means if (in future) we sort either collection (or a subset thereof) by the other's comparator
+        // all items, including equal, will occur in the same order, including
+        sortedByMin = new ArrayList<>(sortedByMax);
+        Collections.sort(sortedByMin, Interval.<I, V>minOrdering());
+    }
+
+    // move the iterator forwards to the overlaps matching point
+    public void update(I point)
+    {
+        // we don't use binary search here since we expect points to be a superset of the min/max values
+        mostRecent = point;
+        // add those we are now after the start of
+        while (nextToInclude < sortedByMin.size() && sortedByMin.get(nextToInclude).min.compareTo(point) <= 0)
+            overlaps.add(sortedByMin.get(nextToInclude++).data);
+        // remove those we are now after the end of
+        while (nextToExclude < sortedByMax.size() && sortedByMax.get(nextToExclude).max.compareTo(point) < 0)
+            overlaps.remove(sortedByMax.get(nextToExclude++).data);
+    }
+
+    public Set<V> overlaps()
+    {
+        return accessible;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b3221a8/test/unit/org/apache/cassandra/utils/OverlapIteratorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/OverlapIteratorTest.java b/test/unit/org/apache/cassandra/utils/OverlapIteratorTest.java
new file mode 100644
index 0000000..5bbe267
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/OverlapIteratorTest.java
@@ -0,0 +1,114 @@
+package org.apache.cassandra.utils;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class OverlapIteratorTest
+{
+
+    private static List<Interval<Integer, Integer>> randomIntervals(int range, int increment, int count)
+    {
+        List<Integer> a = random(range, increment, count);
+        List<Integer> b = random(range, increment, count);
+        List<Interval<Integer, Integer>> r = new ArrayList<>();
+        for (int i = 0 ; i < count ; i++)
+        {
+            r.add(a.get(i) < b.get(i) ? Interval.create(a.get(i), b.get(i), i)
+                                      : Interval.create(b.get(i), a.get(i), i));
+        }
+        return r;
+    }
+
+    private static List<Integer> random(int range, int increment, int count)
+    {
+        List<Integer> random = new ArrayList<>();
+        for (int i = 0 ; i < count ; i++)
+        {
+            int base = i * increment;
+            random.add(ThreadLocalRandom.current().nextInt(base, base + range));
+        }
+        return random;
+    }
+
+    @Test
+    public void test()
+    {
+        for (int i = 0 ; i < 10 ; i++)
+        {
+            test(1000, 0, 1000);
+            test(100000, 100, 1000);
+            test(1000000, 0, 1000);
+        }
+    }
+
+    private void test(int range, int increment, int count)
+    {
+        compare(randomIntervals(range, increment, count), random(range, increment, count), 1);
+        compare(randomIntervals(range, increment, count), random(range, increment, count), 2);
+        compare(randomIntervals(range, increment, count), random(range, increment, count), 3);
+    }
+
+    private <I extends Comparable<I>, V> void compare(List<Interval<I, V>> intervals, List<I> points, int initCount)
+    {
+        Collections.sort(points);
+        IntervalTree<I, V, Interval<I, V>> tree = IntervalTree.build(intervals);
+        OverlapIterator<I, V> iter = new OverlapIterator<>(intervals);
+        int initPoint = points.size() / initCount;
+        int i = 0;
+        for (I point : points)
+        {
+            if (i++ == initPoint)
+                iter = new OverlapIterator<>(intervals);
+            iter.update(point);
+            TreeSet<V> act = new TreeSet<>(iter.overlaps);
+            TreeSet<V> exp = new TreeSet<>(tree.search(point));
+            TreeSet<V> extra = new TreeSet<>(act);
+            extra.removeAll(exp);
+            TreeSet<V> missing = new TreeSet<>(exp);
+            missing.removeAll(act);
+            assertTrue(extra.isEmpty());
+            assertTrue(missing.isEmpty());
+        }
+    }
+
+}


Mime
View raw message