cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject cassandra git commit: Allow per partition limit in SELECT queries
Date Thu, 07 Apr 2016 09:42:07 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk bd633377a -> 2fbddbd99


Allow per partition limit in SELECT queries

Patch by Alex Petrov; reviewed by Sylvain Lebresne for CASSANDRA-7017.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2fbddbd9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2fbddbd9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2fbddbd9

Branch: refs/heads/trunk
Commit: 2fbddbd9926eac07031196f74c637730a3953dce
Parents: bd63337
Author: Alex Petrov <oleksandr.petrov@gmail.com>
Authored: Tue Apr 5 17:45:59 2016 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Thu Apr 7 11:41:49 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/antlr/Lexer.g                               |  2 +
 src/antlr/Parser.g                              |  6 +-
 .../cql3/statements/CreateViewStatement.java    |  2 +-
 .../cql3/statements/SelectStatement.java        | 57 ++++++++----
 .../apache/cassandra/db/filter/DataLimits.java  | 15 +++-
 .../cql3/validation/operations/SelectTest.java  | 95 +++++++++++++++++++-
 7 files changed, 155 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2fbddbd9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e522035..4631178 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.6
+ * Allow per-partition LIMIT clause in CQL (CASSANDRA-7017)
  * Make custom filtering more extensible with UserExpression (CASSANDRA-11295)
  * Improve field-checking and error reporting in cassandra.yaml (CASSANDRA-10649)
  * Print CAS stats in nodetool proxyhistograms (CASSANDRA-11507)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2fbddbd9/src/antlr/Lexer.g
----------------------------------------------------------------------
diff --git a/src/antlr/Lexer.g b/src/antlr/Lexer.g
index c73ccaa..d93a5eb 100644
--- a/src/antlr/Lexer.g
+++ b/src/antlr/Lexer.g
@@ -69,6 +69,8 @@ K_INSERT:      I N S E R T;
 K_UPDATE:      U P D A T E;
 K_WITH:        W I T H;
 K_LIMIT:       L I M I T;
+K_PER:         P E R;
+K_PARTITION:   P A R T I T I O N;
 K_USING:       U S I N G;
 K_USE:         U S E;
 K_DISTINCT:    D I S T I N C T;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2fbddbd9/src/antlr/Parser.g
----------------------------------------------------------------------
diff --git a/src/antlr/Parser.g b/src/antlr/Parser.g
index 78a2d0d..0b21775 100644
--- a/src/antlr/Parser.g
+++ b/src/antlr/Parser.g
@@ -221,6 +221,7 @@ selectStatement returns [SelectStatement.RawStatement expr]
     @init {
         boolean isDistinct = false;
         Term.Raw limit = null;
+        Term.Raw perPartitionLimit = null;
         Map<ColumnIdentifier.Raw, Boolean> orderings = new LinkedHashMap<ColumnIdentifier.Raw, Boolean>();
         boolean allowFiltering = false;
         boolean isJson = false;
@@ -231,6 +232,7 @@ selectStatement returns [SelectStatement.RawStatement expr]
       K_FROM cf=columnFamilyName
       ( K_WHERE wclause=whereClause )?
       ( K_ORDER K_BY orderByClause[orderings] ( ',' orderByClause[orderings] )* )?
+      ( K_PER K_PARTITION K_LIMIT rows=intValue { perPartitionLimit = rows; } )?
       ( K_LIMIT rows=intValue { limit = rows; } )?
       ( K_ALLOW K_FILTERING  { allowFiltering = true; } )?
       {
@@ -239,7 +241,7 @@ selectStatement returns [SelectStatement.RawStatement expr]
                                                                              allowFiltering,
                                                                              isJson);
           WhereClause where = wclause == null ? WhereClause.empty() : wclause.build();
-          $expr = new SelectStatement.RawStatement(cf, params, sclause, where, limit);
+          $expr = new SelectStatement.RawStatement(cf, params, sclause, where, limit, perPartitionLimit);
       }
     ;
 
