cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject cassandra git commit: Add support for top-k custom 2i queries
Date Tue, 05 May 2015 17:46:41 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 5bae5a313 -> 4c7c5be79


Add support for top-k custom 2i queries

patch by Andrés de la Peña; reviewed by Sam Tunnicliffe for
CASSANDRA-8717


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4c7c5be7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4c7c5be7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4c7c5be7

Branch: refs/heads/cassandra-2.1
Commit: 4c7c5be798e2a7d1e72d086bc5011242ea0173dc
Parents: 5bae5a3
Author: Andrés de la Peña <adelapena@stratio.com>
Authored: Tue May 5 20:45:09 2015 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Tue May 5 20:46:12 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/AbstractRangeCommand.java      | 23 ++++++++
 .../db/index/SecondaryIndexManager.java         | 48 ++++++++++------
 .../db/index/SecondaryIndexSearcher.java        | 44 ++++++++++++---
 .../db/index/composites/CompositesSearcher.java |  2 +-
 .../cassandra/db/index/keys/KeysSearcher.java   |  2 +-
 .../apache/cassandra/service/StorageProxy.java  | 58 ++++++++++++--------
 7 files changed, 127 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 64d0760..da14ca3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.6
+ * Add support for top-k custom 2i queries (CASSANDRA-8717)
  * Fix error when dropping table during compaction (CASSANDRA-9251)
  * cassandra-stress supports validation operations over user profiles (CASSANDRA-8773)
  * Add support for rate limiting log messages (CASSANDRA-9029)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
index b358f1b..959b524 100644
--- a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
+++ b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.index.*;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.service.IReadCommand;
@@ -35,6 +36,8 @@ public abstract class AbstractRangeCommand implements IReadCommand
     public final IDiskAtomFilter predicate;
     public final List<IndexExpression> rowFilter;
 
+    public final SecondaryIndexSearcher searcher;
+
     public AbstractRangeCommand(String keyspace, String columnFamily, long timestamp, AbstractBounds<RowPosition> keyRange, IDiskAtomFilter predicate, List<IndexExpression> rowFilter)
     {
         this.keyspace = keyspace;
@@ -43,6 +46,26 @@ public abstract class AbstractRangeCommand implements IReadCommand
         this.keyRange = keyRange;
         this.predicate = predicate;
         this.rowFilter = rowFilter;
+        SecondaryIndexManager indexManager = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily).indexManager;
+        this.searcher = indexManager.getHighestSelectivityIndexSearcher(rowFilter);
+    }
+
+    public boolean requiresScanningAllRanges()
+    {
+        return searcher != null && searcher.requiresScanningAllRanges(rowFilter);
+    }
+
+    public List<Row> postReconciliationProcessing(List<Row> rows)
+    {
+        return searcher == null ? trim(rows) : trim(searcher.postReconciliationProcessing(rowFilter, rows));
+    }
+
+    private List<Row> trim(List<Row> rows)
+    {
+        if (countCQL3Rows())
+            return rows;
+        else
+            return rows.size() > limit() ? rows.subList(0, limit()) : rows;
     }
 
     public String getKeyspace()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index e4a9ff8..ab6df1e 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -638,25 +638,11 @@ public class SecondaryIndexManager
      */
     public List<Row> search(ExtendedFilter filter)
     {
-        List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(filter.getClause());
-
-        if (indexSearchers.isEmpty())
+        SecondaryIndexSearcher mostSelective = getHighestSelectivityIndexSearcher(filter.getClause());
+        if (mostSelective == null)
             return Collections.emptyList();
-
-        SecondaryIndexSearcher mostSelective = null;
-        long bestEstimate = Long.MAX_VALUE;
-        for (SecondaryIndexSearcher searcher : indexSearchers)
-        {
-            SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(filter.getClause());
-            long estimate = highestSelectivityIndex.estimateResultRows();
-            if (estimate <= bestEstimate)
-            {
-                bestEstimate = estimate;
-                mostSelective = searcher;
-            }
-        }
-
-        return mostSelective.search(filter);
+        else
+            return mostSelective.search(filter);
     }
 
     public Set<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