@@ -1570,5 +1572,7 @@ basic_unreserved_keyword returns [String str]
         | K_CALLED
         | K_INPUT
         | K_LIKE
+        | K_PER
+        | K_PARTITION
         ) { $str = $k.text; }
     ;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2fbddbd9/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
index 5af4887..909fe4f 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
@@ -195,7 +195,7 @@ public class CreateViewStatement extends SchemaAlteringStatement
         // build the select statement
         Map<ColumnIdentifier.Raw, Boolean> orderings = Collections.emptyMap();
         SelectStatement.Parameters parameters = new SelectStatement.Parameters(orderings, false, true, false);
-        SelectStatement.RawStatement rawSelect = new SelectStatement.RawStatement(baseName, parameters, selectClause, whereClause, null);
+        SelectStatement.RawStatement rawSelect = new SelectStatement.RawStatement(baseName, parameters, selectClause, whereClause, null, null);
 
         ClientState state = ClientState.forInternalCalls();
         state.setKeyspace(keyspace());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2fbddbd9/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 2f4d468..9b68d7a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -60,6 +60,7 @@ import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkNull;
 import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue;
 import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 import static org.apache.cassandra.utils.ByteBufferUtil.UNSET_BYTE_BUFFER;
@@ -83,6 +84,7 @@ public class SelectStatement implements CQLStatement
     public final Parameters parameters;
     private final Selection selection;
     private final Term limit;
+    private final Term perPartitionLimit;
 
     private final StatementRestrictions restrictions;
 
@@ -105,7 +107,8 @@ public class SelectStatement implements CQLStatement
                            StatementRestrictions restrictions,
                            boolean isReversed,
                            Comparator<List<ByteBuffer>> orderingComparator,
-                           Term limit)
+                           Term limit,
+                           Term perPartitionLimit)
     {
         this.cfm = cfm;
         this.boundTerms = boundTerms;
@@ -115,6 +118,7 @@ public class SelectStatement implements CQLStatement
         this.orderingComparator = orderingComparator;
         this.parameters = parameters;
         this.limit = limit;
+        this.perPartitionLimit = perPartitionLimit;
         this.queriedColumns = gatherQueriedColumns();
     }
 
@@ -122,7 +126,8 @@ public class SelectStatement implements CQLStatement
     {
         return Iterables.concat(selection.getFunctions(),
                                 restrictions.getFunctions(),
-                                limit != null ? limit.getFunctions() : Collections.<Function>emptySet());
+                                limit != null ? limit.getFunctions() : Collections.<Function>emptySet(),
+                                perPartitionLimit != null ? perPartitionLimit.getFunctions() : Collections.<Function>emptySet());
     }
 
     // Note that the queried columns internally is different from the one selected by the
@@ -154,6 +159,7 @@ public class SelectStatement implements CQLStatement
                                    StatementRestrictions.empty(StatementType.SELECT, cfm),
                                    false,
                                    null,
+                                   null,
                                    null);
     }
 
@@ -197,8 +203,9 @@ public class SelectStatement implements CQLStatement
         cl.validateForRead(keyspace());
 
         int nowInSec = FBUtilities.nowInSeconds();
-        int userLimit = getLimit(options);
-        ReadQuery query = getQuery(options, nowInSec, userLimit);
+        int userLimit = getLimit(limit, options);
+        int userPerPartitionLimit = getLimit(perPartitionLimit, options);
+        ReadQuery query = getQuery(options, nowInSec, userLimit, userPerPartitionLimit);
 
         int pageSize = getPageSize(options);
 
@@ -224,12 +231,12 @@ public class SelectStatement implements CQLStatement
 
     public ReadQuery getQuery(QueryOptions options, int nowInSec) throws RequestValidationException
     {
-        return getQuery(options, nowInSec, getLimit(options));
+        return getQuery(options, nowInSec, getLimit(limit, options), getLimit(perPartitionLimit, options));
     }
 
-    public ReadQuery getQuery(QueryOptions options, int nowInSec, int userLimit) throws RequestValidationException
+    public ReadQuery getQuery(QueryOptions options, int nowInSec, int userLimit, int perPartitionLimit) throws RequestValidationException
     {
-        DataLimits limit = getDataLimits(userLimit);
+        DataLimits limit = getDataLimits(userLimit, perPartitionLimit);
         if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing())
             return getRangeCommand(options, limit, nowInSec);
 
@@ -386,8 +393,9 @@ public class SelectStatement implements CQLStatement
     public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
         int nowInSec = FBUtilities.nowInSeconds();
-        int userLimit = getLimit(options);
-        ReadQuery query = getQuery(options, nowInSec, userLimit);
+        int userLimit = getLimit(limit, options);
+        int userPerPartitionLimit = getLimit(perPartitionLimit, options);
+        ReadQuery query = getQuery(options, nowInSec, userLimit, userPerPartitionLimit);
         int pageSize = getPageSize(options);
 
         try (ReadExecutionController executionController = query.executionController())
@@ -409,7 +417,7 @@ public class SelectStatement implements CQLStatement
 
     public ResultSet process(PartitionIterator partitions, int nowInSec) throws InvalidRequestException
     {
-        return process(partitions, QueryOptions.DEFAULT, nowInSec, getLimit(QueryOptions.DEFAULT));
+        return process(partitions, QueryOptions.DEFAULT, nowInSec, getLimit(limit, QueryOptions.DEFAULT));
     }
 
     public String keyspace()
@@ -576,9 +584,10 @@ public class SelectStatement implements CQLStatement
         return builder.build();
     }
 
-    private DataLimits getDataLimits(int userLimit)
+    private DataLimits getDataLimits(int userLimit, int perPartitionLimit)
     {
         int cqlRowLimit = DataLimits.NO_LIMIT;
+        int cqlPerPartitionLimit = DataLimits.NO_LIMIT;
 
         // If we aggregate, the limit really apply to the number of rows returned to the user, not to what is queried, and
         // since in practice we currently only aggregate at top level (we have no GROUP BY support yet), we'll only ever
@@ -587,12 +596,15 @@ public class SelectStatement implements CQLStatement
         // able to apply the user limit properly.
         // If we do post ordering we need to get all the results sorted before we can trim them.
         if (!selection.isAggregate() && !needsPostQueryOrdering())
+        {
             cqlRowLimit = userLimit;
+            cqlPerPartitionLimit = perPartitionLimit;
+        }
 
         if (parameters.isDistinct)
             return cqlRowLimit == DataLimits.NO_LIMIT ? DataLimits.DISTINCT_NONE : DataLimits.distinctLimits(cqlRowLimit);
 
-        return cqlRowLimit == DataLimits.NO_LIMIT ? DataLimits.NONE : DataLimits.cqlLimits(cqlRowLimit);
+        return DataLimits.cqlLimits(cqlRowLimit, cqlPerPartitionLimit);
     }
 
     /**
@@ -602,7 +614,7 @@ public class SelectStatement implements CQLStatement
      * @return the limit specified by the user or <code>DataLimits.NO_LIMIT</code> if no value
      * as been specified.
      */