@@ -849,4 +835,30 @@ public class SecondaryIndexManager
         }
 
     }
+
+    public SecondaryIndexSearcher getHighestSelectivityIndexSearcher(List<IndexExpression> clause)
+    {
+        if (clause == null)
+            return null;
+
+        List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause);
+
+        if (indexSearchers.isEmpty())
+            return null;
+
+        SecondaryIndexSearcher mostSelective = null;
+        long bestEstimate = Long.MAX_VALUE;
+        for (SecondaryIndexSearcher searcher : indexSearchers)
+        {
+            SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(clause);
+            long estimate = highestSelectivityIndex.estimateResultRows();
+            if (estimate <= bestEstimate)
+            {
+                bestEstimate = estimate;
+                mostSelective = searcher;
+            }
+        }
+
+        return mostSelective;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index 93e0643..ab2cd75 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -43,7 +43,7 @@ public abstract class SecondaryIndexSearcher
 
     public SecondaryIndex highestSelectivityIndex(List<IndexExpression> clause)
     {
-        IndexExpression expr = highestSelectivityPredicate(clause);
+        IndexExpression expr = highestSelectivityPredicate(clause, false);
         return expr == null ? null : indexManager.getIndexForColumn(expr.column);
     }
 
@@ -77,7 +77,7 @@ public abstract class SecondaryIndexSearcher
     {
     }
 
-    protected IndexExpression highestSelectivityPredicate(List<IndexExpression> clause)
+    protected IndexExpression highestSelectivityPredicate(List<IndexExpression> clause, boolean includeInTrace)
     {
         IndexExpression best = null;
         int bestMeanCount = Integer.MAX_VALUE;
@@ -102,12 +102,40 @@ public abstract class SecondaryIndexSearcher
             }
         }
 
-        if (best == null)
-            Tracing.trace("No applicable indexes found");
-        else
-            Tracing.trace("Candidate index mean cardinalities are {}. Scanning with {}.",
-                          FBUtilities.toString(candidates), indexManager.getIndexForColumn(best.column).getIndexName());
-
+        if (includeInTrace)
+        {
+            if (best == null)
+                Tracing.trace("No applicable indexes found");
+            else if (Tracing.isTracing())
+                // pay for an additional threadlocal get() rather than build the strings unnecessarily
+                Tracing.trace("Candidate index mean cardinalities are {}. Scanning with {}.",
+                              FBUtilities.toString(candidates),
+                              indexManager.getIndexForColumn(best.column).getIndexName());
+        }
         return best;
     }