-    public int getLimit(QueryOptions options)
+    public int getLimit(Term limit, QueryOptions options)
     {
         int userLimit = DataLimits.NO_LIMIT;
 
@@ -784,14 +796,20 @@ public class SelectStatement implements CQLStatement
         public final List<RawSelector> selectClause;
         public final WhereClause whereClause;
         public final Term.Raw limit;
+        public final Term.Raw perPartitionLimit;
 
-        public RawStatement(CFName cfName, Parameters parameters, List<RawSelector> selectClause, WhereClause whereClause, Term.Raw limit)
+        public RawStatement(CFName cfName, Parameters parameters,
+                            List<RawSelector> selectClause,
+                            WhereClause whereClause,
+                            Term.Raw limit,
+                            Term.Raw perPartitionLimit)
         {
             super(cfName);
             this.parameters = parameters;
             this.selectClause = selectClause;
             this.whereClause = whereClause;
             this.limit = limit;
+            this.perPartitionLimit = perPartitionLimit;
         }
 
         public ParsedStatement.Prepared prepare() throws InvalidRequestException
@@ -811,7 +829,10 @@ public class SelectStatement implements CQLStatement
             StatementRestrictions restrictions = prepareRestrictions(cfm, boundNames, selection, forView);
 
             if (parameters.isDistinct)
+            {
+                checkNull(perPartitionLimit, "PER PARTITION LIMIT is not allowed with SELECT DISTINCT queries");
                 validateDistinctSelection(cfm, selection, restrictions);
+            }
 
             Comparator<List<ByteBuffer>> orderingComparator = null;
             boolean isReversed = false;
@@ -835,7 +856,8 @@ public class SelectStatement implements CQLStatement
                                                         restrictions,
                                                         isReversed,
                                                         orderingComparator,
-                                                        prepareLimit(boundNames));
+                                                        prepareLimit(boundNames, limit, keyspace(), limitReceiver()),
+                                                        prepareLimit(boundNames, perPartitionLimit, keyspace(), limitReceiver()));
 
             return new ParsedStatement.Prepared(stmt, boundNames, boundNames.getPartitionKeyBindIndexes(cfm));
         }
@@ -874,12 +896,13 @@ public class SelectStatement implements CQLStatement
         }
 
         /** Returns a Term for the limit or null if no limit is set */
-        private Term prepareLimit(VariableSpecifications boundNames) throws InvalidRequestException
+        private Term prepareLimit(VariableSpecifications boundNames, Term.Raw limit,
+                                  String keyspace, ColumnSpecification limitReceiver) throws InvalidRequestException
         {
             if (limit == null)
                 return null;
 
-            Term prepLimit = limit.prepare(keyspace(), limitReceiver());
+            Term prepLimit = limit.prepare(keyspace, limitReceiver);
             prepLimit.collectMarkerSpecification(boundNames);
             return prepLimit;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2fbddbd9/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index f6fdcdd..ebd2ebb 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -72,12 +72,21 @@ public abstract class DataLimits
 
     public static DataLimits cqlLimits(int cqlRowLimit)
     {
-        return new CQLLimits(cqlRowLimit);
+        return cqlRowLimit == NO_LIMIT ? NONE : new CQLLimits(cqlRowLimit);
     }
 
     public static DataLimits cqlLimits(int cqlRowLimit, int perPartitionLimit)
     {
-        return new CQLLimits(cqlRowLimit, perPartitionLimit);
+        return cqlRowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT
+             ? NONE
+             : new CQLLimits(cqlRowLimit, perPartitionLimit);
+    }
+
+    private static DataLimits cqlLimits(int cqlRowLimit, int perPartitionLimit, boolean isDistinct)
+    {
+        return cqlRowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT && !isDistinct
+             ? NONE
+             : new CQLLimits(cqlRowLimit, perPartitionLimit, isDistinct);
     }
 
     public static DataLimits distinctLimits(int cqlRowLimit)
@@ -766,7 +775,7 @@ public abstract class DataLimits
                     int perPartitionLimit = (int)in.readUnsignedVInt();
                     boolean isDistinct = in.readBoolean();
                     if (kind == Kind.CQL_LIMIT)
-                        return new CQLLimits(rowLimit, perPartitionLimit, isDistinct);
+                        return cqlLimits(rowLimit, perPartitionLimit, isDistinct);
 
                     ByteBuffer lastKey = ByteBufferUtil.readWithVIntLength(in);
                     int lastRemaining = (int)in.readUnsignedVInt();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2fbddbd9/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java
index 077712e..be62f6c 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java
@@ -2353,4 +2353,97 @@ public class SelectTest extends CQLTester
                              "SELECT * FROM %s WHERE c CONTAINS KEY ? ALLOW FILTERING",
                              unset());
     }
-}
+
+    @Test
+    public void testPerPartitionLimit() throws Throwable
+    {
+        perPartitionLimitTest(false);
+    }
+
+    @Test
+    public void testPerPartitionLimitWithCompactStorage() throws Throwable
+    {
+        perPartitionLimitTest(true);
+    }
+
+    private void perPartitionLimitTest(boolean withCompactStorage) throws Throwable
+    {
+        String query = "CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))";
+
+        if (withCompactStorage)
+            createTable(query + " WITH COMPACT STORAGE");
+        else
+            createTable(query);
+
+        for (int i = 0; i < 5; i++)
+        {
+            for (int j = 0; j < 5; j++)
+            {
+                execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", i, j, j);
+            }
+        }
+
+        assertInvalidMessage("LIMIT must be strictly positive",
+                             "SELECT * FROM %s PER PARTITION LIMIT ?", 0);
+        assertInvalidMessage("LIMIT must be strictly positive",
+                             "SELECT * FROM %s PER PARTITION LIMIT ?", -1);
+
+        assertRowsIgnoringOrder(execute("SELECT * FROM %s PER PARTITION LIMIT ?", 2),
+                                row(0, 0, 0),
+                                row(0, 1, 1),
+                                row(1, 0, 0),
+                                row(1, 1, 1),
+                                row(2, 0, 0),
+                                row(2, 1, 1),
+                                row(3, 0, 0),
+                                row(3, 1, 1),
+                                row(4, 0, 0),
+                                row(4, 1, 1));
+
+
+        // Combined Per Partition and "global" limit
+        assertRowCount(execute("SELECT * FROM %s PER PARTITION LIMIT ? LIMIT ?", 2, 6),
+                       6);
+
+        // odd amount of results
+        assertRowCount(execute("SELECT * FROM %s PER PARTITION LIMIT ? LIMIT ?", 2, 5),
+                       5);
+
+        // IN query
+        assertRowsIgnoringOrder(execute("SELECT * FROM %s WHERE a IN (2,3) PER PARTITION LIMIT ?", 2),
+                                row(2, 0, 0),
+                                row(2, 1, 1),
+                                row(3, 0, 0),
+                                row(3, 1, 1));
+
+        assertRowCount(execute("SELECT * FROM %s WHERE a IN (2,3) PER PARTITION LIMIT ? LIMIT 3", 2), 3);
+        assertRowCount(execute("SELECT * FROM %s WHERE a IN (1,2,3) PER PARTITION LIMIT ? LIMIT 3", 2), 3);
+
+
+        // with restricted partition key
+        assertRows(execute("SELECT * FROM %s WHERE a = ? PER PARTITION LIMIT ?", 2, 3),
+                   row(2, 0, 0),
+                   row(2, 1, 1),
+                   row(2, 2, 2));
+
+        // with ordering
+        assertRows(execute("SELECT * FROM %s WHERE a = ? ORDER BY b DESC PER PARTITION LIMIT ?", 2, 3),
+                   row(2, 4, 4),
+                   row(2, 3, 3),
+                   row(2, 2, 2));
+
+        // with filtering
+        assertRows(execute("SELECT * FROM %s WHERE a = ? AND b > ? PER PARTITION LIMIT ? ALLOW FILTERING", 2, 0, 2),
+                   row(2, 1, 1),
+                   row(2, 2, 2));
+
+        assertRows(execute("SELECT * FROM %s WHERE a = ? AND b > ? ORDER BY b DESC PER PARTITION LIMIT ? ALLOW FILTERING", 2, 2, 2),
+                   row(2, 4, 4),
+                   row(2, 3, 3));
+
+        assertInvalidMessage("PER PARTITION LIMIT is not allowed with SELECT DISTINCT queries",
+                             "SELECT DISTINCT a FROM %s PER PARTITION LIMIT ?", 3);
+        assertInvalidMessage("PER PARTITION LIMIT is not allowed with SELECT DISTINCT queries",
+                             "SELECT DISTINCT a FROM %s PER PARTITION LIMIT ? LIMIT ?", 3, 4);
+    }
+}
\ No newline at end of file


Mime
View raw message