+
+    /**
+     * Returns {@code true} if the specified list of {@link IndexExpression}s require a full scan of all the nodes.
+     *
+     * @param clause A list of {@link IndexExpression}s
+     * @return {@code true} if the {@code IndexExpression}s require a full scan, {@code false} otherwise
+     */
+    public boolean requiresScanningAllRanges(List<IndexExpression> clause)
+    {
+        return false;
+    }
+
+    /**
+     * Combines index query results from multiple nodes. This is done by the coordinator node after it has reconciled
+     * the replica responses.
+     *
+     * @param clause A list of {@link IndexExpression}s
+     * @param rows The index query results to be combined
+     * @return The combination of the index query results
+     */
+    public List<Row> postReconciliationProcessing(List<IndexExpression> clause, List<Row> rows)
+    {
+        return rows;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
index 3e523f4..a2d08e7 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
@@ -59,7 +59,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher
     public List<Row> search(ExtendedFilter filter)
     {
         assert filter.getClause() != null && !filter.getClause().isEmpty();
-        final IndexExpression primary = highestSelectivityPredicate(filter.getClause());
+        final IndexExpression primary = highestSelectivityPredicate(filter.getClause(), true);
         final CompositesIndex index = (CompositesIndex)indexManager.getIndexForColumn(primary.column);
         // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
         // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index 4055b7c..634bb0c 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -53,7 +53,7 @@ public class KeysSearcher extends SecondaryIndexSearcher
     public List<Row> search(ExtendedFilter filter)
     {
         assert filter.getClause() != null && !filter.getClause().isEmpty();
-        final IndexExpression primary = highestSelectivityPredicate(filter.getClause());
+        final IndexExpression primary = highestSelectivityPredicate(filter.getClause(), true);
         final SecondaryIndex index = indexManager.getIndexForColumn(primary.column);
         // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room
         // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room  being made

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c7c5be7/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index b41429e..1536e46 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1621,17 +1621,39 @@ public class StorageProxy implements StorageProxyMBean
             else
                 ranges = getRestrictedRanges(command.keyRange);
 
-            // our estimate of how many result rows there will be per-range
-            float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace);
-            // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
-            // fetch enough rows in the first round
-            resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
-            int concurrencyFactor = resultRowsPerRange == 0.0
+            // determine the number of rows to be fetched and the concurrency factor
+            int rowsToBeFetched = command.limit();
+            int concurrencyFactor;
+            if (command.requiresScanningAllRanges())
+            {
+                // all nodes must be queried
+                rowsToBeFetched *= ranges.size();
+                concurrencyFactor = ranges.size();
+                logger.debug("Requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
+                             command.limit(),
+                             ranges.size(),
+                             concurrencyFactor);
+                Tracing.trace("Submitting range requests on {} ranges with a concurrency of {}",
+                              new Object[]{ ranges.size(), concurrencyFactor});
+            }
+            else
+            {
+                // our estimate of how many result rows there will be per-range
+                float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace);
+                // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
+                // fetch enough rows in the first round
+                resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
+                concurrencyFactor = resultRowsPerRange == 0.0
                                   ? 1
                                   : Math.max(1, Math.min(ranges.size(), (int) Math.ceil(command.limit() / resultRowsPerRange)));
-            logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
-                         resultRowsPerRange, command.limit(), ranges.size(), concurrencyFactor);
-            Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", new Object[]{ ranges.size(), concurrencyFactor, resultRowsPerRange});
+                logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
+                             resultRowsPerRange,
+                             command.limit(),
+                             ranges.size(),
+                             concurrencyFactor);
+                Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)",
+                              new Object[]{ ranges.size(), concurrencyFactor, resultRowsPerRange});
+            }
 
             boolean haveSufficientRows = false;
             int i = 0;
@@ -1723,7 +1745,6 @@ public class StorageProxy implements StorageProxyMBean
                 List<AsyncOneResponse> repairResponses = new ArrayList<>();
                 for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers)
                 {
-                    AbstractRangeCommand nodeCmd = cmdPairHandler.left;
                     ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right;
                     RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver;
 
@@ -1765,7 +1786,7 @@ public class StorageProxy implements StorageProxyMBean
 
                     // if we're done, great, otherwise, move to the next range
                     int count = countLiveRows ? liveRowCount : rows.size();
-                    if (count >= nodeCmd.limit())
+                    if (count >= rowsToBeFetched)
                     {
                         haveSufficientRows = true;
                         break;
@@ -1788,14 +1809,14 @@ public class StorageProxy implements StorageProxyMBean
                 }
 
                 if (haveSufficientRows)
-                    return trim(command, rows);
+                    return command.postReconciliationProcessing(rows);
 
                 // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor
                 // based on the results we've seen so far (as long as we still have ranges left to query)
                 if (i < ranges.size())
                 {
                     float fetchedRows = countLiveRows ? liveRowCount : rows.size();
-                    float remainingRows = command.limit() - fetchedRows;
+                    float remainingRows = rowsToBeFetched - fetchedRows;
                     float actualRowsPerRange;
                     if (fetchedRows == 0.0)
                     {
@@ -1819,16 +1840,7 @@ public class StorageProxy implements StorageProxyMBean
             rangeMetrics.addNano(latency);
             Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
         }
-        return trim(command, rows);
-    }
-
-    private static List<Row> trim(AbstractRangeCommand command, List<Row> rows)
-    {
-        // for CQL3 queries, let the caller trim the results
-        if (command.countCQL3Rows() || command.ignoredTombstonedPartitions())
-            return rows;
-        else
-            return rows.size() > command.limit() ? rows.subList(0, command.limit()) : rows;
+        return command.postReconciliationProcessing(rows);
     }
 
     public Map<String, List<String>> getSchemaVersions()


Mime
View raw message